Статьи

Простой обмен сообщениями с STOMP через WebSockets с использованием Apollo

В моем предыдущем посте я рассмотрел пару интересных случаев использования реализации сообщений STOMP через Websockect с использованием известных брокеров сообщений, HornetQ и ActiveMQ . Но тот, который я не рассмотрел, — это Apollo, поскольку, по моему мнению, его API является многословным и недостаточно выразительным, как для разработчика на Java. Тем не менее, чем больше времени я проводил, играя с Аполлоном , тем больше убеждался в том, что у меня есть огромный потенциал. Так что этот пост все об Аполлоне .

Проблема, которую мы пытаемся решить, остается той же: простое решение публикации / подписки, где веб-клиент JavaScript отправляет сообщения и прослушивает определенную тему. При получении любого сообщения клиент отображает окно с предупреждением (обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox ).

Давайте запачкаем руки, начав с index.html (который импортирует потрясающую библиотеку JavaScript stomp.js ):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
<script src="stomp.js"></script>
 
<script type="text/javascript">
    var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" );
 
    client.connect( "", "",
        function() {
            client.subscribe("/topic/test",
                function( message ) {
                    alert( message );
                },
                { priority: 9 }
            );
 
            client.send("/topic/test", { priority: 9 }, "Pub/Sub over STOMP!");
        }
    );
</script>

Клиентская часть ничем не отличается, кроме названия темы, которое теперь / topic / test . Однако сторона сервера сильно отличается. Аполлон написан на Scala и охватывает асинхронную неблокирующую модель программирования. Я думаю, это очень хорошая вещь. Хотя это приносит новую парадигму для программирования, и это также не обязательно плохо. Класс AppConfig предназначен для настройки встроенного брокера Apollo :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.example.messaging;
 
import java.io.File;
 
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO;
import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;
import org.apache.activemq.apollo.dto.BrokerDTO;
import org.apache.activemq.apollo.dto.TopicDTO;
import org.apache.activemq.apollo.dto.VirtualHostDTO;
import org.apache.activemq.apollo.dto.WebAdminDTO;
import org.apache.activemq.apollo.stomp.dto.StompDTO;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class AppConfig {
    @Bean
    public Broker broker() throws Exception {
        final Broker broker = new Broker();
 
        // Configure STOMP over WebSockects connector
        final AcceptingConnectorDTO ws = new AcceptingConnectorDTO();
        ws.id = "ws";
        ws.bind = "ws://localhost:61614"
        ws.protocols.add( new StompDTO() );
 
        // Create a topic with name 'test'
        final TopicDTO topic = new TopicDTO();
        topic.id = "test";
 
        // Create virtual host (based on localhost)
        final VirtualHostDTO host = new VirtualHostDTO();
        host.id = "localhost"
        host.topics.add( topic );
        host.host_names.add( "localhost" );
        host.host_names.add( "127.0.0.1" );
        host.auto_create_destinations = false;
 
        // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/
        final WebAdminDTO webadmin = new WebAdminDTO();
        webadmin.bind = "http://localhost:61680";
 
        // Create JMX instrumentation
        final JmxDTO jmxService = new JmxDTO();
        jmxService.enabled = true;
 
        // Finally, glue all together inside broker configuration
        final BrokerDTO config = new BrokerDTO();
        config.connectors.add( ws );
        config.virtual_hosts.add( host );
        config.web_admins.add( webadmin );
        config.services.add( jmxService );
 
        broker.setConfig( config );
        broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) );
 
        broker.start( new Runnable() {  
            @Override
            public void run() { 
                System.out.println("The broker has been started started.");
            }
        } );
 
        return broker;
    }
}

Я предполагаю, что становится ясно, что я имел в виду под многословным и недостаточно выразительным, но по крайней мере это легко понять. Во-первых, мы создаем коннектор Websockects на ws: // localhost: 61614 и просим его поддерживать протокол STOMP . Затем мы создаем простую тему с именем test (которую мы называем / topic / test на стороне клиента). Следующим важным шагом является создание виртуального хоста и привязка к нему тем (и очередей, если таковые имеются). Список имен хостов очень важен, так как логика разрешения адресатов сильно зависит от него. На следующем шаге мы настраиваем пользовательский интерфейс веб-администратора и инструментарий JMX, который предоставляет нам доступ к конфигурации, статистике и мониторингу. Чтобы проверить это, откройте этот URL в своем веб-браузере после запуска брокера Apollo . И, наконец, применив конфигурацию и запустив брокера, мы готовы к работе! Как видите, модель асинхронного программирования приводит к обратным вызовам и анонимным функциям (где вы, Java 8 ?).

Теперь, когда настройка завершена, пришло время взглянуть на логику запуска, помещенную в класс Starter (опять же, обратные вызовы и анонимные функции используются для выполнения изящной логики выключения):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.messaging;
 
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
 
import org.apache.activemq.apollo.broker.Broker;
import org.springframework.context.annotation.ConfigurableApplicationContext;
 
public class Starter  {
    public static void main( String[] args ) throws Exception {
        try( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) {
            final Broker broker = context.getBean( Broker.class ); 
            System.out.println( "Press any key to terminate ..." );
            System.in.read();   
 
            final CountDownLatch latch = new CountDownLatch( 1 );
            broker.stop( new Runnable() {
                @Override
                public void run() { 
                    System.out.println("The broker has been stopped.");
                    latch.countDown();
                }
            } );
 
            // Gracefully stop the broker
            if( !latch.await( 1, TimeUnit.SECONDS ) ) {
                System.out.println("The broker hasn't been stopped, exiting anyway ...");
            }
        }
    }
}

Как и в предыдущих примерах , после запуска класса Starter и открытия index.html в браузере мы должны увидеть что-то вроде этого:

Message.STOMP.3

Отлично, работает просто отлично! Я уверен, что просто переписав код в Scala , этот пример использования API Apollo будет выглядеть намного более компактным и лаконичным. В любом случае, я думаю, что брокер сообщений Apollo определенно стоит рассмотреть, если вы ищете выдающуюся архитектуру обмена сообщениями.

Все источники доступны на GitHub: пример Apollo .