Статьи

Параллельная групповая передача в Mule Made Easy

Распространенный сценарий интеграции — это когда одно сообщение необходимо отправить несколькими маршрутами.

Возьмем, к примеру, случай, когда вы получаете сообщение о регистрации нового клиента. Сообщение должно быть направлено через CRM для создания клиента,  маркетингу,  который захочет узнать, как клиент узнал о компании, и, наконец, перейти к системам обеспечения и хранения, чтобы они могли творить свое волшебство.

В этом случае сообщение передается в режиме «запусти и забудь», что означает, что вам не требуется ответ от любой из этих систем для продолжения обработки. Каждая из этих систем отвечает за обработку своей собственной логики и собственных ошибок. В Mule ESB вы можете сделать это следующим образом:

<async>
  <flow-ref name="crmSystemRoute" />
  <flow-ref name="marketingSystemRoute" />
  <flow-ref name="provisioningSystemRoute" />
  <flow-ref name="stockSystemRoute" />
</async>

Есть и другие случаи, в которых вы делаете нужен ответ от маршрутов. Предположим, вы используете приложение для бронирования билетов, и кто-то хочет получить прямой рейс из Буэнос-Айреса в Сан-Франциско. Ваше приложение должно связаться со всеми известными брокерами авиакомпании, получить доступ к этим рейсам и выбрать самый дешевый. Область <async> недостаточна для вас в этом случае, потому что вы хотите, чтобы поток, обрабатывающий запрос, действительно ожидал поступления ответов. Похоже, работа для многоадресного маршрутизатора!

Когда <все> просто недостаточно

Mule ESB уже имеет многоадресный маршрутизатор <all>, который направляет сообщение по нескольким маршрутам, а затем продолжает обработку после того, как все маршруты отвечают. Хотя это работает, у этого маршрутизатора есть ряд ограничений:

  • Он использует последовательную обработку! Это означает, что все маршруты выполняются по порядку, один за другим в одном потоке. Это означает, что общее количество времени, которое мы должны ждать, пока мы не сможем достать все ответы, является суммой времени выполнения всех маршрутов.
  • Он не очень хорошо справляется с обработкой ошибок: предположим, у вас есть 4 маршрута, как в примере выше. В случае сбоя второго маршрута маршруты 3 и 4 никогда не будут выполнены. Кроме того, вы получаете информацию только о сбое маршрута № 2, информация о маршруте 1 недоступна.
  • Это не очень настраиваемый . В случае успеха многоадресный маршрутизатор всегда возвращает MuleMessageCollection. Итак, возвращаясь к примеру с бронированием поездки, у вас не может быть настроенного <all> маршрутизатора, который просто возвращает самый дешевый рейс вместо всей MuleMessageCollection.

Чтобы преодолеть это ограничение, в выпуске раннего доступа Mule 3.5 появился новый ребёнок: маршрутизатор <scatter-collect>.

Представляем Scatter-Gather

Диаграмма рассеяния

Шаблоны Enterprise Integration Patterns Scatter-Gather определяют: «
Scatter-Gather  
направляет сообщение-запрос нескольким получателям. Затем он использует  
агрегатор  т
о собирать ответы и перегонять их в одно ответное сообщение. »

Мне бы хотелось описать это простым и более кратким способом, но я не могу, поэтому давайте просто сделаем краткий список различий с маршрутизатором <all> и перейдем к примеру:

  • Параллельная обработка: Scatter-Gather использует пул потоков для одновременного выполнения всех маршрутов. Это означает, что общее время, в течение которого поток вызывающего абонента должен ждать маршрутов для ответа, больше не является суммой времени всего маршрута, а является только самым длинным из них.
  • Лучшая обработка ошибок: поскольку все маршруты выполняются параллельно, один (или несколько) сбойных маршрутов не препятствуют выполнению других маршрутов. Кроме того, в случае исключения вы получите исключение CompositeRoutingException, которое содержит не только информацию обо всех неудачных маршрутах, но и ответы от успешных.
  • Конфигурируемость: как говорится в определении EIP, для объединения ответов используется агрегатор. По умолчанию реализация Mule scatter-collect возвращает MuleMessageCollection, так что он соответствует маршрутизатору ye olde <all>, что упрощает миграцию для существующих пользователей и позволяет использовать улучшенную производительность. Тем не менее, вы можете заменить это своей собственной стратегией агрегации, но скоро доберетесь до этого…

Рассеяние в действии!

Чтобы упростить этот пример и проиллюстрировать повышение производительности за счет использования <scatter-collect> над <all>, я приведу старый пример, который я сделал более года назад в этом посте . В этом примере (который в основном рассматривает Google Connectors Suite и DataMapper ) мы берем электронную таблицу Google с контактной информацией о супергероях и используем ее для создания учетных записей Salesforce , контактов Google, встреч Google Calendar и задач Google . Подробности этого примера доступны в исходном сообщении, поэтому я просто сосредоточусь на той части, которая транслирует исходное сообщение (каждого супергероя, найденного в электронной таблице) на несколько маршрутов (salesforce и приложения Google). На уровне XML это, наконец, сводится к следующему:

<set-variable variableName="startTime" value="#[System.currentTimeMillis()]" doc:name="Variable" />
<all>
  <flow-ref name="to-salesforce" doc:name="to salesforce"/>
  <flow-ref name="to-google" doc:name="to google"/>
</all>
<set-payload value="Elapsed time: #[(System.currentTimeMillis() - flowVars['startTime']) /1000] seconds" />

При выполнении этого на моем локальном ПК полная интеграция занимает 9 секунд (это число может варьироваться в зависимости от вашего географического местоположения, пропускной способности и мощности процессора ноутбука). Теперь давайте посмотрим на тот же пример с новым маршрутизатором:

<set-variable variableName="startTime" value="#[System.currentTimeMillis()]" doc:name="Variable" />
<scatter-gather>
  <flow-ref name="to-salesforce" doc:name="to salesforce"/>
  <flow-ref name="to-google" doc:name="to google"/>
</scatter-gather>
<set-payload value="Elapsed time: #[(System.currentTimeMillis() - flowVars['startTime']) /1000] seconds" />

В этом случае время сократилось до 5 секунд: на 45% быстрее .

Custom AggregationStrategy

Давайте на секунду вернемся к примеру с самым дешевым рейсом. Предположим, что после того, как все маршруты ответили, вы хотите отфильтровать те, которые имели ошибки (если есть), а затем выбрать самый дешевый. Вы можете сделать это в Mule, используя потоки, но рассмотрите это более простое решение:

public class CheapeastFlightAggregationStrategy implements AggregationStrategy {

	@Override
	public MuleEvent aggregate(AggregationContext context) throws MuleException {
		MuleEvent result;
		long value = Long.MAX_VALUE;
		for (MuleEvent event : context.collectEventsWithoutExceptions()) {
			Flight flight = (Flight) event.getMessage().getPayload(); 
			if (flight.getCost() < value) {
				result = event;
				value = flight.getCost();
			}
		}
		
		return event;
	}
}

Это классно! Я могу легко настроить агрегацию событий ответа, не беспокоясь о самой сложности агрегации! Но как мне использовать этот класс? Проверьте это:

<scatter-gather timeout="5000">
    <custom-aggregation-strategy class="org.myproject.CheapestFlightAggregationStrategy" />
    
    <flow-ref name="flightBroker1" />
    <flow-ref name="flightBroker2" />
    <flow-ref name="flightBroker3" />
</scatter-gather>

Убери прочь

  • Новый маршрутизатор рассеяния-сбора обеспечит функциональность многоадресной рассылки более производительным способом, чем маршрутизатор <all>.
  • Он также более настраиваемый и обеспечивает лучшую обработку ошибок.
  • В большинстве сценариев вы должны предпочитать <scatter-collect>, а не <all>, за исключением тех случаев, когда сбой на одном из маршрутов должен остановить выполнение следующих действий. Есть сценарии, где такое поведение желательно, поэтому мы будем хранить <all> в качестве шаблона.
  • Scatter-Gather доступен начиная с версии 3.5 раннего доступа Mule

Спасибо за чтение и с нетерпением ждем ваших отзывов!

Нет похожих сообщений.