Представьте, что у вас есть поток, который публикует события с непредсказуемой частотой. Иногда вы можете ожидать десятки сообщений в секунду, но иногда никаких событий не видно в течение нескольких секунд. Это может быть проблемой, если ваш поток передается через веб-сокет, SSE или любой другой сетевой протокол. Тихий период, который занимает слишком много времени (остановка), может быть интерпретирован как проблема сети. Поэтому мы часто отправляем искусственные события ( пинги ) время от времени, чтобы убедиться:
- клиенты еще живы
- сообщите клиентам, что мы еще живы
В качестве более конкретного примера представьте, что у нас есть поток Flowable<String>
который генерирует некоторые события. Если в течение более одной секунды нет события, мы должны отправить сообщение-заполнитель "PING"
. Когда тишина еще длиннее, каждую секунду должно появляться сообщение "PING"
. Как мы можем реализовать такое требование в RxJava? Наиболее очевидное, но неверное решение — объединить оригинальный поток с пингами :
1
2
3
4
5
6
|
Flowable<String> events = //... Flowable<String> pings = Flowable .interval( 1 , SECONDS) .map(x -> "PING" ); Flowable<String> eventsWithPings = events.mergeWith(pings); |
mergeWith()
имеет решающее значение: он принимает подлинные events
и объединяет их с постоянным потоком пингов. Конечно, когда нет подлинных событий, появляются сообщения "PING"
. К сожалению, они совершенно не связаны с оригинальным потоком. Это означает, что мы продолжаем отправлять пинги, даже когда есть много нормальных событий. Более того, когда начинается тишина, мы не посылаем "PING"
точно через одну секунду. Если вы в порядке с таким механизмом, вы можете перестать читать здесь.
оператор debounce()
Более сложный подход требует обнаружения тишины, которая длится более 1 секунды. Для этого мы можем использовать оператор timeout()
. К сожалению, это приводит к TimeoutException
и отписывается от TimeoutException
— слишком агрессивное поведение. Мы просто хотим получить какое-то уведомление. Оказывается, для этого можно использовать оператор debounce()
. Обычно этот оператор откладывает выдачу новых событий на случай, если появятся новые события, переопределяя старые. Так что, если я скажу:
1
2
|
Flowable<String> events = //... Flowable<String> delayed = events.debounce( 1 , SECONDS); |
Это означает, что delayed
поток будет генерировать событие, только если за ним не последовало другое событие в течение 1 секунды. Технически delayed
может никогда ничего не излучать, если поток events
продолжает генерировать события достаточно быстро. Мы будем использовать delayed
поток для обнаружения тишины следующим образом:
1
2
3
4
|
Flowable<String> events = //... Flowable<String> delayed = events.debounce( 1 , SECONDS); Flowable<String> pings = delayed.map(ev -> "PING" ); Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Имейте в виду, что нет никакой разницы между mergeWith()
и его static
аналогом merge()
. Итак, мы куда-то добираемся. Если поток занят, delayed
поток никогда не получает никаких событий, поэтому сообщения "PING"
не отправляются. Однако когда исходный поток не отправляет какое-либо событие в течение более 1 секунды, delayed
получает последнее увиденное событие, игнорирует его и преобразует в "PING"
. Умный, но сломленный. Эта реализация отправляет только один "PING"
после обнаружения остановки, в отличие от отправки периодических пингов каждую секунду. Довольно легко исправить! Вместо того, чтобы преобразовывать последнее увиденное событие в один "PING"
мы можем преобразовать его в последовательность периодических пингов :
1
2
3
4
5
6
7
8
|
Flowable<String> events = //... Flowable<String> delayed = events.debounce( 1 , SECONDS); Flowable<String> pings = delayed .flatMap(x -> Flowable .interval( 0 , 1 , SECONDS) .map(e -> "PING" ) ); Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Вы можете увидеть, где недостаток? Каждый раз, когда в исходном потоке появляется тишина, мы начинаем пинговать каждую секунду. Однако мы должны прекратить это делать, когда появятся какие-то подлинные события. Мы не Каждый останов в восходящем потоке вызывает появление нового бесконечного потока пингов в конечном объединенном потоке. Мы должны как-то сообщить потоку pings
что он должен прекратить пинговать, потому что исходный поток испустил подлинное событие. Угадайте, что есть оператор takeUntil()
который делает именно это!
1
2
3
4
5
6
7
8
9
|
Flowable<String> events = //... Flowable<String> delayed = events.debounce( 1 , SECONDS); Flowable<String> pings = delayed .flatMap(x -> Flowable .interval( 0 , 1 , SECONDS) .map(e -> "PING" ) .takeUntil(events) ); Flowable<String> eventsWithPings = Flowable.merge(events, pings); |
Найдите минутку, чтобы полностью понять приведенный выше фрагмент кода. delayed
поток генерирует событие каждый раз, когда в исходном потоке ничего не происходит в течение более 1 секунды. Поток pings
генерирует последовательность событий "PING"
каждую секунду для каждого события, отправляемого с delayed
. Однако поток pings
прекращается в тот момент, когда в потоке событий появляется событие. Вы можете даже определить все это как одно выражение:
01
02
03
04
05
06
07
08
09
10
|
Flowable<String> events = //... Flowable<String> eventsWithPings = events .mergeWith( events .debounce( 1 , SECONDS) .flatMap(x1 -> Flowable .interval( 0 , 1 , SECONDS) .map(e -> "PING" ) .takeUntil(events) )); |
способность быть свидетелем в суде
Хорошо, мы написали все это, но как нам предположить протестировать этот тройной вложенный объектный код, управляемый событиями? Как мы можем убедиться, что пинги появляются в нужный момент и останавливаются, когда заканчивается тишина? Как смоделировать различные временные сценарии? В RxJava много убойных функций, но тестирование того, как проходит время, вероятно, самое большое. Прежде всего давайте сделаем наш код pinging немного более тестируемым и универсальным:
01
02
03
04
05
06
07
08
09
10
11
12
|
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) { return events .mergeWith( events .debounce( 1 , SECONDS, clock) .flatMap(x1 -> Flowable .interval( 0 , 1 , SECONDS, clock) .map(e -> ping) .takeUntil(events) )); } |
Этот служебный метод принимает произвольный поток T
и добавляет пинги, если поток не генерирует никаких событий в течение более длительного периода времени. Мы используем это так в нашем тесте:
1
2
3
|
PublishProcessor<String> events = PublishProcessor.create(); TestScheduler clock = new TestScheduler(); Flowable<String> eventsWithPings = withPings(events, clock, "PING" ); |
О, мальчик, PublishProcessor
, TestScheduler
? PublishProcessor
— интересный класс, который является подтипом Flowable
(поэтому мы можем использовать его как обычный поток). С другой стороны, мы можем onNext()
события, используя метод onNext()
:
1
|
events.onNext( "A" ); |
Если кто-то прослушивает поток events
, он сразу же получит событие "A"
. А что с этими clock
? Каждый отдельный оператор в RxJava, который каким-либо образом обрабатывает время (например, debounce()
, interval()
, timeout()
, window()
) может принимать необязательный аргумент Scheduler
. Он служит внешним источником времени. Special TestScheduler
— это искусственный источник времени, который мы полностью контролируем. Т.е. время останавливается, пока мы не вызываем advanceTimeBy()
явно:
1
|
clock.advanceTimeBy( 999 , MILLISECONDS); |
999 миллисекунд это не совпадение. Пинги начинают появляться точно через 1 секунду, поэтому они не должны быть видны через 999 миллисекунд. Теперь пришло время раскрыть полный тестовый пример:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
@Test public void shouldAddPings() throws Exception { PublishProcessor<String> events = PublishProcessor.create(); final TestScheduler clock = new TestScheduler(); final Flowable<String> eventsWithPings = withPings(events, clock, "PING" ); final TestSubscriber<String> test = eventsWithPings.test(); events.onNext( "A" ); test.assertValues( "A" ); clock.advanceTimeBy( 999 , MILLISECONDS); events.onNext( "B" ); test.assertValues( "A" , "B" ); clock.advanceTimeBy( 999 , MILLISECONDS); test.assertValues( "A" , "B" ); clock.advanceTimeBy( 1 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" ); clock.advanceTimeBy( 999 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" ); events.onNext( "C" ); test.assertValues( "A" , "B" , "PING" , "C" ); clock.advanceTimeBy( 1000 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" ); clock.advanceTimeBy( 999 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" ); clock.advanceTimeBy( 1 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" ); clock.advanceTimeBy( 999 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" ); events.onNext( "D" ); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" , "D" ); clock.advanceTimeBy( 999 , MILLISECONDS); events.onNext( "E" ); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" , "D" , "E" ); clock.advanceTimeBy( 999 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" , "D" , "E" ); clock.advanceTimeBy( 1 , MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" , "D" , "E" , "PING" ); clock.advanceTimeBy(3_000, MILLISECONDS); test.assertValues( "A" , "B" , "PING" , "C" , "PING" , "PING" , "D" , "E" , "PING" , "PING" , "PING" , "PING" ); } |
Выглядит как стена текста, но на самом деле это полный сценарий тестирования нашей логики. Это гарантирует, что пинги появляются точно после 1000 миллисекунд, повторяются, когда молчание очень длинное и довольно подавленное, когда появляются подлинные события. Но самая важная часть: тест на 100% предсказуем и невероятно быстр. Нет ожидания , заняты ожидания, опрос, периодические сбои теста и медлительность. Искусственные часы, которые мы имеем под полным контролем, гарантируют, что все эти объединенные потоки работают точно так, как ожидалось.
Ссылка: | Обнаружение и тестирование остановленных потоков — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству . |