Статьи

Асинхронная обработка сообщений с помощью Mule

Обработка сообщений асинхронно является важной техникой при разработке интеграционных приложений . Асинхронные приложения обычно легче масштабировать, они позволяют реализовать шаблоны надежности и иногда лучше отражают варианты использования в реальном мире.   Мул, что неудивительно, предлагает множество возможностей для асинхронной обработки сообщений.

Асинхронные потоки

Установка шаблона обмена для источника сообщения в «односторонний» обеспечивает асинхронную обработку потока. Некоторые транспорты и соединители , такие как JMS или транспорт VM, по умолчанию являются асинхронными. Другие транспорты, которые по своей природе являются синхронными, например HTTP, нуждаются в явно заданной схеме обмена. Установка шаблонов одностороннего обмена на этих транспортах позволяет моделировать асинхронное поведение с протоколами, которые в противном случае не были бы асинхронными. Следующий Gist демонстрирует, как асинхронно соединять HTTP-запрос с JMS.

<flow name="HTTP to JMS Flow">
        <http:inbound-endpoint address="http://localhost:8080/foo" exchange-pattern="one-way"/>
        <jms:outbound-endpoint queue="messages"/>
 </flow>

Агрегация сообщений

Вы можете обрабатывать асинхронно отправляемые сообщения в группах с помощью агрегатора-сборщика. Группы сообщений определяются установкой свойства correlationId в MuleMessage или установкой исходящего заголовка MULE_CORRELATION_ID. Свойство correlationGroupSize объекта MuleMessage или заголовок MULE_CORRELATION_GROUP_SIZE определяют количество сообщений в группе.

Ниже показано, как агрегатор сбора может использоваться для асинхронного ожидания и сбора содержимого группы корреляции, поступающей на входящую конечную точку виртуальной машины.

<flow name="aggregate.messages">
  <vm:inbound-endpoint path="foo.bar" exchange-pattern=”one-way”/>
  <collection-aggregator timeout="6000" failOnTimeout="false"/>
  <vm:outbound-endpoint path="messages.out"/>
</flow>

Разделение сообщений

Некоторые полезные данные сообщений, такие как коллекции или документы XML, могут быть разделены и отправлены асинхронно. Вот некоторые из разделителей сообщений, которые поддерживает Mule:

  • collection-splitter: разбивает полезную нагрузку List на отдельные сообщения.
  • разделитель: разделить сообщение, используя язык выражений Mule .
  • mulexml: filter-based-splitter: Распределяет полезную нагрузку документа XML с использованием выражения XPath.

Следующий Gist демонстрирует разбиение java.util.List и маршрутизацию в очередь JMS.

  <collection-splitter/>
  <jms:outbound-endpoint queue="order.queue"/>

Чанкинг сообщений

Разделенные полезные нагрузки сообщения могут быть повторно собраны с помощью агрегатора сообщений. По умолчанию message-chunk-aggregator будет использовать свойства correlationId и correlationGroupSize MuleMessage для повторной сборки. Вы можете определить необязательный «correlationIdExpression» для повторной сборки с другим свойством сообщения.

Следующий поток иллюстрирует, как собрать группу разделенных сообщений обратно вместе.

<flow name="aggregate.chunks">
  <vm:inbound-endpoint path="foo.bar"/>
  <message-chunk-aggregator/>
  <vm:outbound-endpint path="chunk.out"/>
</flow>

Настройка асинхронных потоков

Асинхронную обработку для потока можно настроить, определив стратегию асинхронной обработки в очереди . Стратегия множественной очереди с асинхронной обработкой может быть определена и установлена ​​с использованием атрибута «processingStrategy» потока. Ниже показано, как настроить поток для использования до 500 потоков для асинхронной обработки сообщений, поступающих на входящую конечную точку виртуальной машины.

<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>

<flow name="acceptOrders" processingStrategy="allow500Threads">
  <vm:inbound-endpoint path="acceptOrders" exchange-pattern="one-way"/>
  <vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>

Завершение

Асинхронная обработка сообщений является одним из ключей к эффективному использованию Mule. Надеемся, что этот пост проиллюстрировал некоторые функции Mule, которые облегчают диспетчеризацию, отправку и настройку асинхронных потоков сообщений.

Похожие сообщения:

  1. Комплексная обработка событий Twitter (CEP) с помощью Esper и Drools
  2. ESB или нет в ESB Пересмотрено — Часть 3, Архитектура уровня API и Grid Processing
  3. Маршрутизация с обработчиками сообщений в потоках (часть 1)
  4. Архитектура Mule 3: Часть 2. Знакомство с процессором сообщений