В моем предыдущем посте я рассмотрел пару интересных случаев использования реализации сообщений STOMP через Websockect с использованием известных брокеров сообщений, HornetQ и ActiveMQ . Но тот, который я не рассмотрел, — это Apollo, поскольку, по моему мнению, его API является многословным и недостаточно выразительным, как для разработчика на Java. Тем не менее, чем больше времени я проводил, играя с Аполлоном , тем больше убеждался в том, что у меня есть огромный потенциал. Так что этот пост все об Аполлоне .
Проблема, которую мы пытаемся решить, остается той же: простое решение публикации / подписки, где веб-клиент JavaScript отправляет сообщения и прослушивает определенную тему. При получении любого сообщения клиент отображает окно с предупреждением (обратите внимание, что нам нужно использовать современный браузер с поддержкой веб-сокетов , например, Google Chrome или Mozilla Firefox ).
Давайте запачкаем руки, начав с index.html (который импортирует потрясающую библиотеку JavaScript stomp.js ):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
<script src="stomp.js"></script><script type="text/javascript"> var client = Stomp.client( "ws://localhost:61614/stomp", "v11.stomp" ); client.connect( "", "", function() { client.subscribe("/topic/test", function( message ) { alert( message ); }, { priority: 9 } ); client.send("/topic/test", { priority: 9 }, "Pub/Sub over STOMP!"); } ); </script> |
Клиентская часть ничем не отличается, кроме названия темы, которое теперь / topic / test . Однако сторона сервера сильно отличается. Аполлон написан на Scala и охватывает асинхронную неблокирующую модель программирования. Я думаю, это очень хорошая вещь. Хотя это приносит новую парадигму для программирования, и это также не обязательно плохо. Класс AppConfig предназначен для настройки встроенного брокера Apollo :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package com.example.messaging;import java.io.File;import org.apache.activemq.apollo.broker.Broker;import org.apache.activemq.apollo.broker.jmx.dto.JmxDTO;import org.apache.activemq.apollo.dto.AcceptingConnectorDTO;import org.apache.activemq.apollo.dto.BrokerDTO;import org.apache.activemq.apollo.dto.TopicDTO;import org.apache.activemq.apollo.dto.VirtualHostDTO;import org.apache.activemq.apollo.dto.WebAdminDTO;import org.apache.activemq.apollo.stomp.dto.StompDTO;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class AppConfig { @Bean public Broker broker() throws Exception { final Broker broker = new Broker(); // Configure STOMP over WebSockects connector final AcceptingConnectorDTO ws = new AcceptingConnectorDTO(); ws.id = "ws"; ws.bind = "ws://localhost:61614"; ws.protocols.add( new StompDTO() ); // Create a topic with name 'test' final TopicDTO topic = new TopicDTO(); topic.id = "test"; // Create virtual host (based on localhost) final VirtualHostDTO host = new VirtualHostDTO(); host.id = "localhost"; host.topics.add( topic ); host.host_names.add( "localhost" ); host.host_names.add( "127.0.0.1" ); host.auto_create_destinations = false; // Create a web admin UI (REST) accessible at: http://localhost:61680/api/index.html#!/ final WebAdminDTO webadmin = new WebAdminDTO(); // Create JMX instrumentation final JmxDTO jmxService = new JmxDTO(); jmxService.enabled = true; // Finally, glue all together inside broker configuration final BrokerDTO config = new BrokerDTO(); config.connectors.add( ws ); config.virtual_hosts.add( host ); config.web_admins.add( webadmin ); config.services.add( jmxService ); broker.setConfig( config ); broker.setTmp( new File( System.getProperty( "java.io.tmpdir" ) ) ); broker.start( new Runnable() { @Override public void run() { System.out.println("The broker has been started started."); } } ); return broker; }} |
Я предполагаю, что становится ясно, что я имел в виду под многословным и недостаточно выразительным, но по крайней мере это легко понять. Во-первых, мы создаем коннектор Websockects на ws: // localhost: 61614 и просим его поддерживать протокол STOMP . Затем мы создаем простую тему с именем test (которую мы называем / topic / test на стороне клиента). Следующим важным шагом является создание виртуального хоста и привязка к нему тем (и очередей, если таковые имеются). Список имен хостов очень важен, так как логика разрешения адресатов сильно зависит от него. На следующем шаге мы настраиваем пользовательский интерфейс веб-администратора и инструментарий JMX, который предоставляет нам доступ к конфигурации, статистике и мониторингу. Чтобы проверить это, откройте этот URL в своем веб-браузере после запуска брокера Apollo . И, наконец, применив конфигурацию и запустив брокера, мы готовы к работе! Как видите, модель асинхронного программирования приводит к обратным вызовам и анонимным функциям (где вы, Java 8 ?).
Теперь, когда настройка завершена, пришло время взглянуть на логику запуска, помещенную в класс Starter (опять же, обратные вызовы и анонимные функции используются для выполнения изящной логики выключения):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package com.example.messaging;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import org.apache.activemq.apollo.broker.Broker;import org.springframework.context.annotation.ConfigurableApplicationContext;public class Starter { public static void main( String[] args ) throws Exception { try( ConfigurableApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class ) ) { final Broker broker = context.getBean( Broker.class ); System.out.println( "Press any key to terminate ..." ); System.in.read(); final CountDownLatch latch = new CountDownLatch( 1 ); broker.stop( new Runnable() { @Override public void run() { System.out.println("The broker has been stopped."); latch.countDown(); } } ); // Gracefully stop the broker if( !latch.await( 1, TimeUnit.SECONDS ) ) { System.out.println("The broker hasn't been stopped, exiting anyway ..."); } } }} |
Как и в предыдущих примерах , после запуска класса Starter и открытия index.html в браузере мы должны увидеть что-то вроде этого:
Отлично, работает просто отлично! Я уверен, что просто переписав код в Scala , этот пример использования API Apollo будет выглядеть намного более компактным и лаконичным. В любом случае, я думаю, что брокер сообщений Apollo определенно стоит рассмотреть, если вы ищете выдающуюся архитектуру обмена сообщениями.
Все источники доступны на GitHub: пример Apollo .
