Сокращение от JMS для Java Message Service предоставляет механизм для интеграции слабосвязанных и гибких приложений. JMS доставляет данные асинхронно между приложениями на основе хранения и пересылки. Приложения обмениваются данными через MOM (Message Oriented Middleware), который выступает в роли посредника, не связываясь напрямую.
Архитектура JMS
Основными компонентами JMS являются:
- Поставщик JMS: система обмена сообщениями, которая реализует интерфейсы JMS и предоставляет функции администрирования и управления
- Клиенты: Java-приложения, которые отправляют или получают сообщения JMS. Отправитель сообщения называется источником, а получатель — потребителем.
- Сообщения: объекты, которые передают информацию между клиентами JMS.
- Администрируемые объекты: предварительно сконфигурированные объекты JMS, созданные администратором для использования клиентами.
Доступно несколько JMS-провайдеров, таких как Apache ActiveMQ и OpenMQ. Здесь я использовал Apache ActiveMQ.
Установка и запуск Apache ActiveMQ на Windows
- Скачать бинарный дистрибутив ActiveMQ для Windows
- Распакуйте его в нужное место
- С помощью командной строки измените каталог на папку bin внутри папки установки ActiveMQ и выполните следующую команду, чтобы запустить ActiveMQ.
|
1
|
activemq |
После запуска ActiveMQ вы можете зайти в консоль администратора по адресу http: // localhost: 8161 / admin / и выполнить административные задачи.
Модели обмена сообщениями JMS
У JMS есть две модели обмена сообщениями: модель обмена сообщениями точка-точка и модель обмена сообщениями подписчика издателя.
Модель обмена сообщениями точка-точка
Producer отправляет сообщение в указанную очередь в JMS-провайдере, и только один из потребителей, прослушивающих эту очередь, получает это сообщение.
Пример 1 и пример 2 почти аналогичны, единственное отличие состоит в том, что пример 1 создает очереди в программе, а пример 2 использует файл jndi.properties для поиска имен и создания очередей.
Пример 1
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package com.eviac.blog.jms;import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;import org.apache.log4j.BasicConfigurator;public class Producer { public Producer() throws JMSException, NamingException { // Obtain a JNDI connection InitialContext jndi = new InitialContext(); // Look up a JMS connection factory ConnectionFactory conFactory = (ConnectionFactory) jndi .lookup('connectionFactory'); Connection connection; // Getting JMS connection from the server and starting it connection = conFactory.createConnection(); try { connection.start(); // JMS messages are sent and received using a Session. We will // create here a non-transactional session object. If you want // to use transactions you should set the first parameter to 'true' Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) jndi.lookup('MyQueue'); // MessageProducer is used for sending messages (as opposed // to MessageConsumer which is used for receiving them) MessageProducer producer = session.createProducer(destination); // We will send a small text message saying 'Hello World!' TextMessage message = session.createTextMessage('Hello World!'); // Here we are sending the message! producer.send(message); System.out.println('Sent message '' + message.getText() + '''); } finally { connection.close(); } } public static void main(String[] args) throws JMSException { try { BasicConfigurator.configure(); new Producer(); } catch (NamingException e) { e.printStackTrace(); } }} |
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package com.eviac.blog.jms;import javax.jms.*;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.log4j.BasicConfigurator;public class Consumer { // URL of the JMS server private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; // Name of the queue we will receive messages from private static String subject = 'MYQUEUE'; public static void main(String[] args) throws JMSException { BasicConfigurator.configure(); // Getting JMS connection from the server ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); // Creating session for seding messages Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Getting the queue Destination destination = session.createQueue(subject); // MessageConsumer is used for receiving (consuming) messages MessageConsumer consumer = session.createConsumer(destination); // Here we receive the message. // By default this call is blocking, which means it will wait // for a message to arrive on the queue. Message message = consumer.receive(); // There are many types of Message and TextMessage // is just one of them. Producer sent us a TextMessage // so we must cast to it to get access to its .getText() // method. if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println('Received message '' + textMessage.getText() + '''); } connection.close(); }} |
jndi.properties
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
# START SNIPPET: jndijava.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory# use the following property to configure the default connectorjava.naming.provider.url = vm://localhost# use the following property to specify the JNDI name the connection factory# should appear as. #connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry# register some queues in JNDI using the form# queue.[jndiName] = [physicalName]queue.MyQueue = example.MyQueue# register some topics in JNDI using the form# topic.[jndiName] = [physicalName]topic.MyTopic = example.MyTopic# END SNIPPET: jndi |
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package com.eviac.blog.jms;import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;import org.apache.log4j.BasicConfigurator;public class Producer { public Producer() throws JMSException, NamingException { // Obtain a JNDI connection InitialContext jndi = new InitialContext(); // Look up a JMS connection factory ConnectionFactory conFactory = (ConnectionFactory) jndi .lookup('connectionFactory'); Connection connection; // Getting JMS connection from the server and starting it connection = conFactory.createConnection(); try { connection.start(); // JMS messages are sent and received using a Session. We will // create here a non-transactional session object. If you want // to use transactions you should set the first parameter to 'true' Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) jndi.lookup('MyQueue'); // MessageProducer is used for sending messages (as opposed // to MessageConsumer which is used for receiving them) MessageProducer producer = session.createProducer(destination); // We will send a small text message saying 'Hello World!' TextMessage message = session.createTextMessage('Hello World!'); // Here we are sending the message! producer.send(message); System.out.println('Sent message '' + message.getText() + '''); } finally { connection.close(); } } public static void main(String[] args) throws JMSException { try { BasicConfigurator.configure(); new Producer(); } catch (NamingException e) { e.printStackTrace(); } }} |
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
package com.eviac.blog.jms;import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;import org.apache.log4j.BasicConfigurator;public class Consumer { public Consumer() throws NamingException, JMSException { Connection connection; // Obtain a JNDI connection InitialContext jndi = new InitialContext(); // Look up a JMS connection factory ConnectionFactory conFactory = (ConnectionFactory) jndi .lookup('connectionFactory'); // Getting JMS connection from the server and starting it // ConnectionFactory connectionFactory = new // ActiveMQConnectionFactory(url); connection = conFactory.createConnection(); // // Getting JMS connection from the server // ConnectionFactory connectionFactory = new // ActiveMQConnectionFactory(url); // Connection connection = connectionFactory.createConnection(); try { connection.start(); // Creating session for seding messages Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Getting the queue Destination destination = (Destination) jndi.lookup('MyQueue'); // MessageConsumer is used for receiving (consuming) messages MessageConsumer consumer = session.createConsumer(destination); // Here we receive the message. // By default this call is blocking, which means it will wait // for a message to arrive on the queue. Message message = consumer.receive(); // There are many types of Message and TextMessage // is just one of them. Producer sent us a TextMessage // so we must cast to it to get access to its .getText() // method. if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.println('Received message '' + textMessage.getText() + '''); } } finally { connection.close(); } } public static void main(String[] args) throws JMSException { BasicConfigurator.configure(); try { new Consumer(); } catch (NamingException e) { // TODO Auto-generated catch block e.printStackTrace(); } }} |
Модель подписчика издателя
Издатель публикует сообщение в указанной теме в провайдере JMS, и все подписчики, подписавшиеся на эту тему, получают сообщение. Обратите внимание, что только активные подписчики получают сообщение.
Пример двухточечной модели
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
package com.eviac.blog.jms;import javax.jms.*;import javax.naming.*;import org.apache.log4j.BasicConfigurator;import java.io.BufferedReader;import java.io.InputStreamReader;public class DemoPublisherSubscriberModel implements javax.jms.MessageListener { private TopicSession pubSession; private TopicPublisher publisher; private TopicConnection connection; /* Establish JMS publisher and subscriber */ public DemoPublisherSubscriberModel(String topicName, String username, String password) throws Exception { // Obtain a JNDI connection InitialContext jndi = new InitialContext(); // Look up a JMS connection factory TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi .lookup('topicConnectionFactry'); // Create a JMS connection connection = conFactory.createTopicConnection(username, password); // Create JMS session objects for publisher and subscriber pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Look up a JMS topic Topic chatTopic = (Topic) jndi.lookup(topicName); // Create a JMS publisher and subscriber publisher = pubSession.createPublisher(chatTopic); TopicSubscriber subscriber = subSession.createSubscriber(chatTopic); // Set a JMS message listener subscriber.setMessageListener(this); // Start the JMS connection; allows messages to be delivered connection.start(); // Create and send message using topic publisher TextMessage message = pubSession.createTextMessage(); message.setText(username + ': Howdy Friends!'); publisher.publish(message); } /* * A client can register a message listener with a consumer. A message * listener is similar to an event listener. Whenever a message arrives at * the destination, the JMS provider delivers the message by calling the * listener's onMessage method, which acts on the contents of the message. */ public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(text); } catch (JMSException jmse) { jmse.printStackTrace(); } } public static void main(String[] args) { BasicConfigurator.configure(); try { if (args.length != 3) System.out .println('Please Provide the topic name,username,password!'); DemoPublisherSubscriberModel demo = new DemoPublisherSubscriberModel( args[0], args[1], args[2]); BufferedReader commandLine = new java.io.BufferedReader( new InputStreamReader(System.in)); // closes the connection and exit the system when 'exit' enters in // the command line while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase('exit')) { demo.connection.close(); System.exit(0); } } } catch (Exception e) { e.printStackTrace(); } }} |
Ссылка: JMS с ActiveMQ от нашего партнера JCG Павитры Сиривардены в блоге EVIAC .





