Удивительно, как команда JBoss собрала простой способ сделать JMS Clustering из коробки !!
Я начну с простого примера, создавая очередь с именем « MyClusteredQueue ».
В этом примере я использую JBoss AS 5.1 . и два компьютера, подключенные к одной сети, с этими IP-адресами:
— Компьютер A:
— Компьютер B:
Итак, вот шаги:
1 ) Установите JBoss на обоих компьютерах. Мы собираемся использовать конфигурацию « все » для обоих компьютеров.
2 ) Мы создаем нашу очередь на обоих серверах.
Перейдите в папку $ JBOSS_HOME / server / all / deploy / messaging / и отредактируйте файл destination-service.xml . Добавьте MyClusteredQueue перед последним тегом сервера . Это выглядит так:
<!-- Cluster JMS -->
<mbean code="org.jboss.jms.server.destination.QueueService"
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<attribute name="Clustered">true</attribute>
Это то, как вы добавляете очередь в JBoss, и люди, которые знакомы с этим, единственное новое — это добавить атрибут « Clustered ». Этот шаг должен быть установлен на обоих компьютерах. В конце статьи вы можете найти файлы.
3 ) Напишите MDB для использования сообщений и разверните его на двух компьютерах. (Я использую EJB 3 — стиль MDB).
import java.net.InetAddress;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;
* @author felipeg
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="queue/MyClusteredQueue")
public class JMSClusterClientHandler implements MessageListener {
Logger log = Logger.getLogger(JMSClusterClientHandler.class);
public void onMessage(Message message) {
if (message instanceof ObjectMessage)
InetAddress addr = InetAddress.getLocalHost();
log.info("########## Processing Host: " + addr.getHostName() + " ##########" );
ObjectMessage objMessage = (ObjectMessage) message;
Object obj = objMessage.getObject();
log.info("Object received:" + obj.toString());
} catch (Exception e) {
4 ) Запустите jboss со следующими параметрами:
Компьютер A :
$ cd $ JBOSS_HOME / bin
$ ./run.sh -c all -b -Djboss.messaging.ServerPeerID = 1
Компьютер B :
$ cd $ JBOSS_HOME / bin
$ ./run.sh -c all -b -Djboss.messaging.ServerPeerID = 2
Необходимо указать ID для каждого сервера, и это выполняется с помощью следующей директивы:
Когда вы запускаете jboss на компьютере A , вы должны увидеть журналы (server.log), сообщающие вам, что готов и прослушивает один узел, и как только вы запустите jboss на компьютере B , в журнале появятся два узла, два IP готовы потреблять сообщения.
ClusteredConnectionFactory » (
JMSDispatcher.java — см. Код ниже).
Также в
файле jndi.properties (если вы используете DefaultContext по умолчанию
) необходимо добавить ip двух компьютеров, разделенных запятой, в
свойство java.naming.provider.url . (В моем случае создайте
переменную Properties, и я установлю все необходимые свойства,
JMSDispatcher.java — см. Код ниже).,
Клиент, который я написал, — это веб-приложение, состоящее из одной
страницы index.jsp , которая содержит форму, которая запрашивает у вас имя очереди, тип сообщений (очередь или тема), ip и порт сервера, как много раз он будет отправлять сообщение и фактическое сообщение, которое будет отправлено; также веб-приложение имеет сервлет (
JMSClusteredClient.java — см. код ниже), который получает класс обратной передачи и вспомогательный класс (
JMSDispatcher.java — см. код ниже), который отправляет сообщение на серверы jboss.
Вы можете развернуть его на любом компьютере. В моем случае я развернул его на
компьютере . И вы можете получить к нему доступ через этот URL: (просто измените IP, на котором развернута война клиентов).
Если вы заметили (в index.jsp — код ниже), я уже поместил некоторые значения по умолчанию, которые отражают имя очереди и IP-адреса моих двух компьютеров. Теперь, если вы увеличиваете количество раз, которое будет отправлено сообщение (возможно, 10), заполняете окно сообщения и нажимаете «
Отправить », вы должны увидеть на двух серверах некоторые сообщения, используемые MDB.
Вот файлы для создания клиента:
<form method="POST" action='<%= request.getRequestURI() + "JMSClusteredClient" %>'>
<legend>JMS Clustered - Test Client</legend>
<td>Server:</td><td><input type="text" name="server" value="," /></td>
<select name="messageType">
<option value="QUEUE" selected="selected">Queue</option>
<option value="TOPIC" >Topic</option>
<td><input type="text" name="topicqueue" value="queue/MyClusteredQueue" /></td>
<td>Times:</td><td><input type="text" name="times" value="3" /></td>
<td>Message:</td><td><textarea rows="3" cols="20" name="message"></textarea></td>
<input type="submit" value="Send">
public class JMSClusteredClient extends HttpServlet {
private static final long serialVersionUID = 1L;
* @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response)
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
PrintWriter out = response.getWriter();
String topicqueue = request.getParameter("topicqueue");
String message = request.getParameter("message");
String server = request.getParameter("server");
String messageType = request.getParameter("messageType");
String times = request.getParameter("times");
int intTimes = Integer.parseInt(times);
JMSDispatcher dispatcher = new JMSDispatcher();
try {
for(int count =1; count <= intTimes;count++){
dispatcher.sendMessage( count + " of " + times + " " + message);
out.println("Message [" + message + "] sent successfully to [" + topic + "] to the [" + server + "] server " + times + " times.");
} catch (JMSException e) {
out.println("Error:" + e.getMessage());
} catch (NamingException e) {
out.println("Error:" + e.getMessage());
} finally{
Утилита для отправки сообщений:
public class JMSDispatcher {
private static final long serialVersionUID = 7105145023422143880L;
private static Logger log = Logger.getLogger(JMSDispatcher.class);
private final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory";
private final String CONNECTION_FACTORY = "ConnectionFactory";
private final String TOPIC = "TOPIC";
private final String QUEUE = "QUEUE";
private String topicQueueName;
private String server;
private String messageType;
public void setTopicQueueName(String value){
this.topicQueueName = value;
public void setServer(String value){
this.server = value;
public void setMessageType(String value){
this.messageType = value;
public void sendMessage(Object objectMessage) throws JMSException, NamingException{
log.debug("##### Setting up a Queue/Topic Message: #####");
if (TOPIC.equals(messageType)){
} else if (QUEUE.equals(messageType)){
log.debug("##### Publishing Message: Done #####");
private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException{
InitialContext initialContext = getInitialContext();
QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
QueueConnection queueConn = qcf.createQueueConnection();
Queue queue = (Queue) initialContext.lookup(topicQueueName);
QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender send = queueSession.createSender(queue);
ObjectMessage om = queueSession.createObjectMessage((Serializable)objectMessage);
log.debug("##### Publishing Message to a Queue: " + queueName + "#####");
}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException{
InitialContext initialContext = getInitialContext();
TopicConnectionFactory tcf = (TopicConnectionFactory)initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
TopicConnection topicConn = tcf.createTopicConnection();
Topic topic = (Topic) initialContext.lookup(topicQueueName);
TopicSession topicSession = topicConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE);
TopicPublisher send = topicSession.createPublisher(topic);
ObjectMessage om = topicSession.createObjectMessage();
log.debug("##### Publishing Message to a Topic: " + topicName + "#####");
}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
private InitialContext getInitialContext() throws NamingException{
Properties jboss = new Properties();
jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
jboss.put("java.naming.provider.url", server);
return new InitialContext(jboss);
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
Счастливой кластеризации!