В моем предыдущем посте я рассмотрел пару интересных случаев использования реализации сообщений STOMP через Websockect с использованием известных брокеров сообщений, HornetQ и ActiveMQ . Но тот, который я не рассмотрел, — это Apollo, поскольку, по моему мнению, его API является многословным и недостаточно выразительным, как для разработчика на Java. Тем не менее, чем больше времени я проводил, играя с Аполлоном , тем больше убеждался в том, что у меня есть огромный потенциал. Так что этот пост все об Аполлоне .
Проблема, которую мы пытаемся решить, остается той же: простое решение публикации / подписки, где веб-клиент JavaScript отправляет сообщения и прослушивает определенную тему. При получении любого сообщения клиент отображает окно с предупреждением (обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox ).
Давайте запачкаем руки, начав с index.html (который импортирует потрясающую библиотеку JavaScript stomp.js ):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
<script src= "stomp.js" ></script> <script type= "text/javascript" > var client = Stomp.client( "ws://localhost:61614/stomp" , "v11.stomp" ); client.connect( "" , "" , function() { client.subscribe( "/topic/test" , function( message ) { alert( message ); }, { priority: 9 } ); client.send( "/topic/test" , { priority: 9 }, "Pub/Sub over STOMP!" ); } ); </script> |
Клиентская часть ничем не отличается, кроме названия темы, которое теперь / topic / test . Однако сторона сервера сильно отличается. Аполлон написан на Scala и охватывает асинхронную неблокирующую модель программирования. Я думаю, это очень хорошая вещь. Хотя это приносит новую парадигму для программирования, и это также не обязательно плохо. Класс AppConfig предназначен для настройки встроенного брокера Apollo :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package com.example.messaging; import java.io.File; import org.apache.activemq.apollo.broker.Broker; import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO; import org.apache.activemq.apollo.dto.AcceptingConnectorDTO; import org.apache.activemq.apollo.dto.BrokerDTO; import org.apache.activemq.apollo.dto.TopicDTO; import org.apache.activemq.apollo.dto.VirtualHostDTO; import org.apache.activemq.apollo.dto.WebAdminDTO; import org.apache.activemq.apollo.stomp.dto.StompDTO; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { @Bean public Broker broker() throws Exception { final Broker broker = new Broker(); // Configure STOMP over WebSockects connector final AcceptingConnectorDTO ws = new AcceptingConnectorDTO(); ws.id = "ws" ; ws.bind = "ws://localhost:61614" ; ws.protocols.add( new StompDTO() ); // Create a topic with name 'test' final TopicDTO topic = new TopicDTO(); topic.id = "test" ; // Create virtual host (based on localhost) final VirtualHostDTO host = new VirtualHostDTO(); host.id = "localhost" ; host.topics.add( topic ); host.host_names.add( "localhost" ); host.host_names.add( "127.0.0.1" ); host.auto_create_destinations = false ; // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/ final WebAdminDTO webadmin = new WebAdminDTO(); // Create JMX instrumentation final JmxDTO jmxService = new JmxDTO(); jmxService.enabled = true ; // Finally, glue all together inside broker configuration final BrokerDTO config = new BrokerDTO(); config.connectors.add( ws ); config.virtual_hosts.add( host ); config.web_admins.add( webadmin ); config.services.add( jmxService ); broker.setConfig( config ); broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) ); broker.start( new Runnable() { @Override public void run() { System.out.println( "The broker has been started started." ); } } ); return broker; } } |
Я предполагаю, что становится ясно, что я имел в виду под многословным и недостаточно выразительным, но по крайней мере это легко понять. Во-первых, мы создаем коннектор Websockects на ws: // localhost: 61614 и просим его поддерживать протокол STOMP . Затем мы создаем простую тему с именем test (которую мы называем / topic / test на стороне клиента). Следующим важным шагом является создание виртуального хоста и привязка к нему тем (и очередей, если таковые имеются). Список имен хостов очень важен, так как логика разрешения адресатов сильно зависит от него. На следующем шаге мы настраиваем пользовательский интерфейс веб-администратора и инструментарий JMX, который предоставляет нам доступ к конфигурации, статистике и мониторингу. Чтобы проверить это, откройте этот URL в своем веб-браузере после запуска брокера Apollo . И, наконец, применив конфигурацию и запустив брокера, мы готовы к работе! Как видите, модель асинхронного программирования приводит к обратным вызовам и анонимным функциям (где вы, Java 8 ?).
Теперь, когда настройка завершена, пришло время взглянуть на логику запуска, помещенную в класс Starter (опять же, обратные вызовы и анонимные функции используются для выполнения изящной логики выключения):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package com.example.messaging; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.activemq.apollo.broker.Broker; import org.springframework.context.annotation.ConfigurableApplicationContext; public class Starter { public static void main( String[] args ) throws Exception { try ( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig. class ) ) { final Broker broker = context.getBean( Broker. class ); System.out.println( "Press any key to terminate ..." ); System.in.read(); final CountDownLatch latch = new CountDownLatch( 1 ); broker.stop( new Runnable() { @Override public void run() { System.out.println( "The broker has been stopped." ); latch.countDown(); } } ); // Gracefully stop the broker if ( !latch.await( 1 , TimeUnit.SECONDS ) ) { System.out.println( "The broker hasn't been stopped, exiting anyway ..." ); } } } } |
Как и в предыдущих примерах , после запуска класса Starter и открытия index.html в браузере мы должны увидеть что-то вроде этого:
Отлично, работает просто отлично! Я уверен, что просто переписав код в Scala , этот пример использования API Apollo будет выглядеть намного более компактным и лаконичным. В любом случае, я думаю, что брокер сообщений Apollo определенно стоит рассмотреть, если вы ищете выдающуюся архитектуру обмена сообщениями.
Все источники доступны на GitHub: пример Apollo .