Статьи

Небольшая потоковая обработка ката.

В части 1: пулы потоков мы разработали и внедрили относительно простую систему для обработки событий в режиме реального времени. Убедитесь, что вы прочитали предыдущую часть, так как она содержит некоторые классы, которые мы будем использовать повторно. На всякий случай вот требования:

Система доставляет около тысячи событий в секунду. Каждое Event имеет как минимум два атрибута:

  • clientId — мы ожидаем до нескольких событий в секунду для одного клиента
  • UUID — глобально уникальный

Потребление одного события занимает около 10 миллисекунд. Разработайте потребителя такого потока, который:

  1. позволяет обрабатывать события в режиме реального времени
  2. события, связанные с одним клиентом, должны обрабатываться последовательно и по порядку, т. е. нельзя распараллеливать события для одного и того же clientId
  3. если дублированный UUID появился в течение 10 секунд, отбросьте его. Предположим, дубликаты не появятся через 10 секунд

То, что мы придумали до сих пор, было комбинацией пулов потоков и общего кэша. На этот раз мы реализуем решение с использованием RxJava. Прежде всего, я никогда не раскрывал, как реализован EventStream , только предоставляя API:

1
2
3
4
5
interface EventStream {
  
    void consume(EventConsumer consumer);
  
}

Фактически для ручного тестирования я создал простой поток RxJava, который ведет себя как система из требований:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
class EventStream {
  
    void consume(EventConsumer consumer) {
        observe()
            .subscribe(
                consumer::consume,
                e -> log.error("Error emitting event", e)
        );
    }
  
    Observable<Event> observe() {
        return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
    }
  
    private Observable<Event> occasionallyDuplicate(Event x) {
        final Observable<Event> event = Observable.just(x);
        if (Math.random() >= 0.01) {
            return event;
        }
        final Observable<Event> duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
        return event.concatWith(duplicated);
    }
  
}

Понимание того, как работает этот симулятор, не обязательно, но довольно интересно. Сначала мы генерируем постоянный поток значений Long ( 0 , 1 , 2 …) каждую миллисекунду (тысяча событий в секунду) с помощью оператора interval() . Затем мы задерживаем каждое событие на случайное количество времени от 0 до 1_000 микросекунд с помощью оператора delay() . Таким образом, события будут появляться в менее предсказуемые моменты времени, в немного более реалистичной ситуации. Наконец, мы отображаем (используя оператор ekhem, map() ) каждое значение Long на случайное Event с clientId где-то между 1_000 и 1_100 (включительно-эксклюзивно).

Последнее немного интересно. Мы хотели бы имитировать случайные дубликаты. Для этого мы сопоставляем каждое событие (используя flatMap() ) с собой (в 99% случаев). Однако в 1% случаев мы возвращаем это событие дважды, когда второе вхождение происходит между 10 миллисекундами и 5 секундами позже. На практике дублированный экземпляр события появляется после сотен других событий, что делает поток действительно реалистичным.

Существует два способа взаимодействия с EventStream — обратный вызов, основанный на consume() и поток, основанный на observe() . Мы можем воспользоваться Observable<Event> для быстрого построения конвейера обработки, очень похожего по функциональности на часть 1, но гораздо более простого.

Отсутствует противодавление

Первый наивный подход к использованию RxJava очень быстро терпит неудачу:

01
02
03
04
05
06
07
08
09
10
EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
                new MetricRegistry()));
  
es.observe()
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

( ClientProjection , ProjectionMetrics и др. ClientProjection из части 1 ). Мы получаем MissingBackpressureException практически мгновенно, и это ожидалось. Помните, как наше первое решение запаздывало, обрабатывая события с все большей и большей задержкой? RxJava пытается избежать этого, а также избежать переполнения очередей. MissingBackpressureException вызывается, потому что потребитель ( ClientProjection ) не способен обрабатывать события в режиме реального времени. Это безотказное поведение. Самое быстрое решение — перенести потребление в отдельный пул потоков, как и раньше, но с использованием средств RxJava:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
                new ProjectionMetrics(
                        new MetricRegistry())));
  
es.observe()
        .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

Интерфейс EventConsumer имеет вспомогательный метод, который может асинхронно потреблять события в предоставленном Scheduler :

01
02
03
04
05
06
07
08
09
10
11
@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
  
    default Observable<Event> consume(Event event, Scheduler scheduler) {
        return Observable
                .fromCallable(() -> this.consume(event))
                .subscribeOn(scheduler);
    }
  
}

Используя события с использованием flatMap() в отдельном Scheduler.io() каждое потребление вызывается асинхронно. На этот раз события обрабатываются практически в реальном времени, но есть большая проблема. Я украшал ClientProjection с помощью FailOnConcurrentModification по причине. События потребляются независимо друг от друга, поэтому может случиться, что два события для одного и того же clientId обрабатываются одновременно. Нехорошо. К счастью, в RxJava решить эту проблему намного проще, чем с простыми потоками:

01
02
03
04
05
06
07
08
09
10
11
es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

Немного изменился. Прежде всего мы группируем события по clientId . Это разделяет один Observable поток на поток потоков . Каждый подпоток с именем byClient представляет все события, связанные с одним и тем же clientId . Теперь, если мы отобразим этот подпоток, мы можем быть уверены, что события, относящиеся к одному и clientId же clientId , никогда не обрабатываются одновременно. Внешний поток ленивый, поэтому мы должны подписаться на него. Вместо того, чтобы подписываться на каждое событие отдельно, мы собираем события каждую секунду и подсчитываем их. Таким образом, мы получаем одно событие типа Integer каждую секунду, представляющее количество событий, потребляемых в секунду.

Нечистое, не идиоматическое, подверженное ошибкам, небезопасное решение дедупликации с использованием глобального состояния

Теперь мы должны удалить дубликаты UUID . Самый простой, но очень глупый способ отбросить дубликаты — воспользоваться преимуществами глобального состояния. Мы можем просто отфильтровывать дубликаты, просматривая их в кеше, доступном вне оператора filter() :

01
02
03
04
05
06
07
08
09
10
11
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();
  
es.observe()
        .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );

Если вы хотите отслеживать использование этого механизма, просто добавьте метрику:

01
02
03
04
05
06
07
08
09
10
11
Meter duplicates = metricRegistry.meter("duplicates");
  
es.observe()
        .filter(e -> {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })

Доступ к глобальному, особенно изменчивому состоянию изнутри операторов очень опасен и подрывает единственные цели RxJava — упрощение параллелизма. Очевидно, что мы используем поточно-ориентированный Cache из Guava, но во многих случаях легко пропустить места, где доступ к глобальному изменяемому состоянию доступен из нескольких потоков. Если вы обнаружите, что изменяете некоторую переменную за пределами цепочки операторов, будьте очень осторожны.

Пользовательский distinct() оператор distinct() в RxJava 1.x

RxJava 1.x имеет distinct() оператор distinct() который, вероятно, выполняет свою работу:

1
2
3
es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)

К сожалению, distinct() хранит все ключи ( UUID ) внутри постоянно растущего HashSet . Но мы заботимся только о дубликатах за последние 10 секунд! Вставив копию реализации DistinctOperator я создал оператор DistinctEvent который использует кеш Guava для хранения UUID только за последние 10 секунд. Я намеренно жестко запрограммировал Event в этом операторе, вместо того, чтобы сделать его более универсальным, чтобы облегчить понимание кода:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class DistinctEvent implements Observable.Operator<Event, Event> {
    private final Duration duration;
      
    DistinctEvent(Duration duration) {
        this.duration = duration;
    }
  
    @Override
    public Subscriber<? super Event> call(Subscriber<? super Event> child) {
        return new Subscriber<Event>(child) {
            final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .<UUID, Boolean>build().asMap();
              
            @Override
            public void onNext(Event event) {
                if (keyMemory.put(event.getUuid(), true) == null) {
                    child.onNext(event);
                } else {
                    request(1);
                }
            }
              
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
              
            @Override
            public void onCompleted() {
                child.onCompleted();
            }
              
        };
    }
}

Использование довольно простое, и вся реализация (плюс пользовательский оператор) так коротка, как:

01
02
03
04
05
06
07
08
09
10
11
12
13
es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );

На самом деле это может быть еще короче, если вы пропускаете ведение журнала каждую секунду:

01
02
03
04
05
06
07
08
09
10
11
es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

Это решение намного короче предыдущего, основанного на пулах потоков и декораторах. Единственная неудобная часть — пользовательский оператор, который предотвращает утечку памяти при хранении слишком большого количества исторических UUID . К счастью, RxJava 2 на помощь!

RxJava 2.x и более мощная встроенная функция Different distinct()

На самом деле я был так близок к тому, чтобы представить PR в RxJava с более мощной реализацией оператора Different distinct() . Но до того, как я проверил ветку 2.x там было: distinct() которое позволяет предоставлять пользовательскую Collection а не жестко закодированный HashSet . Хотите верьте, хотите нет, инверсия зависимостей касается не только среды Spring или Java EE. Когда библиотека позволяет вам предоставлять пользовательскую реализацию своей внутренней структуры данных, это также DI. Сначала я создаю вспомогательный метод, который может создавать Set<UUID> на основе Map<UUID, Boolean> на основе Cache<UUID, Boolean> . Мы уверены, что делегация!

1
2
3
4
5
6
7
8
private Set<UUID> recentUuids() {
    return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    .<UUID, Boolean>build()
                    .asMap()
    );
}

Имея этот метод, мы можем реализовать всю задачу, используя это выражение:

01
02
03
04
05
06
07
08
09
10
11
es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );

Элегантность, простота, ясность! Это читается почти как проблема:

  • наблюдать за потоком событий
  • учитывать только различные UUID
  • групповые события по клиенту
  • для каждого клиента их потребляют (последовательно)

Надеюсь, вам понравились все эти решения, и вы нашли их полезными в вашей повседневной работе.

Смотрите также: