Сложные механизмы обработки событий естественным образом подходят для управляемых событиями платформ, таких как Mule. Встроенная поддержка CEP доступна в Mule начиная с версии 3.2 с помощью модуля Drools . Модуль Esper теперь предлагает альтернативный способ использовать КЭП в ваших приложениях интеграции . Esper — это мощный, производительный, с открытым исходным кодом, сложный механизм обработки событий. Давайте посмотрим, как использовать Esper с Mule, а затем посмотрим, как он соотносится с поддержкой CEP в Drools .
Подписка на поток твитов
В следующем Gist показано приложение Mule, которое считывает обновления состояния с общедоступной временной шкалы Twitter и регистрирует экземпляры обновлений, которые содержат хэштег «#mule».
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter" xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz" xmlns:esper="http://www.mulesoft.org/schema/mule/esper" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/3.2/mule-quartz.xsd http://www.mulesoft.org/schema/mule/bpm http://www.mulesoft.org/schema/mule/bpm/3.2/mule-bpm.xsd http://www.mulesoft.org/schema/mule/esper http://www.mulesoft.org/schema/mule/esper/1.0/mule-esper.xsd http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/2.3/mule-twitter.xsd "> <description> Esper Cloud Connector example using Twitter </description> <esper:config configuration="esper-config.xml"/> <twitter:config consumerKey="****************" consumerSecret="***************"/> <flow name="main"> <poll frequency="120000"> <twitter:get-public-timeline/> </poll> <twitter:get-public-timeline/> <collection-splitter/> <esper:send eventPayload-ref="#[payload:]"/> </flow> <flow name="Event Listener Flow"> <esper:listen statement="select count(hashtagEntities.where(p => p.text = 'mule')) as tagged from Tweets having count(hashtagEntities.where(p => p.text = 'mule')) > 0"/> <logger category="INFO"/> </flow> </mule>
Чтобы начать использовать модуль Esper, нам сначала нужно создать файл конфигурации, который определяет наши типы событий. В будущих версиях модуля это ограничение, скорее всего, будет снято, но сейчас вам нужно вручную определить конфигурацию Esper, например:
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.espertech.com/schema/esper" xsi:schemaLocation=" http://www.espertech.com/schema/esper http://www.espertech.com/schema/esper/esper-configuration-2.0.xsd"> <event-type name="Tweets" class="twitter4j.StatusJSONImpl"/> </esper-configuration>
При этом регистрируется поток событий под названием «Tweets», события которого являются подходящим объектом домена в API twitter4j. Esper также поддерживает Карты и XML-документы в качестве типов событий.
«Основной» поток в приведенном выше приложении опрашивает Twitter каждые 2 минуты для выборки обновлений статуса (обратите внимание, что «реальная» реализация этого приложения, вероятно, подпишется на пожарный шланг.) Разделитель коллекций, который он использовал для разделения Списка обновления статуса в атомарные сообщения, которые вводятся в поток событий «Tweets» с использованием процессора сообщений <esper: send>.
Теперь, когда мы заполняем поток событий, мы можем использовать язык запросов Esper, EPL. EPL является SQL-esque, который упрощает запрос потоков событий. Источник сообщения <esper: listen> в вышеприведенном приложении принимает запрос EPL для подписки на события вне потока. В этом случае мы устанавливаем запрос, который будет генерировать составное событие, когда обновления статуса с хэштегом «#mule» соответствуют запросу. Эти события регистрируются обработчиком сообщений регистратора.
Сравнение с поддержкой Drools CEP
Давайте посмотрим, как Esper сравнивается с поддержкой CEP Друла. В этом Гисте показано определение правил Drools для примера CEP фондового брокера, который поставляется с последней версией Mule. В этом примере демонстрируется использование CEP для оповещения об изменении определенного запаса на определенный порог в заданном временном окне. Пример Drools, использующий DRL, довольно многословен, как мы можем видеть из Gist выше. Ниже показаны те же важные функции, реализованные в Esper:
<flow name="processStockTicks"> <composite-source> <inbound-endpoint ref="stockTick"/> <ajax:inbound-endpoint channel="/services/cepExample/thresholdChange"/> </composite-source> <all> <ajax:outbound-endpoint channel="/services/cepExample/stockTick"/> <esper:send eventPayload-ref="#[payload:]"/> </all> </flow> <flow name="sendAlerts"> <esper:listen statement=" select symbol,price,(Math.abs(first(price) - last(price)) / first(price)) * 100.0 as percentChange from StockTick.win:time(2 min) group by symbol having (Math.abs(first(price) - last(price)) / first(price)) * 100.0 > 7.0 "/> <vm:outbound-endpoint path="alerts.out"/> </flow>
Заявление EPL может быть еще более кратким, если мы внешне определили функцию средней разности. Если ваше приложение еще не использует Drools, вы можете рассмотреть возможность использования Esper в качестве альтернативы. Тот факт, что EPL подобен SQL, также делает его привлекательным вариантом, поскольку вам не нужно изучать DRL, чтобы начать работу с потоками событий.
Ограничение скорости трафика
Я нахожу, что CEP и, в частности, Esper, чрезвычайно полезны при разработке и поддержке интеграционных приложений. Ранее я писал об использовании Esper в этом контексте для проверки QoS на конечных точках. Вы также можете использовать его для ограничения трафика в потоке, как показано в следующем примере:
<flow name="Event Rate Limit Flow"> <vm:inbound-endpoint path="filtered.in"/> <esper:filter eventPayload-ref="#[payload:]" statement="select case when count(*) > 1000 then false else true end from TestEvent.win:time(5 min)" key="case when count(*)>1000 then false else true end"/> <vm:outbound-endpoint path="filtered.out"/> </flow>
В этом примере используется процессор <esper: filter> для остановки прохождения трафика, если данный оператор EPL оценивается как false. В этом примере мы останавливаем трафик, если за 5-минутное окно получено более 1000 сообщений.
Функциональных вариантов использования КЭП также предостаточно. Стандартный пример, который поставляется с Мулом, очевиден. Быстро меняющиеся данные, такие как цены на акции, цены на аукционах, местоположения меток RFID и т. Д., Поддаются модели обработки, управляемой событиями. Менее очевидный вариант использования — наблюдение за состоянием доменных объектов, когда они проходят через систему. Например, вы можете генерировать событие всякий раз, когда заказ отправляется для приложения интернет-магазина. Затем слушатель может подписаться на поток и сгенерировать составное событие, когда заказ находится в незаполненном состоянии в течение определенного порога времени.
Вывод
Обработка сложных событий часто естественным образом подходит для приложений, управляемых событиями, созданных с помощью Mule. В то время как Mule поставляется с поддержкой CEP через Drools, модуль Esper предоставляет альтернативу с более лаконичной конфигурацией.