WSO2 Message Broker (MB) — это новый проект с открытым исходным кодом и продукт от WSO2, который предоставляет функции обмена сообщениями на платформе WSO2 Carbon и другим клиентам на разных языках. Он работает как автономно, так и совместно с такими продуктами и компонентами, как WSO2 ESB и WSO2, сервер обработки сложных событий.
МБ основан на проекте Apache Qpid / Java ( http://qpid.apache.org ). От Apache Qpid MB получает базовую поддержку протокола AMQP и JMS API. Кроме того, в WSO2 добавлена поддержка API Amazon SQS и поддержка WS-Eventing.
Понимание того, как брокер МБ вписывается в архитектуру предприятия
Message Broker предоставляет три основных возможности в общую архитектуру предприятия
- Возможность очередей / постоянных сообщений
- Модель распределения событий (pub / sub)
- Посредник, где несколько систем могут подключаться независимо от направления сообщений.
Чтобы дать некоторые конкретные примеры этих преимуществ, вот несколько сценариев:
1) В ESB WSO2 обычным делом является сохранение сообщения из входящего HTTP-запроса в постоянную очередь сообщений, а затем обработка оттуда. МБ может обеспечить постоянную очередь.
2) ESB WSO2 уже имеет модель распределения событий и поддержку событий, но брокер на основе QPid обеспечивает более высокую производительность, а также поддерживает JMS API. Например, вы можете отправить сообщения извне брандмауэра на сервер внутри. Вы можете подключить ESB или Service Host в брандмауэре к Message Broker, работающему вне брандмауэра (например, в Amazon EC2). Эта модель используется шлюзом облачных сервисов WSO2.
Где подходит AMQP?
AMQP ( www.amqp.org ) — это открытый протокол для обмена сообщениями. Хотя протокол AMQP все еще находится в стадии разработки, он выпустил три стабильных выпуска (0-8, 0-9-1 и 0-10) со сроком выпуска 1,0 в течение 2011 года. Существует ряд реализаций стандарта AMQP в производство, включая Apache Qpid (версии для Java и C ++), RabbitMQ, OpenAMQ и другие.
WSO2 является членом рабочей группы AMQP в течение нескольких лет, и мы решительно поддерживаем AMQP как способ обеспечения взаимодействия и большей открытости в пространстве обмена сообщениями.
Брокер Qpid поддерживает множество клиентов поверх протокола AMQP. Наиболее полезным из них для Carbon является API Java JMS 1.1, который предоставляет переносимый API, а также основной интерфейс с ESB WSO2. Кроме того, есть C # и другие API. WSO2 MB также дополняет их API-интерфейсами WS-Eventing и Amazon SQS для обеспечения взаимодействия с использованием HTTP, REST и SOAP.
Установка WSO2 MB
Вы можете скачать WSO2 MB Beta с:
http://people.wso2.com/~manjular/wso2mb-1.0.0-beta.zip
После того, как вы загрузили и распаковали, просто перейдите в каталог установки
cd wso2mb-1.0.0-SNAPSHOT
bin\wso2server.bat [ON WINDOWS]
bin/wso2server.sh [ON LINUX/MACOSX]
Давайте обратимся к каталогу установки с этого момента.
Вы должны увидеть запуск сервера:
[2011-03-16 14:00:12,471] INFO {org.wso2.carbon.server.Main} - Initializing system...
[2011-03-16 14:00:12,840] INFO {org.wso2.carbon.server.TomcatCarbonWebappDeployer} - Deployed Carbon webapp: StandardEngine[Tomcat].StandardHost[defaulthost].StandardContext[/]
[2011-03-16 14:00:14,147] INFO {org.wso2.carbon.atomikos.TransactionFactory} - Starting Atomikos Transaction Manager 3.7.0
[2011-03-16 14:00:19,952] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Starting WSO2 Carbon...
[2011-03-16 14:00:19,983] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Operating System : Mac OS X 10.6.6, x86_64
[2011-03-16 14:00:19,984] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Java Home : /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
[2011-03-16 14:00:19,984] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Java Version : 1.6.0_24
[2011-03-16 14:00:19,985] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Java VM : Java HotSpot(TM) 64-Bit Server VM 19.1-b02-334,Apple Inc.
[2011-03-16 14:00:19,985] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Carbon Home : /Users/paul/wso2/wso2mb-1.0.0-SNAPSHOT
[2011-03-16 14:00:19,985] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - Java Temp Dir : /Users/paul/wso2/wso2mb-1.0.0-SNAPSHOT/tmp
[2011-03-16 14:00:19,986] INFO {org.wso2.carbon.core.internal.CarbonCoreActivator} - User : paul, en-US, Europe/London
2011-03-16 14:00:12,471] INFO {org.wso2.carbon.server.Main} - Initializing system...
some logs deleted
[2011-03-16 14:00:41,691] INFO {org.wso2.carbon.core.transports.http.HttpsTransportListener} - HTTPS port : 9443
[2011-03-16 14:00:41,691] INFO {org.wso2.carbon.core.transports.http.HttpTransportListener} - HTTP port : 9763
[2011-03-16 14:00:42,422] INFO {org.wso2.carbon.ui.internal.CarbonUIServiceComponent} - Mgt Console URL : https://192.168.1.100:9443/carbon/
[2011-03-16 14:00:42,499] INFO {org.wso2.carbon.core.internal.StartupFinalizerServiceComponent} - Started Transport Listener Manager
[2011-03-16 14:00:42,500] INFO {org.wso2.carbon.core.internal.StartupFinalizerServiceComponent} - Server : WSO2 MB -1.0.0-SNAPSHOT
[2011-03-16 14:00:42,506] INFO {org.wso2.carbon.core.internal.StartupFinalizerServiceComponent} - WSO2 Carbon started in 27 sec
2011-03-16 14:00:12,471] INFO {org.wso2.carbon.server.Main} - Initializing system...
Брокер сообщений WSO2 можно установить для производственных систем. Как правило, он либо регистрируется как демон Linux, либо как служба Windows — но сейчас мы будем придерживаться версии для командной строки для простоты.
После запуска сервера вы можете получить доступ к консоли управления. Направьте свой браузер на: https: // localhost: 9443
Сначала вы увидите экран браузера, предупреждающий вас о сертификатах. Пожалуйста, попросите браузер продолжить (для рабочего сервера вы обычно устанавливаете надлежащий сертификат SSL / TLS, но для первоначальной установки мы создаем самозаверяющий сертификат, который вам необходимо согласиться использовать).
Как только вы приняли сертификат, вы должны увидеть экран, подобный следующему:
Вы можете войти, используя имя пользователя / пароль по умолчанию, которое является admin / admin.
После входа в систему вы должны увидеть следующий экран:
Прежде чем мы рассмотрим консоль администратора, давайте сначала создадим простой JMS-клиент, который будет взаимодействовать с сервером через AMQP по TCP / IP.
Начало работы с JMS
Спецификация Java Message Service (JMS) — http://www.oracle.com/technetwork/java/index-jsp-142945.html — это спецификация для общения с брокерами сообщений. К сожалению, оно плохо названо: слово «сервис» подразумевает, что это реализация, но JMS не определяет реальную службу обмена сообщениями, а только API, который используется для доступа к JMS-провайдерам. «Java Messaging API» будет более точно выражать, что такое JMS. В результате существует множество провайдеров JMS, и они часто используют совершенно разные подходы к своей базовой модели.
Посредник сообщений WSO2 основан на проекте Apache Qpid ( http://qpid.apache.org ) и является совместимой реализацией спецификации JMS, а также различных уровней спецификации AMQP (0-8, 0-9- 1, 0-10).
Чтобы написать полностью стандартный переносимый код JMS, вам нужно использовать провайдера JNDI для получения доступа к соединению JMS, очередям и т. Д. В этом примере мы будем использовать провайдера Qpid JNDI, подкрепленный простым набором свойств. Это делает систему в целом простой и легко переносимой.
Вот пример приложения JMS, которое можно использовать для проверки доступа к Message Broker. Вы можете найти этот код здесь: http://people.apache.org/~pzf/MB/JMSExample.java
Во-первых, некоторые необходимые импортные.
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
Далее следует простое определение «основного» класса:
public class JMSExample {
public static void main(String[] args) {
JMSExample producer = new JMSExample();
producer.runTest();
}
private void runTest() {
Since this is just an example, we will place the complete logic in a try/catch block.
try {
Обычно JNDI настраивается с помощью файла свойств, но вы также можете сделать это из набора свойств в памяти. Чтобы увидеть аналогичную настройку с файлом свойств, взгляните на пример ESB ниже. Вот объект свойств для хранения свойств:
Properties properties = new Properties();
Для начальной загрузки записей JNDI для фабрики соединений и очереди мы устанавливаем пары имя / значение в объект простых свойств:
properties.put("connectionfactory.cf",
"amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'");
Имя свойства «connectionfactory.cf» означает, что мы создаем объект типа ConnectionFactory с именем «cf». Значением является URL-адрес, который используется для начальной загрузки ConnectionFactory: этот URL-адрес указывает на брокер AMQP. Синтаксис разбит следующим образом:
amqp:// Indicates this is an AMQP URL
admin:admin@ This is the username/password
carbon/carbon The client ID and virtual host
? separator for options
brokerlist=’tcp://localhost:5672’ A list of broker URLs to use
Для получения дополнительной информации об этом синтаксисе URL см. Https://cwiki.apache.org/qpid/connection-url-format.html.
Имя виртуального хоста является частью определения в: /repository/conf/qpid/etc/virtualhosts.xml
Этот файл также определяет такие аспекты, как максимальное количество сообщений в очереди и глубина очереди (максимальный размер в байтах очереди).
Теперь нам нужно создать запись JNDI для очереди, с которой мы будем разговаривать:
properties.put("destination.samplequeue", "samplequeue; {create:always}");
Имя свойства «destination.samplequeue» указывает на создание пункта назначения с именем JNDI «samplequeue». Значение свойства «samplequeue; {create: всегда} ”указывает очередь с именем« samplequeue »с атрибутом, который указывает посреднику создать очередь, если она не существует.
Эти свойства специфичны для конкретной используемой нами реализации JNDI, а именно Qpid «PropertiesFileInitialContextFactory». Итак, теперь нам нужно настроить JNDI для использования этой реализации:
properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
Теперь мы можем выполнить поиск JNDI:
Context context = new InitialContext(properties);
ConnectionFactory connectionFactory =
(ConnectionFactory) context.lookup("cf");
«Найдя» фабрику соединений JMS в JNDI, теперь мы можем создать соединение с брокером:
Connection connection = connectionFactory.createConnection();
connection.start();
И теперь мы можем создать сеанс JMS:
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Еще один поиск из JNDI будет искать нашу очередь
Destination destination = (Destination) context
.lookup("samplequeue");
Теперь мы можем создать продюсера и отправить сообщение:
MessageProducer producer = session.createProducer(destination);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("Hello World!");
producer.send(outMessage);
Конечно, в реальной жизни вы, скорее всего, НЕ получили бы то же самое сообщение из того же приложения, но для этого примера мы теперь получим сообщение:
MessageConsumer consumer = session.createConsumer(destination);
Message inMessage = consumer.receive();
System.out.println(((TextMessage)inMessage).getText());
И закройте соединение и начальный контекст:
connection.close();
context.close();
} catch (Exception exp) {
exp.printStackTrace();
}
Чтобы опробовать этот клиент, вам нужны правильные клиентские файлы JAR.
В бета-версии вы найдете:
/jms-client-lib/geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
/jms-client-lib/qpid-client-0.9.wso2v2.jar
Вы также должны ссылаться
/lib/log4j-1.2.13.jar
Как только они появятся в вашем classpath, вы можете запустить программу. Вы должны увидеть простой вывод
log4j:WARN No appenders could be found for logger (org.apache.qpid.jndi.PropertiesFileInitialContextFactory).
log4j:WARN Please initialize the log4j system properly.
Hello World!
Если вы получили это далеко, поздравляю!
В следующем разделе мы рассмотрим использование ESB с Message Broker.
Для этого есть два подхода:
1) Если вы используете существующий WSO2 ESB 3.0.1 или аналогичный, вы можете развернуть клиентские библиотеки MB и обмениваться данными по сети.
2) Начиная со следующего выпуска WSO2 ESB (3.1.0) он будет включать функции Qpid / MB как часть выпуска, и вы можете использовать среду исполнения Message Broker / JMS локально в той же JVM.
WSO2 MB и WSO2 ESB вместе
В этом первом примере мы собираемся заставить ESB WSO2 и MB работать вместе.
Предполагая, что у вас уже установлен и работает MB, вам сначала нужно установить ESB и изменить порты консоли администратора, чтобы они не конфликтовали. Вы можете загрузить WSO2 ESB 3.0.1 с: http://wso2.org/downloads/esb
Процедура установки аналогична: разархивируйте ESB, но пока не запускайте его. Давайте назовем (для этого руководства) каталог, в котором вы установили ESB.
Сначала давайте отредактируем порты, которые слушает ESB. (В качестве альтернативы вы могли бы сделать то же самое с MB вместо этого).
Отредактируйте \ repository \ conf \ mgt-transports.xml
Этот файл определяет, какие порты запускает консоль управления (HTTP и HTTPS).
Пожалуйста измените:
<transport name="http" class="org.wso2.carbon.server.transports.http.HttpTransport">
<parameter name="port">9763</parameter>
читать:
<transport name="http" class="org.wso2.carbon.server.transports.http.HttpTransport">
<parameter name="port">9764</parameter>
Аналогичным образом измените порт HTTPS на 9444.
Теперь следующий шаг — убедиться, что у ESB есть нужные драйверы для связи с MB. Скопируйте следующие файлы JAR в каталог \ repository \ components \ lib
/jms-client-lib/geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
/jms-client-lib/qpid-client-0.9.wso2v2.jar
Нам также необходимо правильно настроить транспорт JMS. Для этого мы редактируем файл axis2.xml: \ repository \ conf \ axis2.xml
В этом файле закомментирован транспорт JMS. Также необходимо обновить настройки, чтобы использовать библиотеки Qpid. Измените файл так, чтобы разделы получателя и отправителя JMS выглядели так:
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
<parameter name="myTopicConnectionFactory" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">resources/jndi.properties</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
</parameter>
<parameter name="myQueueConnectionFactory" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">resources/jndi.properties</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
<parameter name="default" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.qpid.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">resources/jndi.properties</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
</parameter>
</transportReceiver>
Вы можете найти мою копию отредактированного axis2.xml здесь http://people.wso2.com/~paul/mb-guide-1.0/
Если вы просмотрели конфигурацию JMS, вы заметите, что она ссылается на ресурс JNDI: resources / jndi.properties.
Это используется для того же, что и жестко закодированные свойства, которые мы использовали выше, — для настройки локального JNDI, который будет использовать клиент JMS внутри ESB. В будущем выпуске ESB мы планируем автоматически настроить этот JNDI, но пока мы можем просто создать файл в каталоге / resources.
Пожалуйста, создайте /resources/jndi.properties, чтобы он выглядел так:
connectionfactory.TopicConnectionFactory = \
amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'
connectionfactory.QueueConnectionFactory = \
amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5672'
destination.dynamicQueues/myqueue=jmsdestinationqueue; {create:always}
destination.myqueue=jmsdestinationqueue; {create:always}
Обратите внимание, что строки, заканчивающиеся \, на самом деле разделены для форматирования и должны быть одной непрерывной строкой.
Вы можете найти этот файл здесь: http://people.apache.org/~pzf/MB/
Теперь мы сможем запустить ESB. Конечно, он пока ничего не сделает.
Просто для интереса, вы можете попробовать запустить ESB WSO2 с остановленным MB . Теперь, когда транспорт JMS включен, вы должны увидеть ошибки соединения:
javax.jms.JMSException: Error creating connection: Connection refused
at org.apache.qpid.client.AMQConnectionFactory.createConnection(AMQConnectionFactory.java:286)
at org.apache.axis2.transport.jms.JMSUtils.createConnection(JMSUtils.java:579)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.createConnection(ServiceTaskManager.java:803)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.getConnection(ServiceTaskManager.java:688)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.receiveMessage(ServiceTaskManager.java:487)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:412)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:58)
ESB все равно запустится, но транспорт JMS будет отключен.
Если вы запустите MB, то ESB должен начать нормально. Однако вы увидите несколько предупреждающих строк:
[2011-04-01 09:14:05,320] WARN - JMSUtils Cannot locate destination : WSDLValidatorService
Это связано с тем, что ESB связывает внутренние службы с транспортом JMS. В самых последних сборках ESB это было изменено, так что ESB только связывает внутренние службы с транспортными средствами HTTP / S, чтобы избежать этого.
Если вы зайдете в веб-консоль Message Broker, теперь вы сможете увидеть очереди, созданные для поддержки ESB. Просто щелкните по левому пункту меню Очереди à Список .
Теперь мы можем создать простой прокси-сервис, который будет проверять соединение JMS. Это небольшое изменение на одном из стандартных образцов ESB.
Этот прокси-сервис ожидает SOAP или XML-сообщение через HTTP POST от клиента и просто помещает тело этого сообщения в очередь JMS. Затем сервер отвечает HTTP 202, принятым клиенту. Это отличный тест, потому что мы можем использовать что-то простое, например, curl, для отправки сообщений в ESB.
Вот определение прокси для ESB:
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse" name="testJMS" transports="https jms http" startOnLoad="true" trace="disable">
<target>
<endpoint name="jmsqueue">
<address uri="jms:/myqueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=resources/jndi.properties"/>
</endpoint>
<inSequence>
<property action="set" name="OUT_ONLY" value="true"/>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
</inSequence>
<outSequence>
<send/>
</outSequence>
</target>
</proxy>
Этот файл доступен по адресу http://people.apache.org/~pzf/MB/testJMS.xml.
Вам нужно разместить этот файл здесь: /repository/conf/synapse-config/proxy-services/testJMS.xml
Это хорошая особенность ESB. Фактически вы можете настроить независимые прокси-службы, каждая из которых имеет свой собственный файл конфигурации или запись в реестре, и ESB объединяет их во время выполнения для создания единого согласованного ESB. Это отлично подходит для внесения дополнительных изменений. Вы даже можете изменить этот файл во время выполнения и использовать прокси в горячем режиме.
Прокси сервис действительно прост. По сути, он просто устанавливает пункт назначения для отправки сообщения в очередь JMS, которая определяется с использованием комбинации JNDI и URL-адреса JMS.
URL JMS состоит из:
JMS: / myqueue
Найдите запись JNDI «myqueue» (см. Свойства jndi выше).
? Разделитель, указывающий дополнительные атрибуты
transport.jms.ConnectionFactoryJNDIName = QueueConnectionFactory
Посмотрите ConnectionFactory в JNDI с именем QueueConnectionFactory
Separator (конвертируется в ‘&’)
Следующая часть конфигурации прокси просто сообщает ESB, что это односторонний поток, а не ожидать ответа:
Следующая строка гарантирует, что ESB отправит ответ HTTP 202 Accepted клиенту:
Все остальное в конфиге полностью по умолчанию.
Чтобы проверить это, есть простой тестовый XML-файл, который вы можете отправить в ESB с помощью curl:
<test xmlns = «http://fremantle.org»>
<sample> data </ sample>
</ test>
Еще раз вы можете найти этот файл здесь: http://people.apache.org/~pzf/MB/
Для следующего шага, пожалуйста, убедитесь, что у вас установлена копия curl. Если вы используете Linux или Mac, у вас будет по умолчанию. На Windows вы можете найти бесплатную версию в Интернете. Давайте попробуем запрос к ESB:
curl http://localhost:8280/services/testJMS/a -X POST -H 'Content-type: text/plain' --data @samplexml.xml
Запустите это несколько раз просто для удовольствия. Вы не увидите много. Если все идет хорошо, вы также не увидите ошибок на консоли WSO2 ESB. Если вы добавите ‘-v’ в командную строку curl, вы увидите гораздо больше информации о разделе HTTP потока и увидите хороший признак того, что дела идут хорошо:
Имя очереди
|
Глубина очереди
|
Количество сообщений
|
Время создания
|
Обновленное время
|
Тип
|
0 (б)
|
0
|
Пт Апр 01 09:14:05 BST 2011
|
Пт Апр 01 09:14:05 BST 2011
|
||
390 (б)
|
6
|
Пт Апр 01 10:04:31 BST 2011
|
Пт Апр 01 10:04:31 BST 2011
|
Как видно из моего примера, я отправил 6 сообщений.
В качестве упражнения, почему бы не попробовать изменить простой код JMS, чтобы забрать эти сообщения из очереди JMS. Если вы застряли, образец находится в том же месте, что и другой код.
Вывод
С MB мы можем сделать гораздо больше. В будущих статьях я надеюсь рассказать об использовании клиента C # для взаимодействия, использовании поддержки SQS и о том, как код MB может быть встроен непосредственно в ESB для обеспечения организации очереди и обработки событий. В то же время, я надеюсь, что это было простое введение, чтобы вы начали работать с WSO2 MB.
От http://pzf.fremantle.org/2011/04/introduction-to-wso2-message-broker_05.html