Статьи

JMS с ActiveMQ

JMS с ActiveMQ

Сокращение от JMS для Java Message Service предоставляет механизм для интеграции слабосвязанных и гибких приложений. JMS доставляет данные асинхронно между приложениями на основе хранения и пересылки. Приложения обмениваются данными через MOM (Message Oriented Middleware), который выступает в роли посредника, не связываясь напрямую.

Архитектура JMS

Основными компонентами JMS являются:

  • Поставщик JMS: система обмена сообщениями, которая реализует интерфейсы JMS и предоставляет функции администрирования и управления
  • Клиенты: Java-приложения, которые отправляют или получают сообщения JMS. Отправитель сообщения называется источником, а получатель — потребителем.
  • Сообщения: объекты, которые передают информацию между клиентами JMS.
  • Администрируемые объекты: предварительно сконфигурированные объекты JMS, созданные администратором для использования клиентами.

Доступно несколько JMS-провайдеров, таких как Apache ActiveMQ и OpenMQ. Здесь я использовал Apache ActiveMQ.

Установка и запуск Apache ActiveMQ на Windows

  1. Скачать бинарный дистрибутив ActiveMQ для Windows
  2. Распакуйте его в нужное место
  3. С помощью командной строки измените каталог на папку bin внутри папки установки ActiveMQ и выполните следующую команду, чтобы запустить ActiveMQ.
  4. 1
    activemq

После запуска ActiveMQ вы можете зайти в консоль администратора по адресу http: // localhost: 8161 / admin / и выполнить административные задачи.

Модели обмена сообщениями JMS

У JMS есть две модели обмена сообщениями: модель обмена сообщениями точка-точка и модель обмена сообщениями подписчика издателя.

Модель обмена сообщениями точка-точка

Producer отправляет сообщение в указанную очередь в JMS-провайдере, и только один из потребителей, прослушивающих эту очередь, получает это сообщение.

Пример 1 и пример 2 почти аналогичны, единственное отличие состоит в том, что пример 1 создает очереди в программе, а пример 2 использует файл jndi.properties для поиска имен и создания очередей.

Пример 1

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.eviac.blog.jms;
 
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
import org.apache.log4j.BasicConfigurator;
 
public class Producer {
 
 public Producer() throws JMSException, NamingException {
 
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();
 
  // Look up a JMS connection factory
  ConnectionFactory conFactory = (ConnectionFactory) jndi
    .lookup('connectionFactory');
  Connection connection;
 
  // Getting JMS connection from the server and starting it
  connection = conFactory.createConnection();
  try {
   connection.start();
 
   // JMS messages are sent and received using a Session. We will
   // create here a non-transactional session object. If you want
   // to use transactions you should set the first parameter to 'true'
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
 
   Destination destination = (Destination) jndi.lookup('MyQueue');
 
   // MessageProducer is used for sending messages (as opposed
   // to MessageConsumer which is used for receiving them)
   MessageProducer producer = session.createProducer(destination);
 
   // We will send a small text message saying 'Hello World!'
   TextMessage message = session.createTextMessage('Hello World!');
 
   // Here we are sending the message!
   producer.send(message);
   System.out.println('Sent message '' + message.getText() + ''');
  } finally {
   connection.close();
  }
 }
 
 public static void main(String[] args) throws JMSException {
  try {
   BasicConfigurator.configure();
   new Producer();
  } catch (NamingException e) {
   e.printStackTrace();
  }
 
 }
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.eviac.blog.jms;
 
import javax.jms.*;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
 
public class Consumer {
 // URL of the JMS server
 private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
 
 // Name of the queue we will receive messages from
 private static String subject = 'MYQUEUE';
 
 public static void main(String[] args) throws JMSException {
  BasicConfigurator.configure();
  // Getting JMS connection from the server
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  Connection connection = connectionFactory.createConnection();
  connection.start();
 
  // Creating session for seding messages
  Session session = connection.createSession(false,
    Session.AUTO_ACKNOWLEDGE);
 
  // Getting the queue
  Destination destination = session.createQueue(subject);
 
  // MessageConsumer is used for receiving (consuming) messages
  MessageConsumer consumer = session.createConsumer(destination);
 
  // Here we receive the message.
  // By default this call is blocking, which means it will wait
  // for a message to arrive on the queue.
  Message message = consumer.receive();
 
  // There are many types of Message and TextMessage
  // is just one of them. Producer sent us a TextMessage
  // so we must cast to it to get access to its .getText()
  // method.
  if (message instanceof TextMessage) {
   TextMessage textMessage = (TextMessage) message;
   System.out.println('Received message '' + textMessage.getText()
     + ''');
  }
  connection.close();
 }
}

jndi.properties

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
# START SNIPPET: jndi
 
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
 
# use the following property to configure the default connector
java.naming.provider.url = vm://localhost
 
# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
 
# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue
 
 
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic
 
# END SNIPPET: jndi
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.eviac.blog.jms;
 
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
import org.apache.log4j.BasicConfigurator;
 
public class Producer {
 
 public Producer() throws JMSException, NamingException {
 
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();
 
  // Look up a JMS connection factory
  ConnectionFactory conFactory = (ConnectionFactory) jndi
    .lookup('connectionFactory');
  Connection connection;
 
  // Getting JMS connection from the server and starting it
  connection = conFactory.createConnection();
  try {
   connection.start();
 
   // JMS messages are sent and received using a Session. We will
   // create here a non-transactional session object. If you want
   // to use transactions you should set the first parameter to 'true'
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
 
   Destination destination = (Destination) jndi.lookup('MyQueue');
 
   // MessageProducer is used for sending messages (as opposed
   // to MessageConsumer which is used for receiving them)
   MessageProducer producer = session.createProducer(destination);
 
   // We will send a small text message saying 'Hello World!'
   TextMessage message = session.createTextMessage('Hello World!');
 
   // Here we are sending the message!
   producer.send(message);
   System.out.println('Sent message '' + message.getText() + ''');
  } finally {
   connection.close();
  }
 }
 
 public static void main(String[] args) throws JMSException {
  try {
   BasicConfigurator.configure();
   new Producer();
  } catch (NamingException e) {
   e.printStackTrace();
  }
 
 }
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.eviac.blog.jms;
 
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
 
import org.apache.log4j.BasicConfigurator;
 
public class Consumer {
 public Consumer() throws NamingException, JMSException {
  Connection connection;
   
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();
 
  // Look up a JMS connection factory
  ConnectionFactory conFactory = (ConnectionFactory) jndi
    .lookup('connectionFactory');
  // Getting JMS connection from the server and starting it
  // ConnectionFactory connectionFactory = new
  // ActiveMQConnectionFactory(url);
  connection = conFactory.createConnection();
 
  // // Getting JMS connection from the server
  // ConnectionFactory connectionFactory = new
  // ActiveMQConnectionFactory(url);
  // Connection connection = connectionFactory.createConnection();
  try {
   connection.start();
 
   // Creating session for seding messages
   Session session = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
 
   // Getting the queue
   Destination destination = (Destination) jndi.lookup('MyQueue');
 
   // MessageConsumer is used for receiving (consuming) messages
   MessageConsumer consumer = session.createConsumer(destination);
 
   // Here we receive the message.
   // By default this call is blocking, which means it will wait
   // for a message to arrive on the queue.
   Message message = consumer.receive();
 
   // There are many types of Message and TextMessage
   // is just one of them. Producer sent us a TextMessage
   // so we must cast to it to get access to its .getText()
   // method.
   if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    System.out.println('Received message '' + textMessage.getText()
      + ''');
   }
  } finally {
   connection.close();
  }
 }
 
 public static void main(String[] args) throws JMSException {
  BasicConfigurator.configure();
  try {
   new Consumer();
  } catch (NamingException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 
 }
}

Модель подписчика издателя

Издатель публикует сообщение в указанной теме в провайдере JMS, и все подписчики, подписавшиеся на эту тему, получают сообщение. Обратите внимание, что только активные подписчики получают сообщение.

Пример двухточечной модели

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package com.eviac.blog.jms;
 
import javax.jms.*;
import javax.naming.*;
 
import org.apache.log4j.BasicConfigurator;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
 
public class DemoPublisherSubscriberModel implements javax.jms.MessageListener {
 private TopicSession pubSession;
 private TopicPublisher publisher;
 private TopicConnection connection;
 
 /* Establish JMS publisher and subscriber */
 public DemoPublisherSubscriberModel(String topicName, String username,
   String password) throws Exception {
  // Obtain a JNDI connection
  InitialContext jndi = new InitialContext();
 
  // Look up a JMS connection factory
  TopicConnectionFactory conFactory = (TopicConnectionFactory) jndi
    .lookup('topicConnectionFactry');
 
  // Create a JMS connection
  connection = conFactory.createTopicConnection(username, password);
 
  // Create JMS session objects for publisher and subscriber
  pubSession = connection.createTopicSession(false,
    Session.AUTO_ACKNOWLEDGE);
  TopicSession subSession = connection.createTopicSession(false,
    Session.AUTO_ACKNOWLEDGE);
 
  // Look up a JMS topic
  Topic chatTopic = (Topic) jndi.lookup(topicName);
 
  // Create a JMS publisher and subscriber
  publisher = pubSession.createPublisher(chatTopic);
  TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);
 
  // Set a JMS message listener
  subscriber.setMessageListener(this);
 
  // Start the JMS connection; allows messages to be delivered
  connection.start();
 
  // Create and send message using topic publisher
  TextMessage message = pubSession.createTextMessage();
  message.setText(username + ': Howdy Friends!');
  publisher.publish(message);
 
 }
 
 /*
  * A client can register a message listener with a consumer. A message
  * listener is similar to an event listener. Whenever a message arrives at
  * the destination, the JMS provider delivers the message by calling the
  * listener's onMessage method, which acts on the contents of the message.
  */
 public void onMessage(Message message) {
  try {
   TextMessage textMessage = (TextMessage) message;
   String text = textMessage.getText();
   System.out.println(text);
  } catch (JMSException jmse) {
   jmse.printStackTrace();
  }
 }
 
 public static void main(String[] args) {
  BasicConfigurator.configure();
  try {
   if (args.length != 3)
    System.out
      .println('Please Provide the topic name,username,password!');
 
   DemoPublisherSubscriberModel demo = new DemoPublisherSubscriberModel(
     args[0], args[1], args[2]);
 
   BufferedReader commandLine = new java.io.BufferedReader(
     new InputStreamReader(System.in));
 
   // closes the connection and exit the system when 'exit' enters in
   // the command line
   while (true) {
    String s = commandLine.readLine();
    if (s.equalsIgnoreCase('exit')) {
     demo.connection.close();
     System.exit(0);
 
    }
   }
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

Ссылка: JMS с ActiveMQ от нашего партнера JCG Павитры Сиривардены в блоге EVIAC .