Обмен сообщениями — чрезвычайно мощный инструмент для построения распределенных программных систем различного уровня. Как правило, по крайней мере в экосистеме Java, клиент (клиент) никогда не взаимодействует напрямую с брокером сообщений (или не обменивается ими), а делает это, вызывая сервисы на стороне сервера (сервер). Или клиент может даже не знать, что существует решение для обмена сообщениями.
С ростом популярности Websockets и широкой поддержкой текстовых протоколов, таких как STOMP (используемых для связи с посредником или обменом сообщениями), все будет иметь значение. В сегодняшнем посте мы попытаемся объяснить, насколько просто представить две очень популярные реализации JMS , Apache ActiveMQ и JBoss HornetQ , которые доступны для веб-интерфейса (JavaScript) с использованием STOMP через Websockets .
Прежде чем копаться в коде, можно возразить, что это не очень хорошая идея. Так в чем же цель? Ответ действительно зависит:
- Вы разрабатываете прототип / подтверждение концепции и вам нужен простой способ интеграции публикации / подписки или обмена сообщениями между пользователями.
- Вы не хотите / не должны создавать сложную архитектуру, и достаточно простого решения, которое работает
Масштабируемость, отказоустойчивость и многие другие очень важные решения здесь не принимаются во внимание, но, безусловно, это следует делать, если вы разрабатываете надежную и отказоустойчивую архитектуру.
Итак, начнем. Как всегда, лучше начать с проблемы, которую мы пытаемся решить: мы хотели бы разработать простое решение публикации / подписки, где веб-клиент, написанный на JavaScript, сможет отправлять сообщения и прослушивать определенную тему. Всякий раз, когда какое-либо сообщение было получено, клиент просто показывает простое окно предупреждения. Обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox .
Для обоих наших примеров клиентский код остается тем же самым, и поэтому давайте начнем с этого. Хорошей отправной точкой является статья STOMP Over WebSocket, в которой представлен модуль stomp.js, а вот наш index.html :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 | <script src="stomp.js"></script><script type="text/javascript"> var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp"); client.connect( "", "",  function() {      client.subscribe("jms.topic.test",       function( message ) {           alert( message );          },     { priority: 9}       );   client.send("jms.topic.test", { priority: 9}, "Pub/Sub over STOMP!");  } );</script> | 
Чрезвычайно простой код, но некоторые детали заслуживают объяснения. Во-первых, мы ищем конечную точку Websockets на ws: // localhost: 61614 / stomp . Этого достаточно для локального развертывания, но лучше заменить localhost реальным IP-адресом или именем хоста. Во-вторых, после подключения клиент подписывается на тему (интересуются только сообщениями с приоритетом: 9 ) и сразу после этого публикует сообщение в этой теме. С точки зрения клиента, мы сделали.
Давайте перейдем к брокеру сообщений, и наш первый в списке — Apache ActiveMQ . Чтобы упростить пример, мы встроим брокер Apache ActiveMQ в простое приложение Spring без использования файлов конфигурации XML. Поскольку исходный код доступен на GitHub , я пропущу фрагмент файла POM и просто покажу код:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | packagecom.example.messaging;importjava.util.Collections;importorg.apache.activemq.broker.BrokerService;importorg.apache.activemq.broker.jmx.ManagementContext;importorg.apache.activemq.command.ActiveMQDestination;importorg.apache.activemq.command.ActiveMQTopic;importorg.apache.activemq.hooks.SpringContextHook;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassAppConfig {    @Bean( initMethod = "start", destroyMethod = "stop")    publicBrokerService broker() throwsException {        finalBrokerService broker = newBrokerService();            broker.addConnector( "ws://localhost:61614");         broker.setPersistent( false);        broker.setShutdownHooks( Collections.< Runnable >singletonList( newSpringContextHook() ) );        finalActiveMQTopic topic = newActiveMQTopic( "jms.topic.test");        broker.setDestinations( newActiveMQDestination[] { topic }  );        finalManagementContext managementContext = newManagementContext();        managementContext.setCreateConnector( true);        broker.setManagementContext( managementContext );        returnbroker;    }} | 
Как мы видим, брокер ActiveMQ настроен с соединителем ws: // localhost: 61614 , который предполагает использование протокола STOMP . Также мы создаем тему JMS с именем jms.topic.test и включаем инструментарий управления JMX. И запустить его, простой класс Starter :
| 01 02 03 04 05 06 07 08 09 10 | packagecom.example.messaging;importorg.springframework.context.ApplicationContext;importorg.springframework.context.annotation.AnnotationConfigApplicationContext;publicclassStarter  {    publicstaticvoidmain( String[] args ) {        ApplicationContext context = newAnnotationConfigApplicationContext( AppConfig.class);    }} | 
Теперь, когда он запущен и работает, давайте откроем файл index.html в браузере, мы должны увидеть что-то вроде этого:
Просто! Для любопытных читателей ActiveMQ использует Jetty 7.6.7.v20120910 для поддержки Websockets и не будет работать с последними дистрибутивами Jetty .
В дальнейшем, в отношении HornetQ реализации выглядят немного иначе, хотя и не очень сложно. Поскольку класс Starter остается прежним, единственным изменением является конфигурация:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | packagecom.example.hornetq;importjava.util.Collections;importjava.util.HashMap;importjava.util.Map;importorg.hornetq.api.core.TransportConfiguration;importorg.hornetq.core.config.impl.ConfigurationImpl;importorg.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;importorg.hornetq.core.remoting.impl.netty.TransportConstants;importorg.hornetq.core.server.JournalType;importorg.hornetq.jms.server.config.ConnectionFactoryConfiguration;importorg.hornetq.jms.server.config.JMSConfiguration;importorg.hornetq.jms.server.config.TopicConfiguration;importorg.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;importorg.hornetq.jms.server.config.impl.JMSConfigurationImpl;importorg.hornetq.jms.server.config.impl.TopicConfigurationImpl;importorg.hornetq.jms.server.embedded.EmbeddedJMS;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassAppConfig {    @Bean( initMethod = "start", destroyMethod = "stop")    publicEmbeddedJMS broker() throwsException {        finalConfigurationImpl configuration = newConfigurationImpl();        configuration.setPersistenceEnabled( false);        configuration.setJournalType( JournalType.NIO );        configuration.setJMXManagementEnabled( true);        configuration.setSecurityEnabled( false);        finalMap< String, Object > params = newHashMap<>();        params.put( TransportConstants.HOST_PROP_NAME, "localhost");        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws");        params.put( TransportConstants.PORT_PROP_NAME, "61614");        finalTransportConfiguration stomp = newTransportConfiguration( NettyAcceptorFactory.class.getName(), params );        configuration.getAcceptorConfigurations().add( stomp );        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );        finalConnectionFactoryConfiguration cfConfig = newConnectionFactoryConfigurationImpl( "cf", true, "/cf");        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws") );        finalJMSConfiguration jmsConfig = newJMSConfigurationImpl();        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );        finalTopicConfiguration topicConfig = newTopicConfigurationImpl( "test", "/topic/test");        jmsConfig.getTopicConfigurations().add( topicConfig );        finalEmbeddedJMS jmsServer = newEmbeddedJMS();        jmsServer.setConfiguration( configuration );        jmsServer.setJmsConfiguration( jmsConfig );        returnjmsServer;    }} | 
Полный исходный код находится на GitHub . После запуска класса Starter и открытия index.html в браузере мы должны увидеть очень похожие результаты:
Конфигурация HornetQ выглядит немного более многословно, однако никаких дополнительных зависимостей нет, кроме блестящей среды Netty .
Для собственного любопытства я заменил брокера ActiveMQ на реализацию Apollo . Хотя мне удалось заставить его работать должным образом, я нашел API очень громоздким, по крайней мере в текущей версии 1.6 , поэтому я не освещал его в этом посте.

