Несколько месяцев назад мы начали переработку исполняемой модели Drools и сделали ее доступной для конечного пользователя с помощью API Java 8. Чтобы продемонстрировать гибкость этого подхода, я попытался интегрировать его с реактивным потоком и, в частности, использовать этот поток в качестве источника данных для Drools.
Чтобы показать, как это работает, я создал простой температурный сервер, который предоставляет RxJava Observable, который каждую секунду выдает температуру для данного города и останавливается через 5 секунд. Существует также второй фабричный метод, который позволяет объединить больше этих наблюдаемых, чтобы иметь один наблюдаемый, который излучает температуру для более чем одного города одновременно.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
public class TempServer { public static Observable<TempInfo> getFeed(String town) { return Observable.create(subscriber -> Observable.interval(1, TimeUnit.SECONDS) .subscribe(i -> { if (i > 5) subscriber.onCompleted(); try { subscriber.onNext(TempInfo.fetch(town)); } catch (Exception e) { subscriber.onError(e); } })); } public static Observable<TempInfo> getFeeds(String... towns) { return Observable.merge(Arrays.stream(towns) .map(TempServer::getFeed) .collect(toList())); }} |
где метод TempInfo.fetch просто возвращает случайную температуру от -20 до 50 градусов
|
1
2
3
4
5
6
7
8
|
public TempInfo(String town, int temp) { this.town = town; this.temp = temp;}public static TempInfo fetch(String town) { return new TempInfo(town, random.nextInt(70) - 20);} |
Используя улучшенную версию Java 8 DSL, представленную в предыдущей статье, я определил следующие 2 правила:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Variable<TempInfo> temp = any( TempInfo.class );Variable<Person> person = any( Person.class );Rule r1 = rule("low temp") .view( subscribe(temp, "tempFeed"), expr(temp, t -> t.getTemp() < 0), input(person, "persons"), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is freezing in " + p.getTown() + " - temp is " + t.getTemp())));Rule r2 = rule("high temp") .view( subscribe(temp, "tempFeed"), expr(temp, t -> t.getTemp() > 30), input(person, "persons"), expr(person, temp, (p, t) -> p.getTown().equals(t.getTown())) ) .then(on(person, temp) .execute((p, t) -> System.out.println(p.getName() + " is sweating in " + p.getTown() + " - temp is " + t.getTemp()))); |
Здесь я использую два разных типа источников данных: пассивный, который можно считать простым хранилищем фактов:
|
1
2
3
|
DataStore persons = storeOf(new Person("Mark", 37, "London"), new Person("Edson", 35, "Toronto"), new Person("Mario", 40, "Milano")); |
которые могут быть связаны с определенным Drools KieSession с
|
1
|
bindDataSource(ksession, "persons", persons); |
и реактивный, взятый из TempServer, реализованного выше
|
1
|
Observable<TempInfo> tempFeed = TempServer.getFeeds( "Milano", "London", "Toronto" ); |
который также может быть связан с той же KieSession аналогичным образом
|
1
|
bindRxObservable( ksession, "tempFeed", tempFeed ); |
Сделав это, вы можете запустить эти 2 правила и получить результат, подобный следующему:
|
01
02
03
04
05
06
07
08
09
10
|
Mark is freezing in London - temp is -9Edson is sweating in Toronto - temp is 42Mario is sweating in Milano - temp is 42Mario is sweating in Milano - temp is 49Mark is freezing in London - temp is -17Edson is sweating in Toronto - temp is 40Edson is sweating in Toronto - temp is 47Mario is freezing in Milano - temp is -14Mark is freezing in London - temp is -8Mark is freezing in London - temp is -17 |
- Полный тестовый пример для запуска этого примера доступен здесь .
| Ссылка: | Использование отражающего потока в качестве источника данных для Drools от нашего партнера по JCG Марио Фуско в блоге Drools & jBPM . |