Статьи

Простой обмен сообщениями с STOMP через WebSockets с использованием ActiveMQ и HornetQ

Обмен сообщениями — чрезвычайно мощный инструмент для построения распределенных программных систем различного уровня. Как правило, по крайней мере в экосистеме Java, клиент (клиент) никогда не взаимодействует напрямую с брокером сообщений (или не обменивается ими), а делает это, вызывая сервисы на стороне сервера (сервер). Или клиент может даже не знать, что существует решение для обмена сообщениями.

С ростом популярности Websockets и широкой поддержкой текстовых протоколов, таких как STOMP (используемых для связи с посредником или обменом сообщениями), все будет иметь значение. В сегодняшнем посте мы попытаемся объяснить, насколько просто представить две очень популярные реализации JMS , Apache ActiveMQ и JBoss HornetQ , которые доступны для веб-интерфейса (JavaScript) с использованием STOMP через Websockets .

Прежде чем копаться в коде, можно возразить, что это не очень хорошая идея. Так в чем же цель? Ответ действительно зависит:

  • Вы разрабатываете прототип / подтверждение концепции и вам нужен простой способ интеграции публикации / подписки или обмена сообщениями между пользователями.
  • Вы не хотите / не должны создавать сложную архитектуру, и достаточно простого решения, которое работает

Масштабируемость, отказоустойчивость и многие другие очень важные решения здесь не принимаются во внимание, но, безусловно, это следует делать, если вы разрабатываете надежную и отказоустойчивую архитектуру.

Итак, начнем. Как всегда, лучше начать с проблемы, которую мы пытаемся решить: мы хотели бы разработать простое решение публикации / подписки, где веб-клиент, написанный на JavaScript, сможет отправлять сообщения и прослушивать определенную тему. Всякий раз, когда какое-либо сообщение было получено, клиент просто показывает простое окно предупреждения. Обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox .

Для обоих наших примеров клиентский код остается тем же самым, и поэтому давайте начнем с этого. Хорошей отправной точкой является статья STOMP Over WebSocket, в которой представлен модуль stomp.js, а вот наш index.html :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<script src="stomp.js"></script>
 
<script type="text/javascript">
 var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" );
 
 client.connect( "", "",
  function() {
      client.subscribe("jms.topic.test",
       function( message ) {
           alert( message );
          },
    { priority: 9 }
      );
 
   client.send("jms.topic.test", { priority: 9 }, "Pub/Sub over STOMP!");
  }
 );
 
</script>

Чрезвычайно простой код, но некоторые детали заслуживают объяснения. Во-первых, мы ищем конечную точку Websockets на ws: // localhost: 61614 / stomp . Этого достаточно для локального развертывания, но лучше заменить localhost реальным IP-адресом или именем хоста. Во-вторых, после подключения клиент подписывается на тему (интересуются только сообщениями с приоритетом: 9 ) и сразу после этого публикует сообщение в этой теме. С точки зрения клиента, мы сделали.

Давайте перейдем к брокеру сообщений, и наш первый в списке — Apache ActiveMQ . Чтобы упростить пример, мы встроим брокер Apache ActiveMQ в простое приложение Spring без использования файлов конфигурации XML. Поскольку исходный код доступен на GitHub , я пропущу фрагмент файла POM и просто покажу код:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.messaging;
 
import java.util.Collections;
 
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();   
        broker.addConnector( "ws://localhost:61614" );
        broker.setPersistent( false );
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );
 
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic }  );
 
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );
 
        return broker;
    }
}

Как мы видим, брокер ActiveMQ настроен с соединителем ws: // localhost: 61614 , который предполагает использование протокола STOMP . Также мы создаем тему JMS с именем jms.topic.test и включаем инструментарий управления JMX. И запустить его, простой класс Starter :

01
02
03
04
05
06
07
08
09
10
package com.example.messaging;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
 
public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

Теперь, когда он запущен и работает, давайте откроем файл index.html в браузере, мы должны увидеть что-то вроде этого:

Message.STOMP.1

Просто! Для любопытных читателей ActiveMQ использует Jetty 7.6.7.v20120910 для поддержки Websockets и не будет работать с последними дистрибутивами Jetty .

В дальнейшем, в отношении HornetQ реализации выглядят немного иначе, хотя и не очень сложно. Поскольку класс Starter остается прежним, единственным изменением является конфигурация:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.example.hornetq;
 
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
 
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );
 
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );
 
        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );
 
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );
 
        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );
 
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );
 
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );
 
        return jmsServer;
    }
}

Полный исходный код находится на GitHub . После запуска класса Starter и открытия index.html в браузере мы должны увидеть очень похожие результаты:

Message.STOMP.2

Конфигурация HornetQ выглядит немного более многословно, однако никаких дополнительных зависимостей нет, кроме блестящей среды Netty .

Для собственного любопытства я заменил брокера ActiveMQ на реализацию Apollo . Хотя мне удалось заставить его работать должным образом, я нашел API очень громоздким, по крайней мере в текущей версии 1.6 , поэтому я не освещал его в этом посте.