Обмен сообщениями — чрезвычайно мощный инструмент для построения распределенных программных систем различного уровня. Как правило, по крайней мере в экосистеме Java, клиент (клиент) никогда не взаимодействует напрямую с брокером сообщений (или не обменивается ими), а делает это, вызывая сервисы на стороне сервера (сервер). Или клиент может даже не знать, что существует решение для обмена сообщениями.
С ростом популярности Websockets и широкой поддержкой текстовых протоколов, таких как STOMP (используемых для связи с посредником или обменом сообщениями), все будет иметь значение. В сегодняшнем посте мы попытаемся объяснить, насколько просто представить две очень популярные реализации JMS , Apache ActiveMQ и JBoss HornetQ , которые доступны для веб-интерфейса (JavaScript) с использованием STOMP через Websockets .
Прежде чем копаться в коде, можно возразить, что это не очень хорошая идея. Так в чем же цель? Ответ действительно зависит:
- Вы разрабатываете прототип / подтверждение концепции и вам нужен простой способ интеграции публикации / подписки или обмена сообщениями между пользователями.
- Вы не хотите / не должны создавать сложную архитектуру, и достаточно простого решения, которое работает
Масштабируемость, отказоустойчивость и многие другие очень важные решения здесь не принимаются во внимание, но, безусловно, это следует делать, если вы разрабатываете надежную и отказоустойчивую архитектуру.
Итак, начнем. Как всегда, лучше начать с проблемы, которую мы пытаемся решить: мы хотели бы разработать простое решение публикации / подписки, где веб-клиент, написанный на JavaScript, сможет отправлять сообщения и прослушивать определенную тему. Всякий раз, когда какое-либо сообщение было получено, клиент просто показывает простое окно предупреждения. Обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox .
Для обоих наших примеров клиентский код остается тем же самым, и поэтому давайте начнем с этого. Хорошей отправной точкой является статья STOMP Over WebSocket, в которой представлен модуль stomp.js, а вот наш index.html :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
<script src= "stomp.js" ></script> <script type= "text/javascript" > var client = Stomp.client( "ws://localhost:61614/stomp" , "v11.stomp" ); client.connect( "" , "" , function() { client.subscribe( "jms.topic.test" , function( message ) { alert( message ); }, { priority: 9 } ); client.send( "jms.topic.test" , { priority: 9 }, "Pub/Sub over STOMP!" ); } ); </script> |
Чрезвычайно простой код, но некоторые детали заслуживают объяснения. Во-первых, мы ищем конечную точку Websockets на ws: // localhost: 61614 / stomp . Этого достаточно для локального развертывания, но лучше заменить localhost реальным IP-адресом или именем хоста. Во-вторых, после подключения клиент подписывается на тему (интересуются только сообщениями с приоритетом: 9 ) и сразу после этого публикует сообщение в этой теме. С точки зрения клиента, мы сделали.
Давайте перейдем к брокеру сообщений, и наш первый в списке — Apache ActiveMQ . Чтобы упростить пример, мы встроим брокер Apache ActiveMQ в простое приложение Spring без использования файлов конфигурации XML. Поскольку исходный код доступен на GitHub , я пропущу фрагмент файла POM и просто покажу код:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package com.example.messaging; import java.util.Collections; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.hooks.SpringContextHook; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { @Bean ( initMethod = "start" , destroyMethod = "stop" ) public BrokerService broker() throws Exception { final BrokerService broker = new BrokerService(); broker.addConnector( "ws://localhost:61614" ); broker.setPersistent( false ); broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) ); final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" ); broker.setDestinations( new ActiveMQDestination[] { topic } ); final ManagementContext managementContext = new ManagementContext(); managementContext.setCreateConnector( true ); broker.setManagementContext( managementContext ); return broker; } } |
Как мы видим, брокер ActiveMQ настроен с соединителем ws: // localhost: 61614 , который предполагает использование протокола STOMP . Также мы создаем тему JMS с именем jms.topic.test и включаем инструментарий управления JMX. И запустить его, простой класс Starter :
01
02
03
04
05
06
07
08
09
10
|
package com.example.messaging; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class Starter { public static void main( String[] args ) { ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig. class ); } } |
Теперь, когда он запущен и работает, давайте откроем файл index.html в браузере, мы должны увидеть что-то вроде этого:
Просто! Для любопытных читателей ActiveMQ использует Jetty 7.6.7.v20120910 для поддержки Websockets и не будет работать с последними дистрибутивами Jetty .
В дальнейшем, в отношении HornetQ реализации выглядят немного иначе, хотя и не очень сложно. Поскольку класс Starter остается прежним, единственным изменением является конфигурация:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package com.example.hornetq; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.core.server.JournalType; import org.hornetq.jms.server.config.ConnectionFactoryConfiguration; import org.hornetq.jms.server.config.JMSConfiguration; import org.hornetq.jms.server.config.TopicConfiguration; import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl; import org.hornetq.jms.server.config.impl.JMSConfigurationImpl; import org.hornetq.jms.server.config.impl.TopicConfigurationImpl; import org.hornetq.jms.server.embedded.EmbeddedJMS; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { @Bean ( initMethod = "start" , destroyMethod = "stop" ) public EmbeddedJMS broker() throws Exception { final ConfigurationImpl configuration = new ConfigurationImpl(); configuration.setPersistenceEnabled( false ); configuration.setJournalType( JournalType.NIO ); configuration.setJMXManagementEnabled( true ); configuration.setSecurityEnabled( false ); final Map< String, Object > params = new HashMap<>(); params.put( TransportConstants.HOST_PROP_NAME, "localhost" ); params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" ); params.put( TransportConstants.PORT_PROP_NAME, "61614" ); final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory. class .getName(), params ); configuration.getAcceptorConfigurations().add( stomp ); configuration.getConnectorConfigurations().put( "stomp_ws" , stomp ); final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf" , true , "/cf" ); cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) ); final JMSConfiguration jmsConfig = new JMSConfigurationImpl(); jmsConfig.getConnectionFactoryConfigurations().add( cfConfig ); final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test" , "/topic/test" ); jmsConfig.getTopicConfigurations().add( topicConfig ); final EmbeddedJMS jmsServer = new EmbeddedJMS(); jmsServer.setConfiguration( configuration ); jmsServer.setJmsConfiguration( jmsConfig ); return jmsServer; } } |
Полный исходный код находится на GitHub . После запуска класса Starter и открытия index.html в браузере мы должны увидеть очень похожие результаты:
Конфигурация HornetQ выглядит немного более многословно, однако никаких дополнительных зависимостей нет, кроме блестящей среды Netty .
Для собственного любопытства я заменил брокера ActiveMQ на реализацию Apollo . Хотя мне удалось заставить его работать должным образом, я нашел API очень громоздким, по крайней мере в текущей версии 1.6 , поэтому я не освещал его в этом посте.