10. Data Push


Client-server data synchronization

In classic Flex applications using remoting, data is updated only when the user performs an action that triggers a call to the server. Because many things can be done purely on the client without involving the server at all, the client state can become stale if someone else has modified the data between updates.
Optimistic locking ensures that the data remains consistent on the server and in the database, but it would be better if data updates were pushed in real time to all connected clients.

Tide makes this possible by integrating with the JPA provider and the Gravity messaging broker to dispatch data updates to subscribed clients.

This requires some configuration:

  • Define a Gravity topic
  • Add the Tide JPA listener DataPublishListener on entities that should be tracked
  • Add the Tide annotation DataEnabled on all server components involved in modifications of these data
  • Subscribe to the topic on the client with the DataObserver component

Each of these steps is described in detail below.

Define a Gravity topic

In the general case, the topic is defined in services-config.xml:

services-config.xml
<service id="gravity-service"
    class="flex.messaging.services.MessagingService"
    messageTypes="flex.messaging.messages.AsyncMessage">
    <adapters>
        <adapter-definition id="simple" 
            class="org.granite.gravity.adapters.SimpleServiceAdapter"/>
    </adapters>

    <destination id="dataTopic">
        <properties>
            <no-local>true</no-local>
            <session-selector>true</session-selector>
        </properties>
        <channels>
            <channel ref="gravityamf"/>
        </channels>
        <adapter ref="simple"/>
    </destination>
</service>
...
<channel-definition id="gravityamf" class="org.granite.gravity.channels.GravityChannel">
    <endpoint
        uri="http://{server.name}:{server.port}/{context.root}/gravityamf/amf"
        class="flex.messaging.endpoints.AMFEndpoint"/>
</channel-definition>

With Spring or Seam, the topic can instead be defined in the respective configuration file, applicationContext.xml or components.xml:

Spring applicationContext.xml
<graniteds:messaging-destination id="dataTopic" no-local="true" session-selector="true"/>
Seam components.xml
<graniteds:messaging-destination name="dataTopic" no-local="true" session-selector="true"/>

This example configuration defines a simple Gravity destination, but it is also possible to use the JMS adapter, the ActiveMQ adapter or any custom adapter if you need transactional behaviour or better scalability.

The two important parameters for the topic definition are:

  • no-local should be true. This is not strictly required, but if it is false, the client that triggers the modification will receive the result of the update twice: first through the remoting call, then through the messaging update.
  • session-selector must be true. Tide uses JMS-style selectors to determine which data will be sent to which clients and thus needs to store the current messaging selector state in the session.

Add the Tide JPA publishing listener

@Entity
@EntityListeners({DataPublishListener.class})
public abstract class MyEntity {
    ...	
}

Add the Tide data annotation on all services

Example with Spring
@DataEnabled(topic="dataTopic", params=ObserveAllPublishAll.class, 
  publish=PublishMode.ON_SUCCESS)
public interface MyService {
    ...
}

It is generally recommended to put the annotation on the service interface, but it also works when defined on the service implementation. Note that even services that only read data should be annotated with @DataEnabled, because they also participate in building the message selector.

The attributes of this annotation are:

  • topic: the name of the messaging topic that will be used to dispatch updates. This is the destination declared in the previous step (dataTopic in services-config.xml, or its Spring or Seam equivalent).
  • params: a filter class that defines which updates are sent to which clients. If you don't want any filtering, you can use the following class; otherwise see the Publishing filters section below for a more detailed explanation.
    ObserveAllPublishAll.java
    public class ObserveAllPublishAll implements DataTopicParams {
    
        @Override
        public void observes(DataObserveParams params) {
        }
    
        @Override
        public void publishes(DataPublishParams params, Object entity) {
        }
    }
    
  • publish: the publishing mode. PublishMode.MANUAL means that you will have to trigger the dispatch manually, for example in an interceptor. PublishMode.ON_SUCCESS means that Tide will automatically dispatch the updates on any successful call. PublishMode.ON_COMMIT is not yet implemented.

Publishing filters

It is possible to tell the Tide engine how it should dispatch each update (i.e. to which clients). This works in two phases: at each remote call from a client, Tide calls the observes method of the params class and builds the current message selector. Then, at each update, it calls publishes to set the message headers that will be filtered by the selector. The following example illustrates this:

public class AddressBookParams implements DataTopicParams {
	
    public void observes(DataObserveParams params) {
        params.addValue("user", Identity.instance().getCredentials().getUsername());
        params.addValue("user", "__public__");
    }
	
    public void publishes(DataPublishParams params, Object entity) {
        if (((AbstractEntity)entity).isRestricted())
            params.setValue("user", ((AbstractEntity)entity).getCreatedBy());
        else
            params.setValue("user", "__public__");
    }
}

The observes method here adds two values to the current selector: the current user name (retrieved here through Seam's Identity, but any other means would do) and the value __public__. From these values Tide builds the message selector (user = 'username' OR user = '__public__'), meaning that the client only wants to be notified of updates concerning public data or data that the current user owns.

During the publishing phase, Tide will call publishes for each updated entity and build the message headers with the provided values. In the example, an update message will have a user header with either __public__ or the entity owner for restricted data. These headers are then matched with the current message selector for each subscribed client.
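
The matching rule described above can be illustrated with a small self-contained sketch. ObservedValues is a hypothetical class written for this page, not part of Tide or Gravity; it only mimics the effect of the selector (any accepted value per header name, all header names required), and the real filtering is performed by the messaging layer.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the selector state of one subscribed client.
// Not a Tide class: it only illustrates the matching rule.
class ObservedValues {

    // Header name -> set of values this client accepts for that header.
    private final Map<String, Set<String>> values = new LinkedHashMap<>();

    // Mimics the idea of DataObserveParams.addValue: several values per name.
    void addValue(String name, String value) {
        values.computeIfAbsent(name, k -> new LinkedHashSet<>()).add(value);
    }

    // A message matches when, for every observed name, its header carries
    // one of the accepted values (OR within a name, AND across names).
    // A missing header never matches, as with JMS-style selectors.
    boolean matches(Map<String, String> headers) {
        for (Map.Entry<String, Set<String>> e : values.entrySet()) {
            String header = headers.get(e.getKey());
            if (header == null || !e.getValue().contains(header)) {
                return false;
            }
        }
        return true;
    }
}
```

With the values "alice" and "__public__" observed for user, a message published with user = "__public__" or user = "alice" is delivered, while one published with user = "bob" is filtered out.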

Only one header parameter is used here, but you can define as many as you want. Be aware that the match between observed and published values can become complex and difficult to predict with too many criteria. With several header names, the resulting selector is the AND of all criteria:

public void observes(DataObserveParams params) {
    params.addValue("user", Identity.instance().getCredentials().getUsername());
    params.addValue("user", "__public__");
    params.addValue("group", "admin");
    params.addValue("group", "superadmin");
}

This generates the following selector:

(user = 'username' OR user = '__public__') AND (group = 'admin' OR group = 'superadmin')
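
As a rough illustration of how such a selector string could be assembled from the observed values, here is a minimal sketch. SelectorSketch is a hypothetical helper, not the code Tide actually uses; it assumes values for the same name are joined with OR, groups are joined with AND, and single quotes are doubled as in JMS selector string literals.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical helper, not part of Tide: renders observed values
// as a JMS-style selector string.
class SelectorSketch {

    // Header name -> observed values, kept in insertion order.
    private final Map<String, Set<String>> values = new LinkedHashMap<>();

    void addValue(String name, String value) {
        values.computeIfAbsent(name, k -> new LinkedHashSet<>()).add(value);
    }

    String toSelector() {
        StringJoiner and = new StringJoiner(" AND ");
        for (Map.Entry<String, Set<String>> e : values.entrySet()) {
            // One parenthesized OR group per header name.
            StringJoiner or = new StringJoiner(" OR ", "(", ")");
            for (String v : e.getValue()) {
                // Double single quotes so the literal stays well-formed.
                or.add(e.getKey() + " = '" + v.replace("'", "''") + "'");
            }
            and.add(or.toString());
        }
        return and.toString();
    }
}
```

Adding the four values from the observes method above, in the same order, makes toSelector() return the selector shown above.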

Manual publishing

Although automatic publishing with PublishMode.ON_SUCCESS is the easiest to set up, be aware that it is not transactional: it dispatches updates as soon as the remote call succeeds, even if the transaction has not been committed.
If you need real transactional behaviour, manual publishing is more suitable.
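
The idea behind transactional publishing can be sketched independently of any framework: updates are recorded while the call runs, and they are dispatched only once the transaction outcome is known to be a commit. DeferredPublisher below is a hypothetical class written for illustration, not a Tide API; in a real application the dispatch step would be the call to DataContext.publish(PublishMode.MANUAL) shown in the examples of this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer, not part of Tide: collects updates during a call and
// releases them only if the transaction is committed.
class DeferredPublisher {

    private final List<String> pending = new ArrayList<>();
    private final Consumer<List<String>> dispatcher;

    DeferredPublisher(Consumer<List<String>> dispatcher) {
        this.dispatcher = dispatcher;
    }

    // Called whenever a change is detected inside the transaction.
    void record(String update) {
        pending.add(update);
    }

    // Called from the transaction callback once the outcome is known.
    void afterCompletion(boolean committed) {
        try {
            if (committed && !pending.isEmpty()) {
                // Dispatch a copy so the buffer can be cleared safely.
                dispatcher.accept(new ArrayList<>(pending));
            }
        } finally {
            // Commit or rollback, nothing may leak into the next transaction.
            pending.clear();
        }
    }
}
```

With this shape, a rolled-back transaction dispatches nothing, which is the guarantee that PublishMode.ON_SUCCESS alone does not give.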

If you use JMS as the transport, the best way to publish updates is to define a transactional topic and dispatch the updates manually inside the current transaction with an interceptor:

<destination id="dataTopic">
    <properties>
        <jms>
            <destination-type>Topic</destination-type>
            <connection-factory>ConnectionFactory</connection-factory>
            <destination-jndi-name>topic/dataTopic</destination-jndi-name>
            <destination-name>dataTopic</destination-name>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <transacted-sessions>true</transacted-sessions>
            <no-local>true</no-local>
        </jms>
        <no-local>true</no-local>
        <session-selector>true</session-selector>
    </properties>
    <channels>
        <channel ref="gravityamf"/>
    </channels>
    <adapter ref="jms"/>
</destination>

@Interceptor
public class DataPublishInterceptor {

    @AroundInvoke
    public Object aroundInvoke(InvocationContext invocation) throws Exception {
        Object result = invocation.proceed();
        DataContext.publish(PublishMode.MANUAL);
        return result;
    }
}

Alternatively, you can use a framework-dependent synchronization to trigger the update on transaction commit, for example with EJB3:

EJB3
public class MyServiceImpl implements MyService, SessionSynchronization {
    ...

    public void afterBegin() {
    }
   
    public void beforeCompletion() throws EJBException, RemoteException {
    }
   
    public void afterCompletion(boolean success) throws EJBException, RemoteException {
        if (success)
            DataContext.publish(PublishMode.MANUAL);
    }
}

Or with a Seam transactional event:

@Interceptor(within={TransactionInterceptor.class})
public class DataPublishInterceptor extends AbstractInterceptor {

    @AroundInvoke
    public Object aroundInvoke(InvocationContext invocation) throws Exception {
        Events.raiseTransactionSuccessEvent("my.app.graniteds.dispatchUpdate");
        return invocation.proceed();
    }
}

@Name("dataPublisher")
public class DataPublisher {
    @Observer("my.app.graniteds.dispatchUpdate")
    public void dispatchUpdate() {
        DataContext.publish(PublishMode.MANUAL);
    }
}

Or with the JTA API:

@Interceptor
public class DataPublishInterceptor implements Synchronization {

    @AroundInvoke
    public Object aroundInvoke(InvocationContext invocation) throws Exception {
        InitialContext ic = new InitialContext();
        TransactionManager tm = (TransactionManager)ic.lookup("TransactionManager");
        Transaction t = tm.getTransaction();
        t.registerSynchronization(this);
        return invocation.proceed();
    }

    public void beforeCompletion() {
    }

    public void afterCompletion(int status) {
        if (status == Status.STATUS_COMMITTED)
            DataContext.publish(PublishMode.MANUAL);
    }
}



Copyright © 2011 Granite Data Services S.A.S. All Rights Reserved.