3. Data Push (Gravity)
Introduction
Granite Data Services provides a Data Push feature, code-named Gravity, implemented as a Comet-like service with AMF3 data polling over HTTP (producer/consumer based architecture). This implementation is loosely based on the Bayeux protocol specification (1.0draft1 at this time).
For a basic sample of GDS/Gravity, download graniteds-chat-1.2.0.zip and import it as a new Eclipse project.
Flex Classes & Sample Usage
GDS Data Push relies on two main AS3 classes on the Flex side: org.granite.gravity.Consumer and org.granite.gravity.Producer. These classes reproduce the original Adobe Flex Consumer and Producer almost exactly. The only difference is that you must use topic instead of subtopic, due to a change introduced in Flex 3.
Here is a quick example of GDS Consumer/Producer usage:
    ...
    import mx.messaging.events.MessageEvent;
    import mx.messaging.messages.AsyncMessage;
    import org.granite.gravity.Consumer;
    import org.granite.gravity.Producer;
    ...
    private var consumer:Consumer = null;
    private var producer:Producer = null;

    private function connect():void {
        consumer = new Consumer();
        consumer.destination = "gravity";
        consumer.topic = "discussion";
        // Register the listener before subscribing so that no message is missed
        consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);
        consumer.subscribe();

        producer = new Producer();
        producer.destination = "gravity";
        producer.topic = "discussion";
    }

    private function disconnect():void {
        consumer.disconnect();
        consumer = null;

        producer.disconnect();
        producer = null;
    }

    // Called for each message pushed to the "discussion" topic
    private function messageHandler(event:MessageEvent):void {
        var msg:AsyncMessage = event.message as AsyncMessage;
        trace("Received message: " + (msg.body as String));
    }

    private function send(message:String):void {
        var msg:AsyncMessage = new AsyncMessage();
        msg.body = message;
        producer.send(msg);
    }
    ...
In this sample code, the producer sends String messages (the body could of course be of any type) and the consumer receives them as Strings as well. These Strings are sent in AsyncMessage envelopes, which is the only envelope type allowed in GDS.
Common Configuration
You need to configure a specific channel and destination as follows:
    <services-config>
        <services>
            <service id="messaging-service"
                     class="flex.messaging.services.MessagingService"
                     messageTypes="flex.messaging.messages.AsyncMessage">
                <adapters>
                    <adapter-definition id="default"
                                        class="org.granite.gravity.adapters.SimpleServiceAdapter"
                                        default="true"/>
                </adapters>
                <destination id="gravity">
                    <channels>
                        <channel ref="my-gravityamf"/>
                    </channels>
                </destination>
            </service>
        </services>
        <channels>
            <channel-definition id="my-gravityamf"
                                class="org.granite.gravity.channels.GravityChannel">
                <endpoint uri="http://{server.name}:{server.port}/{context.root}/gravity/amf"
                          class="flex.messaging.endpoints.AMFEndpoint"/>
            </channel-definition>
        </channels>
    </services-config>
Here, we define a GravityChannel (my-gravityamf) and use it in the gravity destination. This is the destination referenced by the Consumer and Producer in the example above.
In web.xml, the servlet listener (org.granite.config.GraniteConfigListener) is necessary to ensure proper shutdown of the Gravity services.
Three servlet parameters are shared by the Tomcat and Jetty implementations. They relate to channel cleanup and client reconnection attempts:
    <web-app version="2.4" ...>
        ...
        <listener>
            <listener-class>org.granite.config.GraniteConfigListener</listener-class>
        </listener>

        <servlet>
            <servlet-name>GravityServlet</servlet-name>
            <servlet-class>org.granite.gravity.[...].Gravity[...]Servlet</servlet-class>

            <!--
            Client idle timeout (default is 30mn): if the server has no incoming
            request after this maximum duration from a client, it will remove the
            channel (and subscriptions, etc.)
            Note: Even if the client does not send any messages, there are still
            incoming long-polling requests from the Consumer. As such, this timeout
            only happens with Producer-only applications or dead clients.
            <init-param>
                <param-name>clientTimeoutMs</param-name>
                <param-value>1800000</param-value>
            </init-param>
            -->

            <!--
            Together with the next param, these values are sent to clients as advice
            about the interval between reconnection attempts and the maximum number of
            reconnection attempts in case of server failure (such as webapp reload).
            Default is: 30s * 60 times (i.e., half an hour + (60 * browser specific
            connection timeout when it tries an unsuccessful request)).
            <init-param>
                <param-name>reconnectIntervalMs</param-name>
                <param-value>30000</param-value>
            </init-param>
            <init-param>
                <param-name>reconnectMaxAttemps</param-name>
                <param-value>60</param-value>
            </init-param>
            -->
        </servlet>

        <servlet-mapping>
            <servlet-name>GravityServlet</servlet-name>
            <url-pattern>/gravity/*</url-pattern>
        </servlet-mapping>
        ...
    </web-app>
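The default reconnection window mentioned in the configuration comments can be checked with a few lines of Java. This is only an arithmetic sketch: the class and method names are hypothetical, and it computes the lower bound only (the browser-specific connection timeout of each failed attempt comes on top of it).

```java
import java.time.Duration;

public class ReconnectWindowSketch {

    // Hypothetical helper: lower bound of the time during which a client keeps
    // trying to reconnect, i.e. interval between attempts * number of attempts.
    public static Duration minimumWindow(long reconnectIntervalMs, int reconnectMaxAttempts) {
        return Duration.ofMillis(Math.multiplyExact(reconnectIntervalMs, (long) reconnectMaxAttempts));
    }

    public static void main(String[] args) {
        // Default values from the servlet parameters: 30000 ms and 60 attempts
        Duration window = minimumWindow(30000L, 60);
        System.out.println(window.toMinutes() + " minutes"); // prints "30 minutes"
    }
}
```

With the default values, this lower bound (1800000 ms) is the same as the default clientTimeoutMs.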
Tomcat Specific Configuration
GDS Data Push for Tomcat relies on the org.apache.catalina.CometProcessor interface. In order to enable Comet support in Tomcat, you must configure an APR or NIO connector.
At least for now, APR is the simplest to configure and the most reliable. To configure APR, see the Apache Tomcat APR documentation. On Windows®, it is simply a matter of downloading the native DLL and placing it in your WINDOWS/system32 directory, although other and better configurations are possible.
Here is a sample web configuration for Tomcat:
    <web-app version="2.4" ...>
        ...
        <servlet>
            <servlet-name>GravityServlet</servlet-name>
            <servlet-class>org.granite.gravity.tomcat.GravityTomcatServlet</servlet-class>

            <!--
            The number of threads to keep in the pool, even if they are idle (default is 5)
            <init-param>
                <param-name>OutgoingPool.corePoolSize</param-name>
                <param-value>5</param-value>
            </init-param>
            -->

            <!--
            The maximum number of threads to allow in the pool (default is 20)
            <init-param>
                <param-name>OutgoingPool.maximumPoolSize</param-name>
                <param-value>20</param-value>
            </init-param>
            -->

            <!--
            When the number of threads is greater than the core, this is the maximum
            time that excess idle threads will wait for new tasks before terminating
            (default 10s)
            <init-param>
                <param-name>OutgoingPool.keepAliveTimeMillis</param-name>
                <param-value>10000</param-value>
            </init-param>
            -->

            <!--
            The capacity of the thread pool queue
            (default is 2147483647 = Integer.MAX_VALUE)
            <init-param>
                <param-name>OutgoingPool.queueCapacity</param-name>
                <param-value>2147483647</param-value>
            </init-param>
            -->

            <load-on-startup>1</load-on-startup>
        </servlet>

        <servlet-mapping>
            <servlet-name>GravityServlet</servlet-name>
            <url-pattern>/gravity/*</url-pattern>
        </servlet-mapping>
        ...
    </web-app>
The options used in the GravityTomcatServlet configuration refer to a java.util.concurrent.ThreadPoolExecutor instance backed by a java.util.concurrent.LinkedBlockingQueue, which together send outgoing messages with multiple concurrent threads. Please see the Java documentation of these two classes for details.
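To make the meaning of the four OutgoingPool options more concrete, here is a minimal sketch of how such values map onto the standard ThreadPoolExecutor constructor. The class and method names are hypothetical and this is not Gravity's actual code, only an illustration using the JDK classes named above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OutgoingPoolSketch {

    // Hypothetical helper: builds an executor from the four OutgoingPool.* values.
    public static ThreadPoolExecutor create(int corePoolSize, int maximumPoolSize,
                                            long keepAliveTimeMillis, int queueCapacity) {
        return new ThreadPoolExecutor(
            corePoolSize,                                      // threads kept even when idle
            maximumPoolSize,                                   // upper bound on threads
            keepAliveTimeMillis, TimeUnit.MILLISECONDS,        // idle time for excess threads
            new LinkedBlockingQueue<Runnable>(queueCapacity)); // pending tasks
    }

    public static void main(String[] args) throws InterruptedException {
        // Default values listed in the servlet configuration
        ThreadPoolExecutor pool = create(5, 20, 10000L, Integer.MAX_VALUE);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            // Stand-in for the delivery of one outgoing message
            pool.execute(() -> System.out.println("sending message " + n));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

One consequence documented for ThreadPoolExecutor is worth keeping in mind: threads beyond corePoolSize are only created when the queue is full. With the default queue capacity of Integer.MAX_VALUE, the pool therefore stays at corePoolSize threads in practice, and maximumPoolSize only matters once a smaller queueCapacity is configured.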
JBoss/Tomcat Specific Configuration Tip
For JBoss 4.2.*, you must comment out a specific filter in the default global web.xml (<JBOSS_HOME>/server/default/deploy/jboss-web.deployer/conf/web.xml):
    ...
    <!-- Comment this out!
    <filter>
        <filter-name>CommonHeadersFilter</filter-name>
        <filter-class>org.jboss.web.tomcat.filters.ReplyHeaderFilter</filter-class>
        <init-param>
            <param-name>X-Powered-By</param-name>
            <param-value>...</param-value>
        </init-param>
    </filter>

    <filter-mapping>
        <filter-name>CommonHeadersFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    -->
    ...
See above for Tomcat configuration.
For JBoss 5+ servers, you must use a specific servlet. JBoss 5 implements its own version of Tomcat, named JBossWeb:
    <web-app version="2.4" ...>
        ...
        <servlet>
            <servlet-name>GravityServlet</servlet-name>
            <servlet-class>org.granite.gravity.jbossweb.GravityJBossWebServlet</servlet-class>
            ... (see Tomcat configuration above for options)
        </servlet>
        ...
    </web-app>
Note that you do not need to comment out the CommonHeadersFilter with JBoss 5, but you still need to enable APR.
Jetty Specific Configuration
GDS Data Push for Jetty relies on the Jetty continuations implementation and is heavily inspired by the Jetty cometd server implementation, with an AMF3 transport instead of a JSON transport. It is supported on Jetty 6.1.15 and later and should not need any particular configuration of the Jetty server.
Here is a sample web configuration for Jetty:
    <web-app version="2.4" ...>
        ...
        <servlet>
            <servlet-name>GravityServlet</servlet-name>
            <servlet-class>org.granite.gravity.jetty.GravityJettyServlet</servlet-class>
            <load-on-startup>1</load-on-startup>
        </servlet>

        <servlet-mapping>
            <servlet-name>GravityServlet</servlet-name>
            <url-pattern>/gravity/*</url-pattern>
        </servlet-mapping>
        ...
    </web-app>
JMS Configuration & Usage
The JMS adapter configuration follows the standard Flex configuration for the JMS adapter as closely as possible. See the Flex messaging documentation for details.
Here is a sample configuration for a default JBoss installation with a brief description of the different options:
    <adapters>
        <adapter-definition id="jms" class="org.granite.gravity.adapters.JMSServiceAdapter"/>
    </adapters>

    <destination id="chat-jms">
        <properties>
            <jms>
                <destination-type>Topic</destination-type>
                <!-- Optional: forces usage of simple text messages
                <message-type>javax.jms.TextMessage</message-type>
                -->
                <connection-factory>ConnectionFactory</connection-factory>
                <destination-jndi-name>topic/testTopic</destination-jndi-name>
                <destination-name>TestTopic</destination-name>
                <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
                <transacted-sessions>false</transacted-sessions>
                <!-- JNDI environment. Specify the external JNDI configuration to access
                     a remote JMS provider. Sample for a remote JBoss server. -->
                <initial-context-environment>
                    <property>
                        <name>Context.SECURITY_PRINCIPAL</name>
                        <value>guest</value>
                    </property>
                    <property>
                        <name>Context.SECURITY_CREDENTIALS</name>
                        <value>guest</value>
                    </property>
                    <property>
                        <name>Context.PROVIDER_URL</name>
                        <value>http://my.host.com:1099</value>
                    </property>
                    <property>
                        <name>Context.INITIAL_CONTEXT_FACTORY</name>
                        <value>org.jnp.interfaces.NamingContextFactory</value>
                    </property>
                    <property>
                        <name>Context.URL_PKG_PREFIXES</name>
                        <value>org.jboss.naming:org.jnp.interfaces</value>
                    </property>
                </initial-context-environment>
            </jms>
            ...
        </properties>
        ...
        <adapter ref="jms"/>
    </destination>
Comments on configuration options:
- destination-type must be Topic for the moment. Queues may be supported later.
- message-type may be forced to simple text messages by specifying javax.jms.TextMessage. If not specified, the messages will be Object messages containing the original message encoded in AMF3 format. This behavior ensures that AS3 objects exchanged between Flex clients are not deserialized on the server side when they are only used by Flex clients, and therefore no Java implementation of the corresponding classes is required on the server.
- connection-factory and destination-jndi-name are the JNDI names of the JMS ConnectionFactory and of the JMS topic, respectively.
- destination-name is just a label but still required.
- acknowledge-mode can have the standard values accepted by any JMS provider: AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE.
- transacted-sessions allows the use of transactions in sessions when set to 'true'.
- initial-context-environment: the initial-context parameters allow access to a remote JMS server by setting the JNDI context options.
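The property names used in initial-context-environment, such as Context.PROVIDER_URL, correspond to constants of the standard javax.naming.Context interface. The following sketch shows one plausible way such names can be turned into a JNDI environment. It is an illustration under that assumption, not Gravity's actual implementation, and the class and method names are hypothetical.

```java
import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.naming.Context;

public class JndiEnvironmentSketch {

    // Resolves a name such as "Context.PROVIDER_URL" to the value of the matching
    // javax.naming.Context constant ("java.naming.provider.url").
    // Names without the "Context." prefix are returned unchanged.
    public static String resolveKey(String name) {
        String prefix = "Context.";
        if (!name.startsWith(prefix)) {
            return name;
        }
        try {
            return (String) Context.class.getField(name.substring(prefix.length())).get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Unknown Context constant: " + name, e);
        }
    }

    // Builds the environment table expected by the javax.naming.InitialContext constructor.
    public static Hashtable<String, Object> toEnvironment(Map<String, String> properties) {
        Hashtable<String, Object> env = new Hashtable<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            env.put(resolveKey(entry.getKey()), entry.getValue());
        }
        return env;
    }

    public static void main(String[] args) {
        // Values taken from the sample configuration
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("Context.SECURITY_PRINCIPAL", "guest");
        properties.put("Context.SECURITY_CREDENTIALS", "guest");
        properties.put("Context.INITIAL_CONTEXT_FACTORY", "org.jnp.interfaces.NamingContextFactory");
        Hashtable<String, Object> env = toEnvironment(properties);
        System.out.println(env);
        // With the JMS provider's client libraries on the classpath, the table would
        // then be passed to new javax.naming.InitialContext(env) to look up the
        // connection factory and the topic by their JNDI names.
    }
}
```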
Embedded ActiveMQ Configuration
In the case of a simple Tomcat/Jetty installation, or to allow Flex-to-Flex interactions with advanced capabilities (durable messages), Gravity allows using an embedded ActiveMQ instance.
To enable ActiveMQ, just put the activemq-xx.jar in your WEB-INF/lib directory. The necessary topic will be lazily created on first use, unless the property create-broker is set to false. The URI of the created ActiveMQ broker will be vm://adapterId.
Here is a sample configuration:
    <adapters>
        <adapter-definition id="activemq" class="org.granite.gravity.adapters.ActiveMQServiceAdapter"/>
    </adapters>

    <destination id="chat-activemq">
        <properties>
            <jms>
                <destination-type>Topic</destination-type>
                <!-- Optional: forces usage of simple text messages
                <message-type>javax.jms.TextMessage</message-type>
                -->
                <connection-factory>ConnectionFactory</connection-factory>
                <destination-jndi-name>topic/testTopic</destination-jndi-name>
                <destination-name>TestTopic</destination-name>
                <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
                <transacted-sessions>false</transacted-sessions>
            </jms>
            <server>
                <durable>true</durable>
                <file-store-root>/var/activemq/data</file-store-root>
                <create-broker>true</create-broker>
                <wait-for-start>false</wait-for-start>
            </server>
        </properties>
        ...
        <adapter ref="activemq"/>
    </destination>
Comments on configuration options:
- Main parameters (<jms>...</jms>) are identical to those used in JMS configuration. See above.
- durable, if set to true, allows for durable messages, stored in the filesystem. The data store directory of ActiveMQ can be specified by the file-store-root parameter.
- create-broker is optional, as is the dependent wait-for-start attribute. When create-broker is false, the broker is not created automatically and must be created by the application itself. In this case, setting wait-for-start to true tells the ActiveMQConnectionFactory to wait until the broker has actually been created. Please refer to the ActiveMQ documentation for more details on these options.
Server to Client Messaging Sample with JMS
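Sending messages from the server to Flex clients simply consists of sending JMS messages to the corresponding JMS topic. Text messages are received as simple text on the Flex side; object messages are serialized in AMF3, then deserialized and received as ActionScript 3 objects. The Gravity messaging channel supports lazily loaded collections and objects, exactly as the Granite remoting channel does.
    // EJB 3 stateless session bean publishing to the JMS topic
    import java.io.Serializable;
    import javax.annotation.Resource;
    import javax.ejb.Local;
    import javax.ejb.SessionContext;
    import javax.ejb.Stateless;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.Topic;
    // Assumption: a log4j-style logger, as used by the second sample
    import org.apache.log4j.Logger;

    @Stateless
    @Local(Test.class)
    public class TestBean implements Test {

        private static Logger log = Logger.getLogger(TestBean.class.getName());

        @Resource
        SessionContext ctx;

        @Resource(mappedName="java:/ConnectionFactory")
        ConnectionFactory jmsConnectionFactory;

        @Resource(mappedName="topic/testTopic")
        Topic jmsTopic;

        public TestBean() {
            super();
        }

        // The payload must be Serializable to be wrapped in a JMS ObjectMessage
        public void notifyClient(Serializable object) {
            try {
                Connection connection = jmsConnectionFactory.createConnection();
                try {
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    javax.jms.Message jmsMessage = session.createObjectMessage(object);
                    MessageProducer producer = session.createProducer(jmsTopic);
                    producer.send(jmsMessage);
                    session.close();
                } finally {
                    // Always release the connection, even if publishing fails
                    connection.close();
                }
            } catch (Exception e) {
                log.error("Could not publish notification", e);
            }
        }
    }

    // Same bean written as a Seam component (@Name, @In): the TopicPublisher and
    // TopicSession are injected instead of being created for each call
    @Stateless
    @Local(Test.class)
    @Name("test")
    public class TestBean implements Test {

        private static Logger log = Logger.getLogger(TestBean.class.getName());

        @In
        private TopicPublisher testTopicPublisher;

        @In
        private TopicSession topicSession;

        public void notifyClient(Serializable object) {
            try {
                testTopicPublisher.publish(topicSession.createObjectMessage(object));
            } catch (Exception e) {
                log.error("Could not publish notification", e);
            }
        }
    }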
Server to Client Messaging Sample with Embedded ActiveMQ
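The only difference from standard JMS is that you can get a ConnectionFactory more easily.
    // Imports assumed: java.io.Serializable, javax.jms.*,
    // org.apache.activemq.ActiveMQConnectionFactory
    public class Test {

        // topicName is a hypothetical parameter: the original sample does not show
        // how the topic is obtained
        public void notifyClient(String topicName, Serializable object) throws JMSException {
            // adapterId should be the id of the JMS adapter as defined in services-config.xml
            ConnectionFactory jmsConnectionFactory = new ActiveMQConnectionFactory("vm://adapterId");
            Connection connection = jmsConnectionFactory.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Topic jmsTopic = session.createTopic(topicName);
                javax.jms.Message jmsMessage = session.createObjectMessage(object);
                MessageProducer producer = session.createProducer(jmsTopic);
                producer.send(jmsMessage);
                session.close();
            } finally {
                // Always release the connection, even if publishing fails
                connection.close();
            }
        }
    }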
Server to Client Messaging Sample with SimpleAdapter
If you use the SimpleAdapter, messages must be sent at a lower level and your code will have a hard dependency on Gravity.
The first step is to get the Gravity object. It is set as an attribute named org.granite.gravity.Gravity in the ServletContext.
The second step is to create a Channel object, which mainly holds the clientId of the server publisher, and register it with the Gravity object.
Finally, you can send messages of type flex.messaging.messages.Message by calling the method gravity.publishMessage(publisherChannel, message):
    Gravity gravity = (Gravity)servletContext.getAttribute("org.granite.gravity.Gravity");

    // Server-side publisher channel: it only sends, so both callbacks are empty
    AbstractChannel publisherChannel = new AbstractChannel(gravity) {
        @Override
        protected void clearQueue() {
        }

        @Override
        public void deliver(AbstractChannel from, Message message, String subscriptionId) {
        }
    };
    gravity.registerChannel(publisherChannel);

    AsyncMessage message = new AsyncMessage();
    message.setDestination("my-gravity-destination");
    message.setHeader(AsyncMessage.SUBTOPIC_HEADER, "my-topic");
    message.setBody("Message content");

    gravity.publishMessage(publisherChannel, message);
Here the example uses an anonymous inner Channel class for convenience, but it is better to define a normal class that you can reuse in different places. It is also recommended to store the Channel object somewhere and reuse it if you want to send many messages from the same server context.