Статьи

Scatter-Gather с использованием Spring Reactor Core

У меня есть хороший опыт работы с библиотеками Netflix Rx-Java, и я уже писал в блоге об использовании Rx-Java и Java 8 CompletableFuture для решения разрозненных проблем. Здесь я хочу изучить применение того же шаблона с использованием библиотеки Spring Reactor Core .

tldr — Если вы знакомы с Netflix Rx-Java, вы уже хорошо знаете Spring Reactor Core, карту API, и я был рад увидеть, что команда Spring Reactor старательно использовала диаграммы Marble в своих API Javadoc.

Другим быстрым моментом является то, что rx.Observable отображается на Flux или Mono в зависимости от того, испускается ли много элементов или один или ничего не испускается.

Позвольте мне сразу перейти к примеру — у меня есть простая задача (смоделированная с использованием задержки), которая порождается несколько раз, мне нужно выполнить эти задачи одновременно, а затем собрать результаты, представленные следующим образом, используя rx. Наблюдаемый код:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testScatterGather() throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(5);
 
    List<Observable<String>> obs =
            IntStream.range(0, 10)
                .boxed()
                .map(i -> generateTask(i, executors)).collect(Collectors.toList());
 
 
    Observable<List<String>> merged = Observable.merge(obs).toList();
    List<String> result = merged.toBlocking().first();
 
    logger.info(result.toString());
 
}
 
private Observable<String> generateTask(int i, ExecutorService executorService) {
    return Observable
            .<String>create(s -> {
                Util.delay(2000);
                s.onNext( i + "-test");
                s.onCompleted();
            }).subscribeOn(Schedulers.from(executorService));
}

Обратите внимание, что я блокирую исключительно для теста. Теперь аналогичный код с использованием Spring Reactor Core приводит к следующему:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void testScatterGather() {
    ExecutorService executors = Executors.newFixedThreadPool(5);
 
    List<Flux<String>> fluxList = IntStream.range(0, 10)
            .boxed()
            .map(i -> generateTask(executors, i)).collect(Collectors.toList());
 
    Mono<List<String>> merged = Flux.merge(fluxList).toList();
 
    List<String> list = merged.get();
 
    logger.info(list.toString());
 
 
}
 
public Flux<String> generateTask(ExecutorService executorService, int i) {
    return Flux.<String>create(s -> {
        Util.delay(2000);
        s.onNext(i + "-test");
        s.onComplete();
    }).subscribeOn(executorService);
}

Это более или менее отображает один на один. Небольшое отличие заключается в типе Mono , лично я чувствовал, что этот тип был хорошим введением в реактивную библиотеку, поскольку он очень четко показывает, испускается ли более 1 элемента по сравнению только с одним элементом, который я использовал в образец. Это все еще ранние исследования для меня, и я с нетерпением жду, чтобы узнать больше об этой превосходной библиотеке.

Ссылка: Соберите информацию, используя Spring Reactor Core от нашего партнера по JCG Биджу Кунджуммен в блоге all and sundry.