Статьи

Кластеризация JMS на примере

Удивительно, как команда JBoss собрала простой способ сделать JMS Clustering из коробки !!

 Я начну с простого примера, создавая очередь с именем « MyClusteredQueue ».

В этом примере я использую JBoss AS 5.1 . и два компьютера, подключенные к одной сети, с этими IP-адресами:

— Компьютер A: 192.168.0.143
— Компьютер B: 192.168.0.210

Итак, вот шаги:

1 ) Установите JBoss на обоих компьютерах. Мы собираемся использовать конфигурацию « все » для обоих компьютеров.

2 ) Мы создаем нашу очередь на обоих серверах.

Перейдите в папку $ JBOSS_HOME / server / all / deploy / messaging / и отредактируйте файл destination-service.xml . Добавьте MyClusteredQueue перед последним тегом сервера . Это выглядит так:

<!-- Cluster JMS -->
<mbean code="org.jboss.jms.server.destination.QueueService"
name="jboss.messaging.destination:service=Queue,name=MyClusteredQueue"
xmbean-dd="xmdesc/Queue-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
<attribute name="Clustered">true</attribute>
</mbean>

 

Это то, как вы добавляете очередь в JBoss, и люди, которые знакомы с этим, единственное новое — это добавить атрибут « Clustered ». Этот шаг должен быть установлен на обоих компьютерах. В конце статьи вы можете найти файлы.

3 ) Напишите MDB   для использования сообщений и разверните его на двух компьютерах. (Я использую EJB 3 — стиль MDB).

 

import java.net.InetAddress;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

/**
* @author felipeg
*
*/
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="queue/MyClusteredQueue")
})
public class JMSClusterClientHandler implements MessageListener {
Logger log = Logger.getLogger(JMSClusterClientHandler.class);

@Override
public void onMessage(Message message) {

try{
if (message instanceof ObjectMessage)
{
InetAddress addr = InetAddress.getLocalHost();
log.info("########## Processing Host: " + addr.getHostName() + " ##########" );

ObjectMessage objMessage = (ObjectMessage) message;
Object obj = objMessage.getObject();

log.info("Object received:" + obj.toString());
}
} catch (Exception e) {
e.printStackTrace();
}

}

}

 

4 ) Запустите jboss со следующими параметрами:

Компьютер A :
$ cd $ JBOSS_HOME / bin
$ ./run.sh -c all -b 192.168.0.143 -Djboss.messaging.ServerPeerID = 1

Компьютер B :
$ cd $ JBOSS_HOME / bin
$ ./run.sh -c all -b 192.168.0.210 -Djboss.messaging.ServerPeerID = 2

Необходимо указать ID для каждого сервера, и это выполняется с помощью следующей директивы:
-Djboss.messaging.ServerPeerID

Когда вы запускаете jboss на компьютере A , вы должны увидеть журналы (server.log), сообщающие вам, что готов и прослушивает один узел, и как только вы запустите jboss на компьютере B , в журнале появятся два узла, два IP готовы потреблять сообщения.

5 ) Теперь пришло время отправить сообщение в очередь. Для этого необходимо изменить фабрику соединений на «
ClusteredConnectionFactory » (
JMSDispatcher.java — см. Код ниже).

Также в
файле jndi.properties (если вы используете DefaultContext по умолчанию
) необходимо добавить ip двух компьютеров, разделенных запятой, в
свойство java.naming.provider.url . (В моем случае создайте
переменную Properties, и я установлю все необходимые свойства,
JMSDispatcher.java — см. Код ниже).

java.naming.provider.url =
192.168.0.143:1099,192.168.0.210:1099



Клиент, который я написал, — это веб-приложение, состоящее из одной
страницы
index.jsp , которая содержит форму, которая запрашивает у вас имя очереди, тип сообщений (очередь или тема), ip и порт сервера, как много раз он будет отправлять сообщение и фактическое сообщение, которое будет отправлено; также веб-приложение имеет сервлет (
JMSClusteredClient.java — см. код ниже), который получает класс обратной передачи и вспомогательный класс (
JMSDispatcher.java — см. код ниже), который отправляет сообщение на серверы jboss.
 

Вы можете развернуть его на любом компьютере.
В моем случае я развернул его на
компьютере . И вы можете получить к нему доступ через этот URL: http://192.168.0.143:8080/JMSWeb/ (просто измените IP, на котором развернута война клиентов).

Если вы заметили (в index.jsp — код ниже), я уже поместил некоторые значения по умолчанию, которые отражают имя очереди и IP-адреса моих двух компьютеров.
Теперь, если вы увеличиваете количество раз, которое будет отправлено сообщение (возможно, 10), заполняете окно сообщения и нажимаете «
Отправить », вы должны увидеть на двух серверах некоторые сообщения, используемые MDB.
 

Вот файлы для создания клиента:
index.jsp
 

<html>
<body>
<div>
<form method="POST" action='<%= request.getRequestURI() + "JMSClusteredClient" %>'>
<fieldset>
<legend>JMS Clustered - Test Client</legend>
<table>
<tr>
<td>Server:</td><td><input type="text" name="server" value="192.168.0.143:1099,192.168.0.210:1099" /></td>
</tr>
<tr>
<td>
<select name="messageType">
<option value="QUEUE" selected="selected">Queue</option>
<option value="TOPIC" >Topic</option>
</select>
</td>
<td><input type="text" name="topicqueue" value="queue/MyClusteredQueue" /></td>
</tr>
<tr>
<td>Times:</td><td><input type="text" name="times" value="3" /></td>
</tr>
<tr>
<td>Message:</td><td><textarea rows="3" cols="20" name="message"></textarea></td>
</tr>
</table>
<input type="submit" value="Send">
</fieldset>
</form>
</div>
</body>
</html>
 
Сервлет

JMSClusteredClient.java
 
public class JMSClusteredClient extends HttpServlet {
private static final long serialVersionUID = 1L;

/**
* @see HttpServlet#service(HttpServletRequest request, HttpServletResponse response)
*/
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
PrintWriter out = response.getWriter();

String topicqueue = request.getParameter("topicqueue");
String message = request.getParameter("message");
String server = request.getParameter("server");
String messageType = request.getParameter("messageType");
String times = request.getParameter("times");

int intTimes = Integer.parseInt(times);

JMSDispatcher dispatcher = new JMSDispatcher();
dispatcher.setTopicQueueName(topicqueue);
dispatcher.setServer(server);
dispatcher.setMessageType(messageType);

try {

for(int count =1; count <= intTimes;count++){
dispatcher.sendMessage( count + " of " + times + " " + message);
}
out.println("Message [" + message + "] sent successfully to [" + topic + "] to the [" + server + "] server " + times + " times.");
} catch (JMSException e) {
e.printStackTrace();
out.println("Error:" + e.getMessage());
} catch (NamingException e) {
out.println("Error:" + e.getMessage());
e.printStackTrace();
} finally{
out.close();
}
}

}

Утилита для отправки сообщений:
JMSDispatcher.java
 
public class JMSDispatcher {

/**
*
*/
private static final long serialVersionUID = 7105145023422143880L;
private static Logger log = Logger.getLogger(JMSDispatcher.class);


private final String CONNECTION_FACTORY_CLUSTERED = "ClusteredConnectionFactory";
private final String CONNECTION_FACTORY = "ConnectionFactory";

private final String TOPIC = "TOPIC";
private final String QUEUE = "QUEUE";

private String topicQueueName;
private String server;
private String messageType;


public void setTopicQueueName(String value){
this.topicQueueName = value;
}

public void setServer(String value){
this.server = value;
}

public void setMessageType(String value){
this.messageType = value;
}

public void sendMessage(Object objectMessage) throws JMSException, NamingException{
log.debug("##### Setting up a Queue/Topic Message: #####");
if (TOPIC.equals(messageType)){
sendTopicMessage(objectMessage);
} else if (QUEUE.equals(messageType)){
sendQueueMessage(objectMessage);
}
log.debug("##### Publishing Message: Done #####");
}


private void sendQueueMessage(Object objectMessage) throws JMSException, NamingException{
try{

InitialContext initialContext = getInitialContext();

QueueConnectionFactory qcf = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
QueueConnection queueConn = qcf.createQueueConnection();
Queue queue = (Queue) initialContext.lookup(topicQueueName);
QueueSession queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queueConn.start();

QueueSender send = queueSession.createSender(queue);
ObjectMessage om = queueSession.createObjectMessage((Serializable)objectMessage);
setMessageProperties(om);
log.debug("##### Publishing Message to a Queue: " + queueName + "#####");
send.send(om);
send.close();

queueConn.stop();
queueSession.close();
queueConn.close();
}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
}

}

private void sendTopicMessage(Object objectMessage) throws JMSException, NamingException{

try{
InitialContext initialContext = getInitialContext();

TopicConnectionFactory tcf = (TopicConnectionFactory)initialContext.lookup(CONNECTION_FACTORY_CLUSTERED);
TopicConnection topicConn = tcf.createTopicConnection();
Topic topic = (Topic) initialContext.lookup(topicQueueName);
TopicSession topicSession = topicConn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE);
topicConn.start();

TopicPublisher send = topicSession.createPublisher(topic);

ObjectMessage om = topicSession.createObjectMessage();
om.setObject((Serializable)objectMessage);
setMessageProperties(om);
log.debug("##### Publishing Message to a Topic: " + topicName + "#####");
send.publish(om);
send.close();

topicConn.stop();
topicSession.close();
topicConn.close();

}catch(MessageFormatException ex){
log.error("##### The MESSAGE is not Serializable ####");
throw ex;
}catch(MessageNotWriteableException ex){
log.error("##### The MESSAGE is not Readable ####");
throw ex;
}catch(JMSException ex){
log.error("##### JMS provider fails to set the object due to some internal error. ####");
throw ex;
}
}

private InitialContext getInitialContext() throws NamingException{
Properties jboss = new Properties();
jboss.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
jboss.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
jboss.put("java.naming.provider.url", server);
return new InitialContext(jboss);

}
}
 

И
web.xml
 
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" id="WebApp_ID" version="2.5">
<display-name>JMSWeb</display-name>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>

<servlet>
<description></description>
<display-name>JMSClusteredClient</display-name>
<servlet-name>JMSClusteredClient</servlet-name>
<servlet-class>com.blogspot.felipeg48.jms.web.JMSClusteredClient</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>JMSClusteredClient</servlet-name>
<url-pattern>/JMSClusteredClient</url-pattern>
</servlet-mapping>
</web-app>

 Счастливой кластеризации!