Статьи

Отправленные сервером события с RxJava и SseEmitter

Spring Framework 4.2 GA почти выпущен, давайте рассмотрим некоторые новые функции, которые он предоставляет. Тот, который привлек мое внимание, — это простой новый класс SseEmitter — абстракция над несколькими отправленными событиями, которые легко используются в контроллерах Spring MVC. SSE — это технология, которая позволяет вам передавать данные с сервера в браузер в рамках одного HTTP-соединения в одном направлении. Это похоже на подмножество того, что могут делать веб-сокеты Однако, поскольку это гораздо более простой протокол, его можно использовать, когда в полнодуплексном режиме нет необходимости, например, для изменения цены акций в режиме реального времени или для отображения прогресса в длительном процессе. Это будет нашим примером.

Представьте, что у нас есть виртуальный майнер со следующим API:

1
2
3
4
5
6
public interface CoinMiner {
  
    BigDecimal mine() {
        //...
    }
}

Каждый раз, когда мы вызываем mine() нам приходится ждать несколько секунд, и мы получаем около 1 монеты взамен (в среднем). Если мы хотим добыть несколько монет, мы должны вызывать этот метод несколько раз:

01
02
03
04
05
06
07
08
09
10
11
12
13
@RestController
public class MiningController {
  
    //...
  
    @RequestMapping("/mine/{count}")
    void mine(@PathVariable int count) {
        IntStream
                .range(0, count)
                .forEach(x -> coinMiner.mine());
    }
  
}

Это работает, мы можем запросить /mine/10 и метод mine() будет выполнен 10 раз. Все идет нормально. Но майнинг требует много ресурсов процессора, поэтому было бы полезно распределить вычисления по нескольким ядрам. Кроме того, даже при распараллеливании наша конечная точка API довольно медленная, и мы должны терпеливо ждать, пока вся работа не будет выполнена без каких-либо уведомлений о прогрессе. Давайте сначала исправим параллелизм — однако поскольку параллельные потоки не дают вам никакого контроля над базовым пулом потоков, давайте перейдем к явному ExecutorService :

01
02
03
04
05
06
07
08
09
10
@Component
class CoinMiner {
  
    CompletableFuture<BigDecimal> mineAsync(ExecutorService executorService) {
        return CompletableFuture.supplyAsync(this::mine, executorService);
    }
  
    //...
  
}

Код клиента должен явно предоставлять ExecutorService (только выбор дизайна):

1
2
3
4
5
6
7
8
@RequestMapping("/mine/{count}")
void mine(@PathVariable int count) {
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
    futures.forEach(CompletableFuture::join);
}

Безумно важно сначала вызвать mineAsync несколько раз, а затем (на втором этапе) дождаться завершения всех фьючерсов с помощью join . Соблазнительно написать:

1
2
3
4
IntStream
        .range(0, count)
        .mapToObj(x -> coinMiner.mineAsync(executorService))
        .forEach(CompletableFuture::join);

Однако из-за ленивого характера потоков в Java 8 эти задачи будут выполняться последовательно! Если вы еще не ленивые потоки, всегда читайте их снизу вверх: мы просим join какому-нибудь будущему, чтобы поток увеличивался и mineAsync() только один раз (ленивый!), mineAsync() его в join() . Когда этот join() завершается, он снова поднимается и запрашивает другое Future . Используя collect() мы принудительно mineAsync() все выполнения mineAsync() , начиная все асинхронные вычисления. Позже мы ждем каждого из них.

Представляем SseEmitter

Теперь пришло время быть более реактивным (там я это сказал). Контроллер может вернуть экземпляр SseEmitter . Как только мы return из метода-обработчика, поток контейнера освобождается и может обслуживать больше предстоящих запросов. Но соединение не закрыто, и клиент продолжает ждать! Нам нужно сохранить ссылку на экземпляр SseEmitter и SseEmitter вызвать его методы send() и complete из другого потока. Например, мы можем запустить длительный процесс и сохранить прогресс send() от произвольных потоков. Как только процесс завершен, мы complete() SseEmitter и, наконец, HTTP-соединение закрывается (по крайней мере, логически, помните о Keep-alive ). В приведенном ниже примере у нас есть несколько CompletableFuture s, и когда каждый из них завершается, мы просто отправляем 1 клиенту ( notifyProgress() ). Когда все будущее закончено, мы завершаем поток ( thenRun(sseEmitter::complete) ), закрывая соединение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    final List<CompletableFuture<BigDecimal>> futures = mineAsync(count);
    futures.forEach(future ->
            future.thenRun(() -> notifyProgress(sseEmitter)));
  
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(sseEmitter::complete);
  
    return sseEmitter;
}
  
private void notifyProgress(SseEmitter sseEmitter) {
    try {
        sseEmitter.send(1);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
  
private List<CompletableFuture<BigDecimal>> mineAsync(@PathVariable int count) {
    return IntStream
            .range(0, count)
            .mapToObj(x -> coinMiner.mineAsync(executorService))
            .collect(toList());
}

Вызов этого метода приводит к следующему ответу (уведомление Content-Type ):

01
02
03
04
05
06
07
08
09
10
11
12
13
< HTTP/1.1 200 OK
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
<
data:1
  
data:1
  
data:1
  
data:1
  
* Connection #0 to host localhost left intact

Позже мы узнаем, как интерпретировать такой ответ на стороне клиента. Пока давайте немного проясним дизайн.

Представляем RxJava с заметным прогрессом

Код выше работает, но выглядит довольно грязно. На самом деле мы имеем последовательность событий, каждое из которых представляет прогресс вычислений. Вычисление наконец заканчивается, поэтому поток также должен сигнализировать об окончании. Звучит так же, как Observable ! Мы начнем с рефакторинга CoinMiner , чтобы вернуть Observable<BigDecimal :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final ReplaySubject<BigDecimal> subject = ReplaySubject.create();
    final List<CompletableFuture<BigDecimal>> futures = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    futures
            .forEach(future ->
                    future.thenRun(() -> subject.onNext(BigDecimal.ONE)));
  
    final CompletableFuture[] futuresArr = futures.toArray(new CompletableFuture[futures.size()]);
    CompletableFuture
            .allOf(futuresArr)
            .thenRun(subject::onCompleted);
  
    return subject;
}

Каждый раз, когда в Observable возвращается событие, возвращаемое из mineMany() , мы просто добываем столько монет. Когда все фьючерсы сделаны, мы также завершаем поток. Это не выглядит намного лучше на стороне реализации, но посмотрите, насколько он чист с точки зрения контроллера:

01
02
03
04
05
06
07
08
09
10
11
12
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .subscribe(
                    value -> notifyProgress(sseEmitter),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}

После вызова coinMiner.mineMany() мы просто подписываемся на события. Оказывается, методы Observable и SseEmitter соответствуют 1: 1. То, что здесь происходит, довольно очевидно: начинайте асинхронные вычисления, и каждый раз, когда фоновые вычисления сигнализируют о каком-либо прогрессе, отправляйте их клиенту. Хорошо, давайте вернемся к реализации. Это выглядит грязно, потому что мы смешиваем CompletableFuture и Observable . Я уже описывал, как преобразовать CompletableFuture в Observable только с одним элементом . Вот резюме, включая абстракцию rx.Single найденную начиная с RxJava 1.0.13 (здесь не используется):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Futures {
  
    public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onNext(result);
                        subscriber.onCompleted();
                    }
                }));
    }
  
    public static <T> Single<T> toSingle(CompletableFuture<T> future) {
        return Single.create(subscriber ->
                future.whenComplete((result, error) -> {
                    if (error != null) {
                        subscriber.onError(error);
                    } else {
                        subscriber.onSuccess(result);
                    }
                }));
    }
  
}

Имея эти операторы утилит, мы можем улучшить реализацию и избежать смешивания двух API:

01
02
03
04
05
06
07
08
09
10
11
12
13
Observable<BigDecimal> mineMany(int count, ExecutorService executorService) {
    final List<Observable<BigDecimal>> observables = IntStream
            .range(0, count)
            .mapToObj(x -> mineAsync(executorService))
            .collect(toList());
    return Observable.merge(observables);
}
  
Observable<BigDecimal> mineAsync(ExecutorService executorService) {
    final CompletableFuture<BigDecimal> future =
         CompletableFuture.supplyAsync(this::mine, executorService);
    return Futures.toObservable(future);
}

RxJava имеет встроенный оператор для объединения нескольких Observable в один, не имеет значения, что каждый из наших базовых Observable генерирует только одно событие.

Глубокое погружение в операторы RxJava

Давайте использовать мощь RxJava, чтобы немного улучшить нашу потоковую передачу.

сканирование ()

В настоящее время каждый раз, когда мы добываем одну монету, мы send(1) событие клиенту. Это означает, что каждый клиент должен отслеживать, сколько монет он уже получил, чтобы рассчитать общую рассчитанную сумму. Было бы хорошо, если бы сервер всегда отправлял общую сумму, а не дельты. Однако мы не хотим менять реализацию. Оказывается, это довольно просто с оператором Observable.scan() :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
@RequestMapping("/mine/{count}")
SseEmitter mine(@PathVariable int count) {
    final SseEmitter sseEmitter = new SseEmitter();
    coinMiner
            .mineMany(count, executorService)
            .scan(BigDecimal::add)
            .subscribe(
                    value -> notifyProgress(sseEmitter, value),
                    sseEmitter::completeWithError,
                    sseEmitter::complete
            );
    return sseEmitter;
}
  
private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {
    try {
        sseEmitter.send(value);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Оператор scan() принимает предыдущее и текущее события, объединяя их вместе. Применяя BigDecimal::add мы просто добавляем все числа. Например, 1, 1 + 1, (1 + 1) + 1 и т. Д. scan() похожа на flatMap() , но сохраняет промежуточные значения.

Отбор проб с sample()

Может случиться так, что наша внутренняя служба выдает слишком много обновлений прогресса, чем мы можем использовать. Мы не хотим загружать клиента неуместными обновлениями и насыщать пропускную способность. Отправка обновления не чаще двух раз в секунду звучит разумно. К счастью, RxJava имеет встроенный оператор для этого:

1
2
3
4
5
6
7
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .scan(BigDecimal::add)
        .sample(500, TimeUnit.MILLISECONDS)
        .subscribe(
            //...
        );

sample() будет периодически просматривать основной поток и испускать только самый последний элемент, исключая промежуточные. К счастью, мы собираем элементы на лету с помощью scan() поэтому не теряем обновления.

window() — постоянные интервалы излучения

Хотя есть одна загвоздка. sample() не будет выдавать один и тот же элемент дважды, если в течение 500 миллисекунд не появилось ничего нового. Это нормально, но помните, что мы отправляем эти обновления через соединение TCP / IP. Рекомендуется периодически отправлять обновление клиенту, даже если за это время ничего не произошло — просто чтобы поддерживать соединение, что-то вроде ping . Вероятно, существует много способов выполнения этого требования, например, использование оператора timeout() . Я выбрал группировку всех событий каждые 500 мс с помощью оператора window() :

1
2
3
4
5
6
7
8
Observable<BigDecimal> obs = coinMiner.mineMany(count, executorService);
obs
        .window(500, TimeUnit.MILLISECONDS)
        .flatMap(window -> window.reduce(BigDecimal.ZERO, BigDecimal::add))
        .scan(BigDecimal::add)
        .subscribe(
            //...
        );

Этот хитрый. Сначала мы группируем все обновления прогресса в окнах 500 миллисекунд. Затем мы рассчитываем общее количество (аналогично scan() ) монет, добытых за этот период времени, используя reduce . Если в этот период не было добыто ни одной монеты, мы просто возвращаем ZERO . В конце мы используем scan() для агрегирования промежуточных итогов каждого окна. Нам больше не нужен sample() так как window() гарантирует, что событие отправляется каждые 500 мс.

Сторона клиента

Есть много примеров использования SSE в JavaScript, так что просто чтобы дать вам быстрое решение, позвонив нашему контроллеру:

1
2
3
4
var source = new EventSource("/mine/10");
source.onmessage = function (event) {
    console.info(event);
};

Я считаю, что SseEmitter — это серьезное усовершенствование Spring MVC, которое позволит нам SseEmitter более надежные и быстрые веб-приложения, требующие мгновенных однонаправленных обновлений.

Ссылка: Отправленные сервером события с RxJava и SseEmitter от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседей