Представьте, что у вас есть поток, который публикует события с непредсказуемой частотой. Иногда вы можете ожидать десятки сообщений в секунду, но иногда никаких событий не видно в течение нескольких секунд. Это может быть проблемой, если ваш поток передается через веб-сокет, SSE или любой другой сетевой протокол. Тихий период, который занимает слишком много времени (остановка), может быть интерпретирован как проблема сети. Поэтому мы часто отправляем искусственные события ( пинги ) время от времени, чтобы убедиться:
- клиенты еще живы
- сообщите клиентам, что мы еще живы
В качестве более конкретного примера представьте, что у нас есть поток Flowable<String> который генерирует некоторые события. Если в течение более одной секунды нет события, мы должны отправить сообщение-заполнитель "PING" . Когда тишина еще длиннее, каждую секунду должно появляться сообщение "PING" . Как мы можем реализовать такое требование в RxJava? Наиболее очевидное, но неверное решение — объединить оригинальный поток с пингами :
|
1
2
3
4
5
6
|
Flowable<String> events = //...Flowable<String> pings = Flowable .interval(1, SECONDS) .map(x -> "PING"); Flowable<String> eventsWithPings = events.mergeWith(pings); |
mergeWith() имеет решающее значение: он принимает подлинные events и объединяет их с постоянным потоком пингов. Конечно, когда нет подлинных событий, появляются сообщения "PING" . К сожалению, они совершенно не связаны с оригинальным потоком. Это означает, что мы продолжаем отправлять пинги, даже когда есть много нормальных событий. Более того, когда начинается тишина, мы не посылаем "PING" точно через одну секунду. Если вы в порядке с таким механизмом, вы можете перестать читать здесь.
оператор debounce()
Более сложный подход требует обнаружения тишины, которая длится более 1 секунды. Для этого мы можем использовать оператор timeout() . К сожалению, это приводит к TimeoutException и отписывается от TimeoutException — слишком агрессивное поведение. Мы просто хотим получить какое-то уведомление. Оказывается, для этого можно использовать оператор debounce() . Обычно этот оператор откладывает выдачу новых событий на случай, если появятся новые события, переопределяя старые. Так что, если я скажу:
|
1
2
|
Flowable<String> events = //...Flowable<String> delayed = events.debounce(1, SECONDS); |
Это означает, что delayed поток будет генерировать событие, только если за ним не последовало другое событие в течение 1 секунды. Технически delayed может никогда ничего не излучать, если поток events продолжает генерировать события достаточно быстро. Мы будем использовать delayed поток для обнаружения тишины следующим образом:
|
1
2
3
4
|
Flowable<String> events = //...Flowable<String> delayed = events.debounce(1, SECONDS);Flowable<String> pings = delayed.map(ev -> "PING");Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Имейте в виду, что нет никакой разницы между mergeWith() и его static аналогом merge() . Итак, мы куда-то добираемся. Если поток занят, delayed поток никогда не получает никаких событий, поэтому сообщения "PING" не отправляются. Однако когда исходный поток не отправляет какое-либо событие в течение более 1 секунды, delayed получает последнее увиденное событие, игнорирует его и преобразует в "PING" . Умный, но сломленный. Эта реализация отправляет только один "PING" после обнаружения остановки, в отличие от отправки периодических пингов каждую секунду. Довольно легко исправить! Вместо того, чтобы преобразовывать последнее увиденное событие в один "PING" мы можем преобразовать его в последовательность периодических пингов :
|
1
2
3
4
5
6
7
8
|
Flowable<String> events = //...Flowable<String> delayed = events.debounce(1, SECONDS);Flowable<String> pings = delayed .flatMap(x -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") );Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Вы можете увидеть, где недостаток? Каждый раз, когда в исходном потоке появляется тишина, мы начинаем пинговать каждую секунду. Однако мы должны прекратить это делать, когда появятся какие-то подлинные события. Мы не Каждый останов в восходящем потоке вызывает появление нового бесконечного потока пингов в конечном объединенном потоке. Мы должны как-то сообщить потоку pings что он должен прекратить пинговать, потому что исходный поток испустил подлинное событие. Угадайте, что есть оператор takeUntil() который делает именно это!
|
1
2
3
4
5
6
7
8
9
|
Flowable<String> events = //...Flowable<String> delayed = events.debounce(1, SECONDS);Flowable<String> pings = delayed .flatMap(x -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") .takeUntil(events) );Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Найдите минутку, чтобы полностью понять приведенный выше фрагмент кода. delayed поток генерирует событие каждый раз, когда в исходном потоке ничего не происходит в течение более 1 секунды. Поток pings генерирует последовательность событий "PING" каждую секунду для каждого события, отправляемого с delayed . Однако поток pings прекращается в тот момент, когда в потоке событий появляется событие. Вы можете даже определить все это как одно выражение:
|
01
02
03
04
05
06
07
08
09
10
|
Flowable<String> events = //...Flowable<String> eventsWithPings = events .mergeWith( events .debounce(1, SECONDS) .flatMap(x1 -> Flowable .interval(0, 1, SECONDS) .map(e -> "PING") .takeUntil(events) )); |
способность быть свидетелем в суде
Хорошо, мы написали все это, но как нам предположить протестировать этот тройной вложенный объектный код, управляемый событиями? Как мы можем убедиться, что пинги появляются в нужный момент и останавливаются, когда заканчивается тишина? Как смоделировать различные временные сценарии? В RxJava много убойных функций, но тестирование того, как проходит время, вероятно, самое большое. Прежде всего давайте сделаем наш код pinging немного более тестируемым и универсальным:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) { return events .mergeWith( events .debounce(1, SECONDS, clock) .flatMap(x1 -> Flowable .interval(0, 1, SECONDS, clock) .map(e -> ping) .takeUntil(events) )); } |
Этот служебный метод принимает произвольный поток T и добавляет пинги, если поток не генерирует никаких событий в течение более длительного периода времени. Мы используем это так в нашем тесте:
|
1
2
3
|
PublishProcessor<String> events = PublishProcessor.create();TestScheduler clock = new TestScheduler();Flowable<String> eventsWithPings = withPings(events, clock, "PING"); |
О, мальчик, PublishProcessor , TestScheduler ? PublishProcessor — интересный класс, который является подтипом Flowable (поэтому мы можем использовать его как обычный поток). С другой стороны, мы можем onNext() события, используя метод onNext() :
|
1
|
events.onNext("A"); |
Если кто-то прослушивает поток events , он сразу же получит событие "A" . А что с этими clock ? Каждый отдельный оператор в RxJava, который каким-либо образом обрабатывает время (например, debounce() , interval() , timeout() , window() ) может принимать необязательный аргумент Scheduler . Он служит внешним источником времени. Special TestScheduler — это искусственный источник времени, который мы полностью контролируем. Т.е. время останавливается, пока мы не вызываем advanceTimeBy() явно:
|
1
|
clock.advanceTimeBy(999, MILLISECONDS); |
999 миллисекунд это не совпадение. Пинги начинают появляться точно через 1 секунду, поэтому они не должны быть видны через 999 миллисекунд. Теперь пришло время раскрыть полный тестовый пример:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
@Testpublic void shouldAddPings() throws Exception { PublishProcessor<String> events = PublishProcessor.create(); final TestScheduler clock = new TestScheduler(); final Flowable<String> eventsWithPings = withPings(events, clock, "PING"); final TestSubscriber<String> test = eventsWithPings.test(); events.onNext("A"); test.assertValues("A"); clock.advanceTimeBy(999, MILLISECONDS); events.onNext("B"); test.assertValues("A", "B"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING"); events.onNext("C"); test.assertValues("A", "B", "PING", "C"); clock.advanceTimeBy(1000, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING"); events.onNext("D"); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D"); clock.advanceTimeBy(999, MILLISECONDS); events.onNext("E"); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E"); clock.advanceTimeBy(999, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E"); clock.advanceTimeBy(1, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING"); clock.advanceTimeBy(3_000, MILLISECONDS); test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING");} |
Выглядит как стена текста, но на самом деле это полный сценарий тестирования нашей логики. Это гарантирует, что пинги появляются точно после 1000 миллисекунд, повторяются, когда молчание очень длинное и довольно подавленное, когда появляются подлинные события. Но самая важная часть: тест на 100% предсказуем и невероятно быстр. Нет ожидания , заняты ожидания, опрос, периодические сбои теста и медлительность. Искусственные часы, которые мы имеем под полным контролем, гарантируют, что все эти объединенные потоки работают точно так, как ожидалось.
| Ссылка: | Обнаружение и тестирование остановленных потоков — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству . |