В части 1: пулы потоков мы разработали и внедрили относительно простую систему для обработки событий в режиме реального времени. Убедитесь, что вы прочитали предыдущую часть, так как она содержит некоторые классы, которые мы будем использовать повторно. На всякий случай вот требования:
Система доставляет около тысячи событий в секунду. Каждое Event
имеет как минимум два атрибута:
-
clientId
— мы ожидаем до нескольких событий в секунду для одного клиента -
UUID
— глобально уникальный
Потребление одного события занимает около 10 миллисекунд. Разработайте потребителя такого потока, который:
- позволяет обрабатывать события в режиме реального времени
- события, связанные с одним клиентом, должны обрабатываться последовательно и по порядку, т. е. нельзя распараллеливать события для одного и того же
clientId
- если дублированный
UUID
появился в течение 10 секунд, отбросьте его. Предположим, дубликаты не появятся через 10 секунд
То, что мы придумали до сих пор, было комбинацией пулов потоков и общего кэша. На этот раз мы реализуем решение с использованием RxJava. Прежде всего, я никогда не раскрывал, как реализован EventStream
, только предоставляя API:
1
2
3
4
5
|
interface EventStream { void consume(EventConsumer consumer); } |
Фактически для ручного тестирования я создал простой поток RxJava, который ведет себя как система из требований:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Slf4j class EventStream { void consume(EventConsumer consumer) { observe() .subscribe( consumer::consume, e -> log.error( "Error emitting event" , e) ); } Observable<Event> observe() { return Observable .interval( 1 , TimeUnit.MILLISECONDS) .delay(x -> Observable.timer(RandomUtils.nextInt( 0 , 1_000), TimeUnit.MICROSECONDS)) .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())) .flatMap( this ::occasionallyDuplicate, 100 ) .observeOn(Schedulers.io()); } private Observable<Event> occasionallyDuplicate(Event x) { final Observable<Event> event = Observable.just(x); if (Math.random() >= 0.01 ) { return event; } final Observable<Event> duplicated = event.delay(RandomUtils.nextInt( 10 , 5_000), TimeUnit.MILLISECONDS); return event.concatWith(duplicated); } } |
Понимание того, как работает этот симулятор, не обязательно, но довольно интересно. Сначала мы генерируем постоянный поток значений Long
( 0
, 1
, 2
…) каждую миллисекунду (тысяча событий в секунду) с помощью оператора interval()
. Затем мы задерживаем каждое событие на случайное количество времени от 0
до 1_000
микросекунд с помощью оператора delay()
. Таким образом, события будут появляться в менее предсказуемые моменты времени, в немного более реалистичной ситуации. Наконец, мы отображаем (используя оператор ekhem, map()
) каждое значение Long
на случайное Event
с clientId
где-то между 1_000
и 1_100
(включительно-эксклюзивно).
Последнее немного интересно. Мы хотели бы имитировать случайные дубликаты. Для этого мы сопоставляем каждое событие (используя flatMap()
) с собой (в 99% случаев). Однако в 1% случаев мы возвращаем это событие дважды, когда второе вхождение происходит между 10 миллисекундами и 5 секундами позже. На практике дублированный экземпляр события появляется после сотен других событий, что делает поток действительно реалистичным.
Существует два способа взаимодействия с EventStream
— обратный вызов, основанный на consume()
и поток, основанный на observe()
. Мы можем воспользоваться Observable<Event>
для быстрого построения конвейера обработки, очень похожего по функциональности на часть 1, но гораздо более простого.
Отсутствует противодавление
Первый наивный подход к использованию RxJava очень быстро терпит неудачу:
01
02
03
04
05
06
07
08
09
10
|
EventStream es = new EventStream(); EventConsumer clientProjection = new ClientProjection( new ProjectionMetrics( new MetricRegistry())); es.observe() .subscribe( clientProjection::consume, e -> log.error( "Fatal error" , e) ); |
( ClientProjection
, ProjectionMetrics
и др. ClientProjection
из части 1 ). Мы получаем MissingBackpressureException
практически мгновенно, и это ожидалось. Помните, как наше первое решение запаздывало, обрабатывая события с все большей и большей задержкой? RxJava пытается избежать этого, а также избежать переполнения очередей. MissingBackpressureException
вызывается, потому что потребитель ( ClientProjection
) не способен обрабатывать события в режиме реального времени. Это безотказное поведение. Самое быстрое решение — перенести потребление в отдельный пул потоков, как и раньше, но с использованием средств RxJava:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
EventStream es = new EventStream(); EventConsumer clientProjection = new FailOnConcurrentModification( new ClientProjection( new ProjectionMetrics( new MetricRegistry()))); es.observe() .flatMap(e -> clientProjection.consume(e, Schedulers.io())) .window( 1 , TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info( "Processed {} events/s" , c), e -> log.error( "Fatal error" , e) ); |
Интерфейс EventConsumer
имеет вспомогательный метод, который может асинхронно потреблять события в предоставленном Scheduler
:
01
02
03
04
05
06
07
08
09
10
11
|
@FunctionalInterface interface EventConsumer { Event consume(Event event); default Observable<Event> consume(Event event, Scheduler scheduler) { return Observable .fromCallable(() -> this .consume(event)) .subscribeOn(scheduler); } } |
Используя события с использованием flatMap()
в отдельном Scheduler.io()
каждое потребление вызывается асинхронно. На этот раз события обрабатываются практически в реальном времени, но есть большая проблема. Я украшал ClientProjection
с помощью FailOnConcurrentModification
по причине. События потребляются независимо друг от друга, поэтому может случиться, что два события для одного и того же clientId
обрабатываются одновременно. Нехорошо. К счастью, в RxJava решить эту проблему намного проще, чем с простыми потоками:
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume)) .window( 1 , TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info( "Processed {} events/s" , c), e -> log.error( "Fatal error" , e) ); |
Немного изменился. Прежде всего мы группируем события по clientId
. Это разделяет один Observable
поток на поток потоков . Каждый подпоток с именем byClient
представляет все события, связанные с одним и тем же clientId
. Теперь, если мы отобразим этот подпоток, мы можем быть уверены, что события, относящиеся к одному и clientId
же clientId
, никогда не обрабатываются одновременно. Внешний поток ленивый, поэтому мы должны подписаться на него. Вместо того, чтобы подписываться на каждое событие отдельно, мы собираем события каждую секунду и подсчитываем их. Таким образом, мы получаем одно событие типа Integer
каждую секунду, представляющее количество событий, потребляемых в секунду.
Нечистое, не идиоматическое, подверженное ошибкам, небезопасное решение дедупликации с использованием глобального состояния
Теперь мы должны удалить дубликаты UUID
. Самый простой, но очень глупый способ отбросить дубликаты — воспользоваться преимуществами глобального состояния. Мы можем просто отфильтровывать дубликаты, просматривая их в кеше, доступном вне оператора filter()
:
01
02
03
04
05
06
07
08
09
10
11
|
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder() .expireAfterWrite( 10 , TimeUnit.SECONDS) .build(); es.observe() .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null ) .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())) .subscribe( clientProjection::consume, e -> log.error( "Fatal error" , e) ); |
Если вы хотите отслеживать использование этого механизма, просто добавьте метрику:
01
02
03
04
05
06
07
08
09
10
11
|
Meter duplicates = metricRegistry.meter( "duplicates" ); es.observe() .filter(e -> { if (seenUuids.getIfPresent(e.getUuid()) != null ) { duplicates.mark(); return false ; } else { return true ; } }) |
Доступ к глобальному, особенно изменчивому состоянию изнутри операторов очень опасен и подрывает единственные цели RxJava — упрощение параллелизма. Очевидно, что мы используем поточно-ориентированный Cache
из Guava, но во многих случаях легко пропустить места, где доступ к глобальному изменяемому состоянию доступен из нескольких потоков. Если вы обнаружите, что изменяете некоторую переменную за пределами цепочки операторов, будьте очень осторожны.
Пользовательский distinct()
оператор distinct()
в RxJava 1.x
RxJava 1.x имеет distinct()
оператор distinct()
который, вероятно, выполняет свою работу:
1
2
3
|
es.observe() .distinct(Event::getUuid) .groupBy(Event::getClientId) |
К сожалению, distinct()
хранит все ключи ( UUID
) внутри постоянно растущего HashSet
. Но мы заботимся только о дубликатах за последние 10 секунд! Вставив копию реализации DistinctOperator
я создал оператор DistinctEvent
который использует кеш Guava для хранения UUID только за последние 10 секунд. Я намеренно жестко запрограммировал Event
в этом операторе, вместо того, чтобы сделать его более универсальным, чтобы облегчить понимание кода:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
class DistinctEvent implements Observable.Operator<Event, Event> { private final Duration duration; DistinctEvent(Duration duration) { this .duration = duration; } @Override public Subscriber<? super Event> call(Subscriber<? super Event> child) { return new Subscriber<Event>(child) { final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder() .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS) .<UUID, Boolean>build().asMap(); @Override public void onNext(Event event) { if (keyMemory.put(event.getUuid(), true ) == null ) { child.onNext(event); } else { request( 1 ); } } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onCompleted() { child.onCompleted(); } }; } } |
Использование довольно простое, и вся реализация (плюс пользовательский оператор) так коротка, как:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
es.observe() .lift( new DistinctEvent(Duration.ofSeconds( 10 ))) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .window( 1 , TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info( "Processed {} events/s" , c), e -> log.error( "Fatal error" , e) ); |
На самом деле это может быть еще короче, если вы пропускаете ведение журнала каждую секунду:
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .lift( new DistinctEvent(Duration.ofSeconds( 10 ))) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .subscribe( e -> {}, e -> log.error( "Fatal error" , e) ); |
Это решение намного короче предыдущего, основанного на пулах потоков и декораторах. Единственная неудобная часть — пользовательский оператор, который предотвращает утечку памяти при хранении слишком большого количества исторических UUID
. К счастью, RxJava 2 на помощь!
RxJava 2.x и более мощная встроенная функция Different distinct()
На самом деле я был так близок к тому, чтобы представить PR в RxJava с более мощной реализацией оператора Different distinct()
. Но до того, как я проверил ветку 2.x
там было: distinct()
которое позволяет предоставлять пользовательскую Collection
а не жестко закодированный HashSet
. Хотите верьте, хотите нет, инверсия зависимостей касается не только среды Spring или Java EE. Когда библиотека позволяет вам предоставлять пользовательскую реализацию своей внутренней структуры данных, это также DI. Сначала я создаю вспомогательный метод, который может создавать Set<UUID>
на основе Map<UUID, Boolean>
на основе Cache<UUID, Boolean>
. Мы уверены, что делегация!
1
2
3
4
5
6
7
8
|
private Set<UUID> recentUuids() { return Collections.newSetFromMap( CacheBuilder.newBuilder() .expireAfterWrite( 10 , TimeUnit.SECONDS) .<UUID, Boolean>build() .asMap() ); } |
Имея этот метод, мы можем реализовать всю задачу, используя это выражение:
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .distinct(Event::getUuid, this ::recentUuids) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .subscribe( e -> {}, e -> log.error( "Fatal error" , e) ); |
Элегантность, простота, ясность! Это читается почти как проблема:
- наблюдать за потоком событий
- учитывать только различные UUID
- групповые события по клиенту
- для каждого клиента их потребляют (последовательно)
Надеюсь, вам понравились все эти решения, и вы нашли их полезными в вашей повседневной работе.
Смотрите также:
- Небольшая потоковая обработка ката. Часть 1: пулы потоков
- Небольшая потоковая обработка ката. Часть 2: RxJava 1.x / 2.x
Ссылка: | Небольшая потоковая обработка ката. Часть 2: RxJava 1.x / 2.x от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей . |