Во время обучения и наставничества в RxJava, а также после написания книги я заметил, что некоторые области особенно проблематичны. Я решил опубликовать несколько коротких советов, которые касаются самых распространенных ошибок. Это первая часть.
Observable
s и Flowable
s по своей природе ленивы. Это означает, что независимо от того, какую тяжелую или длительную логику вы поместите в свой Flowable
, она будет оцениваться только тогда, когда кто-то подписывается. А также столько раз, сколько кто-то подписывается. Это иллюстрируется следующим фрагментом кода:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
private static String slow() throws InterruptedException { logger.info( "Running" ); TimeUnit.SECONDS.sleep( 1 ); return "abc" ; } //... Flowable flo = Flowable.fromCallable( this ::slow); logger.info( "Created" ); flo.subscribe(); flo.subscribe(); logger.info( "Done" ); |
Такой Observable
или Flowable
неизбежно напечатает:
1
2
3
4
|
19:37:57.368 [main] - Created 19:37:57.379 [main] - Running 19:37:58.383 [main] - Running 19:37:59.388 [main] - Done |
Обратите внимание, что вы платите цену за sleep()
дважды (двойная подписка). Более того, вся логика выполняется в клиентском ( main
) потоке, в RxJava неявная многопоточность не существует, если только она не запрошена с subscribeOn()
или неявно доступна с асинхронными потоками. Вопрос в следующем: можем ли мы принудительно запустить логику подписки, чтобы всякий раз, когда кто-то подписывается, поток уже был предварительно вычислен или, по крайней мере, началось вычисление?
Полностью жаждущий оценки
Наиболее очевидное, но ошибочное решение состоит в том, чтобы быстро вычислить все, что возвращает поток, и просто обернуть его фиксированным Flowable
:
1
2
3
4
|
Flowable<String> eager() { final String slow = slow(); return Flowable.just(slow); } |
К сожалению, это существенно противоречит цели RxJava. Прежде всего, такие операторы, как subscribeOn()
больше не работают, и становится невозможным перенести вычисления в другой поток. Еще хуже, даже если eager()
возвращает Flowable
он всегда по определению блокирует поток клиента. Труднее рассуждать, составлять и управлять такими потоками. Как правило, вы должны избегать такой схемы и предпочитать ленивую загрузку, даже когда необходима тщательная оценка.
Использование оператора cache()
Следующий пример делает это с помощью оператора cache()
:
1
2
3
4
5
6
7
8
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable( this ::slow) .cache(); cached.subscribe(); return cached; } |
Идея проста: обернуть вычисления ленивым Flowable
и сделать его кэшированным. Оператор cache()
выполняет запоминание всех отправленных событий при первой подписке, поэтому при появлении второго Subscriber
он получает ту же кэшированную последовательность событий. Однако оператор cache()
(как и большинство других) является ленивым, поэтому мы должны принудительно подписаться в первый раз. Вызов subscribe()
предварительно заполнит кэш, более того, если второй подписчик появится до того, как вычисление slow()
завершится, он также будет ожидать его, а не запускать во второй раз.
Это решение работает, но имейте в виду, что subscribe()
будет фактически блокироваться, потому что не был задействован Scheduler
Если вы хотите предварительно заполнить свой Flowable
в фоновом режиме, попробуйте subscribeOn()
:
1
2
3
4
5
6
7
8
9
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable( this ::slow) .subscribeOn(justDontAlwaysUse_Schedulers.io()) .cache(); cached.subscribe(); return cached; } |
Да, использование Schedulers.io()
проблематично и трудно поддерживать в производственных системах, поэтому, пожалуйста, избегайте его в пользу пользовательских пулов потоков.
Обработка ошибок
К сожалению, удивительно легко проглотить исключения в RxJava. Вот что может произойти в нашем последнем примере, если метод slow()
завершится неудачно. Исключение не полностью проглочено, но по умолчанию, если никто не заинтересован, его трассировка стека печатается на System.err
. Также необработанное исключение заключено в OnErrorNotImplementedException
. Не очень удобно и, скорее всего, потеряно, если вы ведете какую-либо форму централизованного ведения журнала. Вы можете использовать оператор doOnError()
для ведения журнала, но он по-прежнему передает исключение в нисходящем направлении, и RxJava также считает его необработанным, еще раз OnErrorNotImplementedException
. Итак, давайте реализуем onError
вызов onError
в subscribe()
:
01
02
03
04
05
06
07
08
09
10
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable( this ::slow) .cache(); cached.subscribe( x -> { /* ignore */ }, e -> logger.error( "Prepopulation error" , e)); return cached; } |
Мы не хотим обрабатывать реальные события, просто ошибки в subscribe()
. На данный момент вы можете безопасно вернуть такой Flowable
. Это нетерпеливо, и есть вероятность, что когда бы вы ни подписались на него, данные уже будут доступны. Обратите внимание, что, например, метод observe()
из Hystrix также стремятся, в отличие от toObservable()
, который ленив. Выбор ваш.
Ссылка: | Активная подписка — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседях . |