Статьи

Обнаружение и тестирование остановленных потоков — RxJava FAQ

Представьте, что у вас есть поток, который публикует события с непредсказуемой частотой. Иногда вы можете ожидать десятки сообщений в секунду, но иногда никаких событий не видно в течение нескольких секунд. Это может быть проблемой, если ваш поток передается через веб-сокет, SSE или любой другой сетевой протокол. Тихий период, который занимает слишком много времени (остановка), может быть интерпретирован как проблема сети. Поэтому мы часто отправляем искусственные события ( пинги ) время от времени, чтобы убедиться:

  • клиенты еще живы
  • сообщите клиентам, что мы еще живы

В качестве более конкретного примера представьте, что у нас есть поток Flowable<String> который генерирует некоторые события. Если в течение более одной секунды нет события, мы должны отправить сообщение-заполнитель "PING" . Когда тишина еще длиннее, каждую секунду должно появляться сообщение "PING" . Как мы можем реализовать такое требование в RxJava? Наиболее очевидное, но неверное решение — объединить оригинальный поток с пингами :

1
2
3
4
5
6
Flowable<String> events = //...
Flowable<String> pings = Flowable
            .interval(1, SECONDS)
            .map(x -> "PING");
  
Flowable<String> eventsWithPings = events.mergeWith(pings);

mergeWith() имеет решающее значение: он принимает подлинные events и объединяет их с постоянным потоком пингов. Конечно, когда нет подлинных событий, появляются сообщения "PING" . К сожалению, они совершенно не связаны с оригинальным потоком. Это означает, что мы продолжаем отправлять пинги, даже когда есть много нормальных событий. Более того, когда начинается тишина, мы не посылаем "PING" точно через одну секунду. Если вы в порядке с таким механизмом, вы можете перестать читать здесь.

оператор debounce()

Более сложный подход требует обнаружения тишины, которая длится более 1 секунды. Для этого мы можем использовать оператор timeout() . К сожалению, это приводит к TimeoutException и отписывается от TimeoutException — слишком агрессивное поведение. Мы просто хотим получить какое-то уведомление. Оказывается, для этого можно использовать оператор debounce() . Обычно этот оператор откладывает выдачу новых событий на случай, если появятся новые события, переопределяя старые. Так что, если я скажу:

1
2
Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);

Это означает, что delayed поток будет генерировать событие, только если за ним не последовало другое событие в течение 1 секунды. Технически delayed может никогда ничего не излучать, если поток events продолжает генерировать события достаточно быстро. Мы будем использовать delayed поток для обнаружения тишины следующим образом:

1
2
3
4
Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed.map(ev -> "PING");
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

Имейте в виду, что нет никакой разницы между mergeWith() и его static аналогом merge() . Итак, мы куда-то добираемся. Если поток занят, delayed поток никогда не получает никаких событий, поэтому сообщения "PING" не отправляются. Однако когда исходный поток не отправляет какое-либо событие в течение более 1 секунды, delayed получает последнее увиденное событие, игнорирует его и преобразует в "PING" . Умный, но сломленный. Эта реализация отправляет только один "PING" после обнаружения остановки, в отличие от отправки периодических пингов каждую секунду. Довольно легко исправить! Вместо того, чтобы преобразовывать последнее увиденное событие в один "PING" мы можем преобразовать его в последовательность периодических пингов :

1
2
3
4
5
6
7
8
Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

Вы можете увидеть, где недостаток? Каждый раз, когда в исходном потоке появляется тишина, мы начинаем пинговать каждую секунду. Однако мы должны прекратить это делать, когда появятся какие-то подлинные события. Мы не Каждый останов в восходящем потоке вызывает появление нового бесконечного потока пингов в конечном объединенном потоке. Мы должны как-то сообщить потоку pings что он должен прекратить пинговать, потому что исходный поток испустил подлинное событие. Угадайте, что есть оператор takeUntil() который делает именно это!

1
2
3
4
5
6
7
8
9
Flowable<String> events = //...
Flowable<String> delayed = events.debounce(1, SECONDS);
Flowable<String> pings = delayed
        .flatMap(x -> Flowable
                .interval(0, 1, SECONDS)
                .map(e -> "PING")
                .takeUntil(events)
        );
Flowable<String> eventsWithPings = Flowable.merge(events, pings);

Найдите минутку, чтобы полностью понять приведенный выше фрагмент кода. delayed поток генерирует событие каждый раз, когда в исходном потоке ничего не происходит в течение более 1 секунды. Поток pings генерирует последовательность событий "PING" каждую секунду для каждого события, отправляемого с delayed . Однако поток pings прекращается в тот момент, когда в потоке событий появляется событие. Вы можете даже определить все это как одно выражение:

01
02
03
04
05
06
07
08
09
10
Flowable<String> events = //...
Flowable<String> eventsWithPings = events
        .mergeWith(
                events
                        .debounce(1, SECONDS)
                        .flatMap(x1 -> Flowable
                                .interval(0, 1, SECONDS)
                                .map(e -> "PING")
                                .takeUntil(events)
                        ));

способность быть свидетелем в суде

Хорошо, мы написали все это, но как нам предположить протестировать этот тройной вложенный объектный код, управляемый событиями? Как мы можем убедиться, что пинги появляются в нужный момент и останавливаются, когда заканчивается тишина? Как смоделировать различные временные сценарии? В RxJava много убойных функций, но тестирование того, как проходит время, вероятно, самое большое. Прежде всего давайте сделаем наш код pinging немного более тестируемым и универсальным:

01
02
03
04
05
06
07
08
09
10
11
12
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {
    return events
            .mergeWith(
                    events
                            .debounce(1, SECONDS, clock)
                            .flatMap(x1 -> Flowable
                                    .interval(0, 1, SECONDS, clock)
                                    .map(e -> ping)
                                    .takeUntil(events)
                            ));
  
}

Этот служебный метод принимает произвольный поток T и добавляет пинги, если поток не генерирует никаких событий в течение более длительного периода времени. Мы используем это так в нашем тесте:

1
2
3
PublishProcessor<String> events = PublishProcessor.create();
TestScheduler clock = new TestScheduler();
Flowable<String> eventsWithPings = withPings(events, clock, "PING");

О, мальчик, PublishProcessor , TestScheduler ? PublishProcessor — интересный класс, который является подтипом Flowable (поэтому мы можем использовать его как обычный поток). С другой стороны, мы можем onNext() события, используя метод onNext() :

1
events.onNext("A");

Если кто-то прослушивает поток events , он сразу же получит событие "A" . А что с этими clock ? Каждый отдельный оператор в RxJava, который каким-либо образом обрабатывает время (например, debounce() , interval() , timeout() , window() ) может принимать необязательный аргумент Scheduler . Он служит внешним источником времени. Special TestScheduler — это искусственный источник времени, который мы полностью контролируем. Т.е. время останавливается, пока мы не вызываем advanceTimeBy() явно:

1
clock.advanceTimeBy(999, MILLISECONDS);

999 миллисекунд это не совпадение. Пинги начинают появляться точно через 1 секунду, поэтому они не должны быть видны через 999 миллисекунд. Теперь пришло время раскрыть полный тестовый пример:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Test
public void shouldAddPings() throws Exception {
    PublishProcessor<String> events = PublishProcessor.create();
    final TestScheduler clock = new TestScheduler();
    final Flowable<String> eventsWithPings = withPings(events, clock, "PING");
  
    final TestSubscriber<String> test = eventsWithPings.test();
    events.onNext("A");
    test.assertValues("A");
  
    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("B");
    test.assertValues("A", "B");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B");
  
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING");
  
    events.onNext("C");
    test.assertValues("A", "B", "PING", "C");
  
    clock.advanceTimeBy(1000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING");
  
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING");
  
    events.onNext("D");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");
  
    clock.advanceTimeBy(999, MILLISECONDS);
    events.onNext("E");
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
    clock.advanceTimeBy(999, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");
  
    clock.advanceTimeBy(1, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");
  
    clock.advanceTimeBy(3_000, MILLISECONDS);
    test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");
}

Выглядит как стена текста, но на самом деле это полный сценарий тестирования нашей логики. Это гарантирует, что пинги появляются точно после 1000 миллисекунд, повторяются, когда молчание очень длинное и довольно подавленное, когда появляются подлинные события. Но самая важная часть: тест на 100% предсказуем и невероятно быстр. Нет ожидания , заняты ожидания, опрос, периодические сбои теста и медлительность. Искусственные часы, которые мы имеем под полным контролем, гарантируют, что все эти объединенные потоки работают точно так, как ожидалось.