Обработка сообщений асинхронно является важной техникой при разработке интеграционных приложений . Асинхронные приложения обычно легче масштабировать, они позволяют реализовать шаблоны надежности и иногда лучше отражают варианты использования в реальном мире. Мул, что неудивительно, предлагает множество возможностей для асинхронной обработки сообщений.
Асинхронные потоки
Установка шаблона обмена для источника сообщения в «односторонний» обеспечивает асинхронную обработку потока. Некоторые транспорты и соединители , такие как JMS или транспорт VM, по умолчанию являются асинхронными. Другие транспорты, которые по своей природе являются синхронными, например HTTP, нуждаются в явно заданной схеме обмена. Установка шаблонов одностороннего обмена на этих транспортах позволяет моделировать асинхронное поведение с протоколами, которые в противном случае не были бы асинхронными. Следующий Gist демонстрирует, как асинхронно соединять HTTP-запрос с JMS.
<flow name="HTTP to JMS Flow"> <http:inbound-endpoint address="http://localhost:8080/foo" exchange-pattern="one-way"/> <jms:outbound-endpoint queue="messages"/> </flow>
Агрегация сообщений
Вы можете обрабатывать асинхронно отправляемые сообщения в группах с помощью агрегатора-сборщика. Группы сообщений определяются установкой свойства correlationId в MuleMessage или установкой исходящего заголовка MULE_CORRELATION_ID. Свойство correlationGroupSize объекта MuleMessage или заголовок MULE_CORRELATION_GROUP_SIZE определяют количество сообщений в группе.
Ниже показано, как агрегатор сбора может использоваться для асинхронного ожидания и сбора содержимого группы корреляции, поступающей на входящую конечную точку виртуальной машины.
<flow name="aggregate.messages"> <vm:inbound-endpoint path="foo.bar" exchange-pattern=”one-way”/> <collection-aggregator timeout="6000" failOnTimeout="false"/> <vm:outbound-endpoint path="messages.out"/> </flow>
Разделение сообщений
Некоторые полезные данные сообщений, такие как коллекции или документы XML, могут быть разделены и отправлены асинхронно. Вот некоторые из разделителей сообщений, которые поддерживает Mule:
- collection-splitter: разбивает полезную нагрузку List на отдельные сообщения.
- разделитель: разделить сообщение, используя язык выражений Mule .
- mulexml: filter-based-splitter: Распределяет полезную нагрузку документа XML с использованием выражения XPath.
Следующий Gist демонстрирует разбиение java.util.List и маршрутизацию в очередь JMS.
<collection-splitter/> <jms:outbound-endpoint queue="order.queue"/>
Чанкинг сообщений
Разделенные полезные нагрузки сообщения могут быть повторно собраны с помощью агрегатора сообщений. По умолчанию message-chunk-aggregator будет использовать свойства correlationId и correlationGroupSize MuleMessage для повторной сборки. Вы можете определить необязательный «correlationIdExpression» для повторной сборки с другим свойством сообщения.
Следующий поток иллюстрирует, как собрать группу разделенных сообщений обратно вместе.
<flow name="aggregate.chunks"> <vm:inbound-endpoint path="foo.bar"/> <message-chunk-aggregator/> <vm:outbound-endpint path="chunk.out"/> </flow>
Настройка асинхронных потоков
Асинхронную обработку для потока можно настроить, определив стратегию асинхронной обработки в очереди . Стратегия множественной очереди с асинхронной обработкой может быть определена и установлена с использованием атрибута «processingStrategy» потока. Ниже показано, как настроить поток для использования до 500 потоков для асинхронной обработки сообщений, поступающих на входящую конечную точку виртуальной машины.
<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/> <flow name="acceptOrders" processingStrategy="allow500Threads"> <vm:inbound-endpoint path="acceptOrders" exchange-pattern="one-way"/> <vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/> </flow>
Завершение
Асинхронная обработка сообщений является одним из ключей к эффективному использованию Mule. Надеемся, что этот пост проиллюстрировал некоторые функции Mule, которые облегчают диспетчеризацию, отправку и настройку асинхронных потоков сообщений.
Похожие сообщения:
- Комплексная обработка событий Twitter (CEP) с помощью Esper и Drools
- ESB или нет в ESB Пересмотрено — Часть 3, Архитектура уровня API и Grid Processing
- Маршрутизация с обработчиками сообщений в потоках (часть 1)
- Архитектура Mule 3: Часть 2. Знакомство с процессором сообщений