В части 1: пулы потоков мы разработали и внедрили относительно простую систему для обработки событий в режиме реального времени. Убедитесь, что вы прочитали предыдущую часть, так как она содержит некоторые классы, которые мы будем использовать повторно. На всякий случай вот требования:
Система доставляет около тысячи событий в секунду. Каждое Event имеет как минимум два атрибута:
-
clientId— мы ожидаем до нескольких событий в секунду для одного клиента -
UUID— глобально уникальный
Потребление одного события занимает около 10 миллисекунд. Разработайте потребителя такого потока, который:
- позволяет обрабатывать события в режиме реального времени
- события, связанные с одним клиентом, должны обрабатываться последовательно и по порядку, т. е. нельзя распараллеливать события для одного и того же
clientId - если дублированный
UUIDпоявился в течение 10 секунд, отбросьте его. Предположим, дубликаты не появятся через 10 секунд
То, что мы придумали до сих пор, было комбинацией пулов потоков и общего кэша. На этот раз мы реализуем решение с использованием RxJava. Прежде всего, я никогда не раскрывал, как реализован EventStream , только предоставляя API:
|
1
2
3
4
5
|
interface EventStream { void consume(EventConsumer consumer); } |
Фактически для ручного тестирования я создал простой поток RxJava, который ведет себя как система из требований:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Slf4jclass EventStream { void consume(EventConsumer consumer) { observe() .subscribe( consumer::consume, e -> log.error("Error emitting event", e) ); } Observable<Event> observe() { return Observable .interval(1, TimeUnit.MILLISECONDS) .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)) .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())) .flatMap(this::occasionallyDuplicate, 100) .observeOn(Schedulers.io()); } private Observable<Event> occasionallyDuplicate(Event x) { final Observable<Event> event = Observable.just(x); if (Math.random() >= 0.01) { return event; } final Observable<Event> duplicated = event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS); return event.concatWith(duplicated); } } |
Понимание того, как работает этот симулятор, не обязательно, но довольно интересно. Сначала мы генерируем постоянный поток значений Long ( 0 , 1 , 2 …) каждую миллисекунду (тысяча событий в секунду) с помощью оператора interval() . Затем мы задерживаем каждое событие на случайное количество времени от 0 до 1_000 микросекунд с помощью оператора delay() . Таким образом, события будут появляться в менее предсказуемые моменты времени, в немного более реалистичной ситуации. Наконец, мы отображаем (используя оператор ekhem, map() ) каждое значение Long на случайное Event с clientId где-то между 1_000 и 1_100 (включительно-эксклюзивно).
Последнее немного интересно. Мы хотели бы имитировать случайные дубликаты. Для этого мы сопоставляем каждое событие (используя flatMap() ) с собой (в 99% случаев). Однако в 1% случаев мы возвращаем это событие дважды, когда второе вхождение происходит между 10 миллисекундами и 5 секундами позже. На практике дублированный экземпляр события появляется после сотен других событий, что делает поток действительно реалистичным.
Существует два способа взаимодействия с EventStream — обратный вызов, основанный на consume() и поток, основанный на observe() . Мы можем воспользоваться Observable<Event> для быстрого построения конвейера обработки, очень похожего по функциональности на часть 1, но гораздо более простого.
Отсутствует противодавление
Первый наивный подход к использованию RxJava очень быстро терпит неудачу:
|
01
02
03
04
05
06
07
08
09
10
|
EventStream es = new EventStream();EventConsumer clientProjection = new ClientProjection( new ProjectionMetrics( new MetricRegistry())); es.observe() .subscribe( clientProjection::consume, e -> log.error("Fatal error", e) ); |
( ClientProjection , ProjectionMetrics и др. ClientProjection из части 1 ). Мы получаем MissingBackpressureException практически мгновенно, и это ожидалось. Помните, как наше первое решение запаздывало, обрабатывая события с все большей и большей задержкой? RxJava пытается избежать этого, а также избежать переполнения очередей. MissingBackpressureException вызывается, потому что потребитель ( ClientProjection ) не способен обрабатывать события в режиме реального времени. Это безотказное поведение. Самое быстрое решение — перенести потребление в отдельный пул потоков, как и раньше, но с использованием средств RxJava:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
EventStream es = new EventStream();EventConsumer clientProjection = new FailOnConcurrentModification( new ClientProjection( new ProjectionMetrics( new MetricRegistry()))); es.observe() .flatMap(e -> clientProjection.consume(e, Schedulers.io())) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info("Processed {} events/s", c), e -> log.error("Fatal error", e) ); |
Интерфейс EventConsumer имеет вспомогательный метод, который может асинхронно потреблять события в предоставленном Scheduler :
|
01
02
03
04
05
06
07
08
09
10
11
|
@FunctionalInterfaceinterface EventConsumer { Event consume(Event event); default Observable<Event> consume(Event event, Scheduler scheduler) { return Observable .fromCallable(() -> this.consume(event)) .subscribeOn(scheduler); } } |
Используя события с использованием flatMap() в отдельном Scheduler.io() каждое потребление вызывается асинхронно. На этот раз события обрабатываются практически в реальном времени, но есть большая проблема. Я украшал ClientProjection с помощью FailOnConcurrentModification по причине. События потребляются независимо друг от друга, поэтому может случиться, что два события для одного и того же clientId обрабатываются одновременно. Нехорошо. К счастью, в RxJava решить эту проблему намного проще, чем с простыми потоками:
|
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume)) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info("Processed {} events/s", c), e -> log.error("Fatal error", e) ); |
Немного изменился. Прежде всего мы группируем события по clientId . Это разделяет один Observable поток на поток потоков . Каждый подпоток с именем byClient представляет все события, связанные с одним и тем же clientId . Теперь, если мы отобразим этот подпоток, мы можем быть уверены, что события, относящиеся к одному и clientId же clientId , никогда не обрабатываются одновременно. Внешний поток ленивый, поэтому мы должны подписаться на него. Вместо того, чтобы подписываться на каждое событие отдельно, мы собираем события каждую секунду и подсчитываем их. Таким образом, мы получаем одно событие типа Integer каждую секунду, представляющее количество событий, потребляемых в секунду.
Нечистое, не идиоматическое, подверженное ошибкам, небезопасное решение дедупликации с использованием глобального состояния
Теперь мы должны удалить дубликаты UUID . Самый простой, но очень глупый способ отбросить дубликаты — воспользоваться преимуществами глобального состояния. Мы можем просто отфильтровывать дубликаты, просматривая их в кеше, доступном вне оператора filter() :
|
01
02
03
04
05
06
07
08
09
10
11
|
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) .build(); es.observe() .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null) .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())) .subscribe( clientProjection::consume, e -> log.error("Fatal error", e) ); |
Если вы хотите отслеживать использование этого механизма, просто добавьте метрику:
|
01
02
03
04
05
06
07
08
09
10
11
|
Meter duplicates = metricRegistry.meter("duplicates"); es.observe() .filter(e -> { if (seenUuids.getIfPresent(e.getUuid()) != null) { duplicates.mark(); return false; } else { return true; } }) |
Доступ к глобальному, особенно изменчивому состоянию изнутри операторов очень опасен и подрывает единственные цели RxJava — упрощение параллелизма. Очевидно, что мы используем поточно-ориентированный Cache из Guava, но во многих случаях легко пропустить места, где доступ к глобальному изменяемому состоянию доступен из нескольких потоков. Если вы обнаружите, что изменяете некоторую переменную за пределами цепочки операторов, будьте очень осторожны.
Пользовательский distinct() оператор distinct() в RxJava 1.x
RxJava 1.x имеет distinct() оператор distinct() который, вероятно, выполняет свою работу:
|
1
2
3
|
es.observe() .distinct(Event::getUuid) .groupBy(Event::getClientId) |
К сожалению, distinct() хранит все ключи ( UUID ) внутри постоянно растущего HashSet . Но мы заботимся только о дубликатах за последние 10 секунд! Вставив копию реализации DistinctOperator я создал оператор DistinctEvent который использует кеш Guava для хранения UUID только за последние 10 секунд. Я намеренно жестко запрограммировал Event в этом операторе, вместо того, чтобы сделать его более универсальным, чтобы облегчить понимание кода:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
class DistinctEvent implements Observable.Operator<Event, Event> { private final Duration duration; DistinctEvent(Duration duration) { this.duration = duration; } @Override public Subscriber<? super Event> call(Subscriber<? super Event> child) { return new Subscriber<Event>(child) { final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder() .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS) .<UUID, Boolean>build().asMap(); @Override public void onNext(Event event) { if (keyMemory.put(event.getUuid(), true) == null) { child.onNext(event); } else { request(1); } } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onCompleted() { child.onCompleted(); } }; }} |
Использование довольно простое, и вся реализация (плюс пользовательский оператор) так коротка, как:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
es.observe() .lift(new DistinctEvent(Duration.ofSeconds(10))) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c -> log.info("Processed {} events/s", c), e -> log.error("Fatal error", e) ); |
На самом деле это может быть еще короче, если вы пропускаете ведение журнала каждую секунду:
|
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .lift(new DistinctEvent(Duration.ofSeconds(10))) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .subscribe( e -> {}, e -> log.error("Fatal error", e) ); |
Это решение намного короче предыдущего, основанного на пулах потоков и декораторах. Единственная неудобная часть — пользовательский оператор, который предотвращает утечку памяти при хранении слишком большого количества исторических UUID . К счастью, RxJava 2 на помощь!
RxJava 2.x и более мощная встроенная функция Different distinct()
На самом деле я был так близок к тому, чтобы представить PR в RxJava с более мощной реализацией оператора Different distinct() . Но до того, как я проверил ветку 2.x там было: distinct() которое позволяет предоставлять пользовательскую Collection а не жестко закодированный HashSet . Хотите верьте, хотите нет, инверсия зависимостей касается не только среды Spring или Java EE. Когда библиотека позволяет вам предоставлять пользовательскую реализацию своей внутренней структуры данных, это также DI. Сначала я создаю вспомогательный метод, который может создавать Set<UUID> на основе Map<UUID, Boolean> на основе Cache<UUID, Boolean> . Мы уверены, что делегация!
|
1
2
3
4
5
6
7
8
|
private Set<UUID> recentUuids() { return Collections.newSetFromMap( CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) .<UUID, Boolean>build() .asMap() );} |
Имея этот метод, мы можем реализовать всю задачу, используя это выражение:
|
01
02
03
04
05
06
07
08
09
10
11
|
es.observe() .distinct(Event::getUuid, this::recentUuids) .groupBy(Event::getClientId) .flatMap(byClient -> byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) ) .subscribe( e -> {}, e -> log.error("Fatal error", e) ); |
Элегантность, простота, ясность! Это читается почти как проблема:
- наблюдать за потоком событий
- учитывать только различные UUID
- групповые события по клиенту
- для каждого клиента их потребляют (последовательно)
Надеюсь, вам понравились все эти решения, и вы нашли их полезными в вашей повседневной работе.
Смотрите также:
- Небольшая потоковая обработка ката. Часть 1: пулы потоков
- Небольшая потоковая обработка ката. Часть 2: RxJava 1.x / 2.x
| Ссылка: | Небольшая потоковая обработка ката. Часть 2: RxJava 1.x / 2.x от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей . |