У меня есть хороший опыт работы с библиотеками Netflix Rx-Java, и я уже писал в блоге об использовании Rx-Java и Java 8 CompletableFuture для решения разрозненных проблем. Здесь я хочу изучить применение того же шаблона с использованием библиотеки Spring Reactor Core .
tldr — Если вы знакомы с Netflix Rx-Java, вы уже хорошо знаете Spring Reactor Core, карту API, и я был рад увидеть, что команда Spring Reactor старательно использовала диаграммы Marble в своих API Javadoc.
Другим быстрым моментом является то, что rx.Observable отображается на Flux или Mono в зависимости от того, испускается ли много элементов или один или ничего не испускается.
Позвольте мне сразу перейти к примеру — у меня есть простая задача (смоделированная с использованием задержки), которая порождается несколько раз, мне нужно выполнить эти задачи одновременно, а затем собрать результаты, представленные следующим образом, используя rx. Наблюдаемый код:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Test public void testScatterGather() throws Exception { ExecutorService executors = Executors.newFixedThreadPool( 5 ); List<Observable<String>> obs = IntStream.range( 0 , 10 ) .boxed() .map(i -> generateTask(i, executors)).collect(Collectors.toList()); Observable<List<String>> merged = Observable.merge(obs).toList(); List<String> result = merged.toBlocking().first(); logger.info(result.toString()); } private Observable<String> generateTask( int i, ExecutorService executorService) { return Observable .<String>create(s -> { Util.delay( 2000 ); s.onNext( i + "-test" ); s.onCompleted(); }).subscribeOn(Schedulers.from(executorService)); } |
Обратите внимание, что я блокирую исключительно для теста. Теперь аналогичный код с использованием Spring Reactor Core приводит к следующему:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Test public void testScatterGather() { ExecutorService executors = Executors.newFixedThreadPool( 5 ); List<Flux<String>> fluxList = IntStream.range( 0 , 10 ) .boxed() .map(i -> generateTask(executors, i)).collect(Collectors.toList()); Mono<List<String>> merged = Flux.merge(fluxList).toList(); List<String> list = merged.get(); logger.info(list.toString()); } public Flux<String> generateTask(ExecutorService executorService, int i) { return Flux.<String>create(s -> { Util.delay( 2000 ); s.onNext(i + "-test" ); s.onComplete(); }).subscribeOn(executorService); } |
Это более или менее отображает один на один. Небольшое отличие заключается в типе Mono , лично я чувствовал, что этот тип был хорошим введением в реактивную библиотеку, поскольку он очень четко показывает, испускается ли более 1 элемента по сравнению только с одним элементом, который я использовал в образец. Это все еще ранние исследования для меня, и я с нетерпением жду, чтобы узнать больше об этой превосходной библиотеке.
Ссылка: | Соберите информацию, используя Spring Reactor Core от нашего партнера по JCG Биджу Кунджуммен в блоге all and sundry. |