Несколько месяцев назад мы начали переработку исполняемой модели Drools и сделали ее доступной для конечного пользователя с помощью API Java 8. Чтобы продемонстрировать гибкость этого подхода, я попытался интегрировать его с реактивным потоком и, в частности, использовать этот поток в качестве источника данных для Drools.
Чтобы показать, как это работает, я создал простой температурный сервер, который предоставляет RxJava Observable, который каждую секунду выдает температуру для данного города и останавливается через 5 секунд. Существует также второй фабричный метод, который позволяет объединить больше этих наблюдаемых, чтобы иметь один наблюдаемый, который излучает температуру для более чем одного города одновременно.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
public class TempServer { public static Observable<TempInfo> getFeed(String town) { return Observable.create(subscriber -> Observable.interval( 1 , TimeUnit.SECONDS) .subscribe(i -> { if (i > 5 ) subscriber.onCompleted(); try { subscriber.onNext(TempInfo.fetch(town)); } catch (Exception e) { subscriber.onError(e); } })); } public static Observable<TempInfo> getFeeds(String... towns) { return Observable.merge(Arrays.stream(towns) .map(TempServer::getFeed) .collect(toList())); } } |
где метод TempInfo.fetch просто возвращает случайную температуру от -20 до 50 градусов
1
2
3
4
5
6
7
8
|
public TempInfo(String town, int temp) { this .town = town; this .temp = temp; } public static TempInfo fetch(String town) { return new TempInfo(town, random.nextInt( 70 ) - 20 ); } |
Используя улучшенную версию Java 8 DSL, представленную в предыдущей статье, я определил следующие 2 правила:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Variable<TempInfo> temp = any( TempInfo. class ); Variable<Person> person = any( Person. class ); Rule r1 = rule( "low temp" ) .view( subscribe(temp, "tempFeed" ), expr(temp, t -> t.getTemp() < 0 ), input(person, "persons" ), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp()))); Rule r2 = rule( "high temp" ) .view( subscribe(temp, "tempFeed" ), expr(temp, t -> t.getTemp() > 30 ), input(person, "persons" ), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp()))); |
Здесь я использую два разных типа источников данных: пассивный, который можно считать простым хранилищем фактов:
1
2
3
|
DataStore persons = storeOf( new Person( "Mark" , 37 , "London" ), new Person( "Edson" , 35 , "Toronto" ), new Person( "Mario" , 40 , "Milano" )); |
которые могут быть связаны с определенным Drools KieSession с
1
|
bindDataSource(ksession, "persons" , persons); |
и реактивный, взятый из TempServer, реализованного выше
1
|
Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano" , "London" , "Toronto" ); |
который также может быть связан с той же KieSession аналогичным образом
1
|
bindRxObservable( ksession, "tempFeed" , tempFeed ); |
Сделав это, вы можете запустить эти 2 правила и получить результат, подобный следующему:
01
02
03
04
05
06
07
08
09
10
|
Mark is freezing in London - temp is - 9 Edson is sweating in Toronto - temp is 42 Mario is sweating in Milano - temp is 42 Mario is sweating in Milano - temp is 49 Mark is freezing in London - temp is - 17 Edson is sweating in Toronto - temp is 40 Edson is sweating in Toronto - temp is 47 Mario is freezing in Milano - temp is - 14 Mark is freezing in London - temp is - 8 Mark is freezing in London - temp is - 17 |
- Полный тестовый пример для запуска этого примера доступен здесь .
Ссылка: | Использование отражающего потока в качестве источника данных для Drools от нашего партнера по JCG Марио Фуско в блоге Drools & jBPM . |