Статьи

Комплексная обработка событий Twitter с помощью Esper и Drools

Сложные  механизмы обработки событий естественным образом подходят для управляемых событиями платформ, таких как Mule. Встроенная   поддержка CEP доступна в Mule начиная с версии 3.2 с помощью модуля Drools . Модуль Esper  теперь предлагает альтернативный способ использовать КЭП в ваших приложениях интеграцииEsper — это мощный, производительный, с открытым исходным кодом, сложный механизм обработки событий. Давайте посмотрим, как использовать Esper с Mule, а затем посмотрим, как он соотносится с поддержкой CEP в Drools .

Подписка на поток твитов

В следующем Gist показано приложение Mule, которое считывает обновления состояния с общедоступной временной шкалы Twitter и регистрирует экземпляры обновлений, которые содержат хэштег «#mule».

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter"
      xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz"
      
      xmlns:esper="http://www.mulesoft.org/schema/mule/esper"
      xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/3.2/mule-quartz.xsd
http://www.mulesoft.org/schema/mule/bpm http://www.mulesoft.org/schema/mule/bpm/3.2/mule-bpm.xsd
http://www.mulesoft.org/schema/mule/esper http://www.mulesoft.org/schema/mule/esper/1.0/mule-esper.xsd
http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/2.3/mule-twitter.xsd
">

    <description>
        Esper Cloud Connector example using Twitter
    </description>

    <esper:config configuration="esper-config.xml"/>

    <twitter:config consumerKey="****************" consumerSecret="***************"/>

    <flow name="main">
        <poll frequency="120000">
            <twitter:get-public-timeline/>
        </poll>
        <twitter:get-public-timeline/>
        <collection-splitter/>
        <esper:send eventPayload-ref="#[payload:]"/>
    </flow>

    <flow name="Event Listener Flow">
        <esper:listen statement="select count(hashtagEntities.where(p => p.text = 'mule')) as tagged
from Tweets having count(hashtagEntities.where(p => p.text = 'mule')) > 0"/>
        <logger category="INFO"/>
    </flow>
</mule>

Чтобы начать использовать модуль Esper, нам сначала нужно создать файл конфигурации, который определяет наши типы событий. В будущих версиях модуля это ограничение, скорее всего, будет снято, но сейчас вам нужно вручную определить конфигурацию Esper, например:

<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xmlns="http://www.espertech.com/schema/esper"
                     xsi:schemaLocation="
http://www.espertech.com/schema/esper
http://www.espertech.com/schema/esper/esper-configuration-2.0.xsd">

    <event-type name="Tweets" class="twitter4j.StatusJSONImpl"/>

</esper-configuration>

При этом регистрируется поток событий под названием «Tweets», события которого являются подходящим объектом домена в API twitter4j. Esper также поддерживает Карты и XML-документы в качестве типов событий.

«Основной» поток в приведенном выше приложении опрашивает Twitter каждые 2 минуты для выборки обновлений статуса (обратите внимание, что «реальная» реализация этого приложения, вероятно, подпишется на пожарный шланг.) Разделитель коллекций, который он использовал для разделения Списка обновления статуса в атомарные сообщения, которые вводятся в поток событий «Tweets» с использованием процессора сообщений <esper: send>.

Теперь, когда мы заполняем поток событий, мы можем использовать язык запросов Esper, EPL. EPL является SQL-esque, который упрощает запрос потоков событий. Источник сообщения <esper: listen> в вышеприведенном приложении принимает запрос EPL для подписки на события вне потока. В этом случае мы устанавливаем запрос, который будет генерировать составное событие, когда обновления статуса с хэштегом «#mule» соответствуют запросу. Эти события регистрируются обработчиком сообщений регистратора.

Сравнение с поддержкой Drools CEP

Давайте посмотрим, как Esper сравнивается с поддержкой CEP Друла.  В этом Гисте показано определение правил Drools для примера CEP фондового брокера, который поставляется с последней версией Mule. В этом примере демонстрируется использование CEP для оповещения об изменении определенного запаса на определенный порог в заданном временном окне. Пример Drools, использующий DRL, довольно многословен, как мы можем видеть из Gist выше. Ниже показаны те же важные функции, реализованные в Esper:

<flow name="processStockTicks">
        <composite-source>
            <inbound-endpoint ref="stockTick"/>
            <ajax:inbound-endpoint channel="/services/cepExample/thresholdChange"/>
        </composite-source>
        <all>
            <ajax:outbound-endpoint channel="/services/cepExample/stockTick"/>
            <esper:send eventPayload-ref="#[payload:]"/>
        </all>
    </flow>

<flow name="sendAlerts">
        <esper:listen statement="
select symbol,price,(Math.abs(first(price) - last(price)) / first(price)) * 100.0 as percentChange
from StockTick.win:time(2 min) group by symbol
having (Math.abs(first(price) - last(price)) / first(price)) * 100.0 > 7.0 "/>
        <vm:outbound-endpoint path="alerts.out"/>
 </flow>

Заявление EPL может быть еще более кратким, если мы внешне определили функцию средней разности. Если ваше приложение еще не использует Drools, вы можете рассмотреть возможность использования Esper в качестве альтернативы. Тот факт, что EPL подобен SQL, также делает его привлекательным вариантом, поскольку вам не нужно изучать DRL, чтобы начать работу с потоками событий.

Ограничение скорости трафика

Я нахожу, что CEP и, в частности, Esper, чрезвычайно полезны при разработке и поддержке интеграционных приложений. Ранее я  писал об использовании Esper в этом контексте для проверки QoS на конечных точках. Вы также можете использовать его для ограничения трафика в потоке, как показано в следующем примере:

 <flow name="Event Rate Limit Flow">
        <vm:inbound-endpoint path="filtered.in"/>
        <esper:filter eventPayload-ref="#[payload:]"
                      statement="select case when count(*) > 1000 then false else true end from TestEvent.win:time(5 min)"
                      key="case when count(*)>1000 then false else true end"/>
        <vm:outbound-endpoint path="filtered.out"/>
    </flow>

В этом примере используется процессор <esper: filter> для остановки прохождения трафика, если данный оператор EPL оценивается как false. В этом примере мы останавливаем трафик, если за 5-минутное окно получено более 1000 сообщений.

Функциональных вариантов использования КЭП также предостаточно. Стандартный пример, который поставляется с Мулом, очевиден. Быстро меняющиеся данные, такие как цены на акции, цены на аукционах, местоположения меток RFID и т. Д., Поддаются модели обработки, управляемой событиями. Менее очевидный вариант использования — наблюдение за состоянием доменных объектов, когда они проходят через систему. Например, вы можете генерировать событие всякий раз, когда заказ отправляется для приложения интернет-магазина. Затем слушатель может подписаться на поток и сгенерировать составное событие, когда заказ находится в незаполненном состоянии в течение определенного порога времени.

Вывод

Обработка сложных событий часто естественным образом подходит для приложений, управляемых событиями, созданных с помощью Mule. В то время как Mule поставляется с поддержкой CEP через Drools, модуль Esper предоставляет альтернативу с более лаконичной конфигурацией.