Статьи

Aleri — Комплексная обработка событий

Потоковая платформа Aleri от Sybase является одним из самых популярных продуктов в сегменте рынка CEP. Он используется в торговой платформе Sybase — выпуске RAP, который широко используется на рынках капитала для управления позициями в портфеле. Сегодня, в первой серии из нескольких частей, я хочу предоставить обзор платформы Aleri и предоставить несколько примеров кода, где это необходимо. Во второй части я представлю Aleri Studio, графический интерфейс на основе Eclipse, который упрощает задачу моделирования рабочего процесса CEP и мониторинга сервера Aleri через панель мониторинга.

В моем предыдущем посте, посвященном обработке сложных событий, я продемонстрировал использование Esper, программного обеспечения CEP с открытым исходным кодом и API Twitter4J для обработки потока твитов из Twitter. Тем не менее, продукт CEP намного больше, чем просто один поток данных. Один поток данных может быть легко обработан с помощью стандартных платформ асинхронного обмена сообщениями и не создает проблем с масштабируемостью или задержкой. Но когда речь идет о потреблении более одного потока данных в реальном времени и анализе его в реальном времени, и когда важна корреляция между потоками данных, ничто не сравнится с платформой CEP. Источники подачи потоковой платформы могут различаться по скорости, объему и сложности. Настоящий CEP корпоративного класса должен эффективно справляться с различными высокоскоростными данными в режиме реального времени, такими как биржевые тикеры и более медленные, но объемные автономные пакетные загрузки, с такой же легкостью. Помимо предоставления стандартных интерфейсов, CEP также должен предоставлять более простой язык программирования для запроса потоковых данных и генерирования непрерывного интеллекта с помощью таких функций, как сопоставление с образцом и запрос моментальных снимков.

Sybase Trading Platform — редакция RAP. URL обратной ссылки

Для простоты и высокого уровня CEP можно разбить на три основные части. Первый — это механизм для получения / использования исходных данных. Далее идет процесс исследования этих данных, выявления событий и закономерностей, а затем взаимодействия с целевыми системами путем предоставления им элементов, требующих действий. Активные события имеют разные формы и форматы в зависимости от приложения, для которого вы используете CEP. Элементом действия может быть — продажа позиции на основе рассчитанного риска в приложении для мониторинга рисков. указание потенциальных случаев мошенничества в приложениях по отмыванию денег или оповещение о катастрофическом событии в системе мониторинга путем считывания тысяч датчиков на химическом заводе. Существуют буквально тысячи сценариев, в которых ручная и автономная проверка данных просто невозможна. После того, как вы пройдете через следующий раздел, вы можете попробовать Aleri самостоятельно. Эта ссылка http://www.sybase.com/aleriform напрямую ведет на страницу загрузки Aleri. Пробная копия, действительная в течение 90 дней, находится в свободном доступе на официальном сайте Sybase. Хорошее количество документации, отличный учебник и пример кода на сайте должны помочь вам быстро приступить к работе.

Если вы уже являетесь пользователем какого-либо продукта CEP, я рекомендую вам сравнить Aleri с этим продуктом и поделиться им с сообществом или оставить комментарий в этом блоге. По некоторым оценкам, Tibco CEP является крупнейшим поставщиком CEP на рынке. Я не уверен, какую долю рынка занимает другой ведущий продукт StreamBase . Существует также вебинар, который вы можете прослушать на Youtube.com, который объясняет преимущества CEP в целом и некоторые ключевые особенности Streambase в частности. Для новичков это служит отличным введением в CEP и примером использования рынков капитала.

Приложение на Aleri CEP создается путем создания модели с использованием Studio (графический интерфейс) или с помощью Splash (язык) или с использованием языка моделирования Aleri (ML) — финального этапа перед его развертыванием.

Ниже приведен список ключевых функций Splash.

  • Типы данных — Поддерживает стандартные типы данных и XML. Также поддерживает ‘Typedef’ для пользовательских типов данных.
  • Контроль доступаконтроль доступа на детальном уровне, обеспечивающий доступ к потоку или модулям (содержащим множество потоков)
  • SQL — еще один способ построения модели. Создание модели студии Aleri может занять больше времени из-за ее визуальной парадигмы. Специалист по SQL должен уметь делать это намного быстрее, используя Aleri SQL, который очень похож на обычный SQL, который мы все знаем.
  • Объединения — поддерживаемые объединения: Внутренний, Левый, Правый и Полный
  • Выражения фильтра — включают Где, имея, Группа, имеющая
  • ML — Aleri SQL создает модель данных на языке моделирования Aleri (ML). Опытные пользователи ML могут использовать только ML (вместо Aleri Studio и Aleri SQL) для построения модели.
  • Язык сопоставления с образцом — включает такие конструкции, как «в пределах», чтобы указать интервал (скользящее окно), «из», чтобы указать поток данных, и интересное «fby», которое указывает последовательность (сопровождаемую)
  • Пользовательские функции — пользовательский интерфейс функций, предоставляемый в заставке, позволяет создавать функции в C ++ или Java и использовать их в выражении-заставке в модели.

Усовершенствованное сопоставление с образцом — возможности объяснены на примере здесь. — Следующие три сегмента кода и их пояснения взяты непосредственно из документации Sybase по Aleri.
В первом примере проверяется, отправляет ли брокер ордер на покупку той же акции, что и один из его прежних клиентов, затем вставляет ордер на покупку для клиента и затем продает эту акцию. Он создает событие «buyahead», когда эти действия происходят в этой последовательности.

01
02
03
04
05
06
07
08
09
10
11
within 5 minutes
from
BuyStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Buy1,
BuyStock[Symbol=sym; Shares=n2; Broker=b; Customer=c1] as Buy2,
SellStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Sell
on Buy1 fby Buy2 fby Sell
{
if ((b = c0) and (b != c1)) {
output [Symbol=sym; Shares=n1; Broker=b];
}
}

В этом примере проверяются три события, одно за другим, с использованием отношения fby. Поскольку одна и та же переменная sym используется в трех шаблонах, значения в трех событиях должны быть одинаковыми. Разные переменные могут иметь одно и то же значение, хотя (например, n1 и n2). Он выводит событие, если Broker иCustomer из событий Buy1 и Sell одинаковы, а Customer из события Buy2 отличается.

В следующем примере показаны логические операции с событиями. Правило описывает возможное состояние кражи, когда на полке было считывание товара (возможно, через RFID), после чего не было проверки данного товара, а затем считывание товара на сканере возле двери.

1
2
3
4
5
6
7
within 12 hours
from
ShelfReading[TagId=tag; ProductName=pname] as onShelf,
CounterReading[TagId=tag] as checkout,
ExitReading[TagId=tag; AreaId=area] as exit
on onShelf fby not(checkout) fby exit
output [TagId=t; ProductName=pname; AreaId=area];

В следующем примере показано, как вызвать предупреждение, если пользователь пытается безуспешно войти в учетную запись три раза в течение 5 минут.

1
2
3
4
5
6
7
from
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login1,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login2,
LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login3,
LoginAttempt[IpAddress=ip; Account=acct; Result=1] as login4
on (login1 fby login2 fby login3) and not(login4)
output [Account=acct];

Люди, желающие проникнуть в компьютерные системы, часто сканируют несколько портов TCP / IP на наличие открытых и пытаются использовать уязвимости в программах, прослушивающих эти порты. Вот правило, которое проверяет, пытался ли один IP-адрес подключиться к трем портам, а также проверялось ли на них использование программы «sendmail».

1
2
3
4
5
6
7
8
within 30 minutes
from
Connect[Source=ip; Port=22] as c1,
Connect[Source=ip; Port=23] as c2,
Connect[Source=ip; Port=25] as c3
SendMail[Source=ip] as send
on (c1 and c2 and c3) fby send
output [Source=ip];

Aleri предоставляет множество интерфейсов для простой интеграции с исходной и целевой системами. Через эти интерфейсы / адаптеры платформа Aleri может взаимодействовать со стандартными реляционными базами данных, средами обмена сообщениями, такими как IBM MQ, сокетами и файлами файловой системы. Данные в различных форматах, таких как csv, FIX, рыночные данные Reuters, SOAP, http, SMTP, легко потребляются Aleri через стандартизированные интерфейсы.

Ниже приведены доступные методы интеграции Aleri с другими системами.

  • API Pub / Sub предоставляется в Java, C ++ и dot net — Стандартный механизм pub / sub
  • Интерфейс SQL с инструкциями SELECT, UPDATE, DELETE и INSERT, используемыми через соединение ODBC и JDBC.
  • Встроенные адаптеры для рыночных данных и FIX

В следующей части этой серии мы рассмотрим Aleri Studio, графический интерфейс, который помогает нам легко создавать приложения CEP.

Aleri, платформа обработки сложных событий от Sybase была рассмотрена на высоком уровне в моем последнем посте .

На этой неделе давайте рассмотрим Aleri Studio, пользовательский интерфейс для платформы Aleri и использование pub / sub api, одного из многих способов взаимодействия с платформой Aleri. Студия является неотъемлемой частью платформы и поставляется в комплекте с бесплатной пробной версией. Если вы еще этого не сделали, загрузите копию здесь . Довольно простой процесс установки продукта Aleri поможет вам начать работу за несколько минут.

Студия aleri — это авторская платформа для построения модели, которая определяет взаимодействия и последовательность между различными потоками данных. Он также может объединять несколько потоков в один или несколько потоков. С помощью этой студии, основанной на затмении, вы можете протестировать созданные вами модели, предоставив им тестовые данные и в реальном времени отслеживать активность внутри потоков. Давайте рассмотрим различные типы потоков, которые вы можете определить в Aleri, и их функциональность.

Исходный поток — только этот тип потока может обрабатывать входящие данные. Операции, которые могут быть выполнены входящими данными: вставка, обновление, удаление и удаление. Upsert, как следует из названия, обновляет данные, если ключ, определяющий строку, уже присутствует в потоке. Иначе, он вставляет запись в поток.

Совокупный поток — этот поток создает сводную запись для каждой группы, определенной конкретным атрибутом. Это обеспечивает функциональность, эквивалентную ‘group by’ в ANSI SQL.

Копировать поток — этот поток создается путем копирования другого потока, но с другим правилом хранения.

Поток вычислений — этот поток позволяет использовать функцию для каждой строки данных, чтобы получить новый вычисляемый элемент для каждой строки потока данных.

Расширить поток — этот поток получен из другого потока с помощью дополнительных выражений столбцов

Фильтр потока — вы можете определить условие фильтра для этого потока. Так же, как расширение и вычисление потоков, этот поток применяет условия фильтрации к другим потокам для получения нового потока.

Flex Stream — Значительная гибкость в обработке потоковых данных достигается с помощью пользовательских методов. Только этот поток позволяет вам писать свои собственные методы для удовлетворения особых потребностей.

Присоединиться к потоку — Создает новый поток, объединяя два или более потоков при некоторых условиях. Оба, Внутреннее и Внешнее объединения могут использоваться для объединения потоков.

Поток образца — правила сопоставления образца применяются с этим потоком

Объединенный поток — как следует из названия, он объединяет два или более потоков с одинаковой структурой данных строк. В отличие от потока соединения, этот поток включает в себя все данные из всех участвующих потоков.

Используя некоторые из этих потоков и пабы API Aeri, я продемонстрирую сегрегацию твиттера в прямом эфире в два разных потока. Твиттер в прямом эфире потребляется слушателем из библиотеки Twitter4j. Если вы просто хотите сначала попробовать библиотеку Twitter4j, следуйте моему предыдущему посту « Отслеживание настроений пользователей в Twitter ». Данные, полученные слушателем twitter4j, передаются в исходный поток в нашей модели с помощью API публикации от Aleri. В этом упражнении мы попытаемся выделить твиты в зависимости от их содержания. Основываясь на примере из моего предыдущего поста, мы разделим входящий поток на два потока в зависимости от содержимого. Один поток получит любые твиты, состоящие из «lol», а другой получит твиты со смайликом «:)» в тексте. Во-первых, давайте перечислим задачи, которые нам нужно выполнить, чтобы сделать это рабочим примером.

  1. Создайте модель с тремя потоками
  2. Проверьте модель без ошибок
  3. Создать статический файл данных
  4. Запустите сервер Aleri и вручную подайте файл статических данных в поток, чтобы подтвердить правильность работы модели.
  5. Написать код Java, чтобы использовать твиттер. Используйте API публикации для публикации твитов на платформе Aleri.
  6. Запустите демонстрацию и посмотрите данные в реальном времени, когда они проходят через различные потоки.

Это изображение является снимком Aleri Studio с тремя потоками: один слева с именем «твиты» является исходным потоком, а два справа с именами «lolFilter» и «smileyFilter» относятся к типу фильтра. Исходный поток принимает входящие данные, в то время как потоки фильтра получают отфильтрованные данные. Вот как я определил условия фильтра — например (tweets.text, ‘% lol%’). твиты — это имя потока, а текст — это поле в интересующем нас потоке.% lol% означает, выберите любые твиты, которые имеют в тексте строку ‘lol’. Каждый поток имеет только 2 поля — идентификатор и текст. Идентификатор и текст сопоставляются с идентификатором и текстовым сообщением, отправленным твиттером. Определив модель, вы можете проверить ее на наличие ошибок, нажав на флажок на ленте вверху. Ошибки, если таковые имеются, будут отображаться на панели в правом нижнем углу изображения. Как только ваша модель не содержит ошибок, самое время ее протестировать.

На следующем рисунке показан тестовый интерфейс студии. Попробуйте сначала запустить вашу модель со статическим файлом данных. Небольшой красный квадрат в верхней части указывает, что сервер Aleri в данный момент работает. В окне консоли в нижнем правом углу отображаются сообщения сервера, например, об успешных запусках, остановках и т. Д. На вкладке Run-test в левой панели вы выбираете файл статических данных для подачи в исходный поток. Панель справа показывает все текущие потоки и текущие данные, обработанные потоками.

На рисунке ниже показан формат файла данных, используемого для тестирования модели

1
2
3
4
5
tweets ALERI_OPS="i" id="1" text="324test 1234" ;
tweets ALERI_OPS="i" id="2" text="test 12345";
tweets ALERI_OPS="i" id="3" text="test 1234666" ;
tweets ALERI_OPS="i" id="4" text="test 1234888" ;
tweets ALERI_OPS="i" id="5" text="test 1234999" ;

Исходный код этого упражнения находится внизу.
Помните, что вам нужно иметь библиотеку twitter4j в пути сборки и запустить сервер Aleri перед запуском программы. Поскольку я не добавил никакого таймера в поток выполнения, единственный способ остановить выполнение — отменить его. Для краткости и сохранения короткой строки кода я удалил всю обработку исключений и ведение журнала. Код использует только издательскую часть pub / sub api Aleri. Я продемонстрирую использование дополнительной стороны API в моем следующем сообщении в блоге.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.sybase.aleri;
 
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
 
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;
 
import com.aleri.pubsub.SpGatewayConstants;
import com.aleri.pubsub.SpObserver;
import com.aleri.pubsub.SpPlatform;
import com.aleri.pubsub.SpPlatformParms;
import com.aleri.pubsub.SpPlatformStatus;
import com.aleri.pubsub.SpPublication;
import com.aleri.pubsub.SpStream;
import com.aleri.pubsub.SpStreamDataRecord;
import com.aleri.pubsub.SpStreamDefinition;
import com.aleri.pubsub.SpSubscription;
import com.aleri.pubsub.SpSubscriptionCommon;
import com.aleri.pubsub.impl.SpFactory;
import com.aleri.pubsub.impl.SpUtils;
import com.aleri.pubsub.test.ClientSpObserver;
 
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Vector;
import java.util.TimeZone;
 
public class TwitterTest_2 {
 //make sure that Aleri server is running prior to running this program
 static {
  //creates the publishing platform
  createPlatform();
 
 }
 // Important objects from the publish API
 static SpStream stream;
 static SpPlatformStatus platformStatus;
 static SpPublication pub;
 
 public static void main(String[] args) throws TwitterException, IOException {
   
  TwitterTest_2 tt2 = new TwitterTest_2();
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(true);
   
  //use your twitter id and passcode
  cb.setUser("Your user name");
  cb.setPassword("Your Password");
 
  // creating the twitter4j listener
 
  Configuration cfg = cb.build();
  TwitterStream twitterStream = new TwitterStreamFactory(cfg)
    .getInstance();
  StatusListener_1 listener;
  listener = new StatusListener_1();
  twitterStream.addListener(listener);
   
  //runs the sample that comes with twitter4j
  twitterStream.sample();
 
 }
 
 private static int createPlatform() {
  int rc = 0;
  //Aleri platform configuration - better alternative is to your properties file
  String host = "localhost";
  int port = 22000;
   
  //aleri configured to run with empty userid and pwd strings
  String user = "";
  String password = "";
   
  //name of the source stream - the one that gets the data from the twitter4j
  String streamName = "tweets";
   
  String name = "TwitterTest_2";
   
  SpPlatformParms parms = SpFactory.createPlatformParms(host, port, user,
    password, false, false);
  platformStatus = SpFactory.createPlatformStatus();
  SpPlatform sp = SpFactory.createPlatform(parms, platformStatus);
  stream = sp.getStream(streamName);
  pub = sp.createPublication(name, platformStatus);
   
  // Then get the stream definition containing the schema information.
  SpStreamDefinition sdef = stream.getDefinition();
/*
  int numFieldsInRecord = sdef.getNumColumns();
  Vector colTypes = sdef.getColumnTypes();
  Vector colNames = sdef.getColumnNames();
 
*/
  return 0;
 }
 
 static SpStream getStream() {
  return stream;
 }
 
 static SpPlatformStatus getPlatformStatus() {
  return platformStatus;
 }
 
 static SpPublication getPublication() {
  return pub;
 }
 
 static int publish(SpStream stream, SpPlatformStatus platformStatus,
   SpPublication pub, Collection fieldData) {
   
  int rc = 0;
  int i = pub.start();
 
  SpStreamDataRecord sdr = SpFactory.createStreamDataRecord(stream,
    fieldData, SpGatewayConstants.SO_UPSERT,
    SpGatewayConstants.SF_NULLFLAG, platformStatus);
 
  Collection dataSet = new Vector();
  dataSet.add(sdr);
  System.out
    .println("\nAttempting to publish the data set to the Platform for stream <"
      + stream.getName() + ">.");
 
  rc = pub.publishTransaction(dataSet, SpGatewayConstants.SO_UPSERT,
    SpGatewayConstants.SF_NULLFLAG, 1);
 
  // commit blocks the thread until data is consumed by the platform
  System.out.println("before commit() call to the Platform.");
  rc = pub.commit();
 
 
  return 0;
 }
 
}

Ссылка: Aleri — Обработка сложных событий — Часть I , Понимание Aleri — Обработка сложных событий — Часть II от нашего партнера JCG Махеша Гаджила в простом, но практичном блоге.