Во время обучения и наставничества в RxJava, а также после написания книги я заметил, что некоторые области особенно проблематичны. Я решил опубликовать несколько коротких советов, которые касаются самых распространенных ошибок. Это первая часть.
Observable s и Flowable s по своей природе ленивы. Это означает, что независимо от того, какую тяжелую или длительную логику вы поместите в свой Flowable , она будет оцениваться только тогда, когда кто-то подписывается. А также столько раз, сколько кто-то подписывается. Это иллюстрируется следующим фрагментом кода:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
private static String slow() throws InterruptedException { logger.info("Running"); TimeUnit.SECONDS.sleep(1); return "abc";} //... Flowable flo = Flowable.fromCallable(this::slow);logger.info("Created");flo.subscribe();flo.subscribe();logger.info("Done"); |
Такой Observable или Flowable неизбежно напечатает:
|
1
2
3
4
|
19:37:57.368 [main] - Created19:37:57.379 [main] - Running19:37:58.383 [main] - Running19:37:59.388 [main] - Done |
Обратите внимание, что вы платите цену за sleep() дважды (двойная подписка). Более того, вся логика выполняется в клиентском ( main ) потоке, в RxJava неявная многопоточность не существует, если только она не запрошена с subscribeOn() или неявно доступна с асинхронными потоками. Вопрос в следующем: можем ли мы принудительно запустить логику подписки, чтобы всякий раз, когда кто-то подписывается, поток уже был предварительно вычислен или, по крайней мере, началось вычисление?
Полностью жаждущий оценки
Наиболее очевидное, но ошибочное решение состоит в том, чтобы быстро вычислить все, что возвращает поток, и просто обернуть его фиксированным Flowable :
|
1
2
3
4
|
Flowable<String> eager() { final String slow = slow(); return Flowable.just(slow);} |
К сожалению, это существенно противоречит цели RxJava. Прежде всего, такие операторы, как subscribeOn() больше не работают, и становится невозможным перенести вычисления в другой поток. Еще хуже, даже если eager() возвращает Flowable он всегда по определению блокирует поток клиента. Труднее рассуждать, составлять и управлять такими потоками. Как правило, вы должны избегать такой схемы и предпочитать ленивую загрузку, даже когда необходима тщательная оценка.
Использование оператора cache()
Следующий пример делает это с помощью оператора cache() :
|
1
2
3
4
5
6
7
8
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable(this::slow) .cache(); cached.subscribe(); return cached;} |
Идея проста: обернуть вычисления ленивым Flowable и сделать его кэшированным. Оператор cache() выполняет запоминание всех отправленных событий при первой подписке, поэтому при появлении второго Subscriber он получает ту же кэшированную последовательность событий. Однако оператор cache() (как и большинство других) является ленивым, поэтому мы должны принудительно подписаться в первый раз. Вызов subscribe() предварительно заполнит кэш, более того, если второй подписчик появится до того, как вычисление slow() завершится, он также будет ожидать его, а не запускать во второй раз.
Это решение работает, но имейте в виду, что subscribe() будет фактически блокироваться, потому что не был задействован Scheduler Если вы хотите предварительно заполнить свой Flowable в фоновом режиме, попробуйте subscribeOn() :
|
1
2
3
4
5
6
7
8
9
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable(this::slow) .subscribeOn(justDontAlwaysUse_Schedulers.io()) .cache(); cached.subscribe(); return cached;} |
Да, использование Schedulers.io() проблематично и трудно поддерживать в производственных системах, поэтому, пожалуйста, избегайте его в пользу пользовательских пулов потоков.
Обработка ошибок
К сожалению, удивительно легко проглотить исключения в RxJava. Вот что может произойти в нашем последнем примере, если метод slow() завершится неудачно. Исключение не полностью проглочено, но по умолчанию, если никто не заинтересован, его трассировка стека печатается на System.err . Также необработанное исключение заключено в OnErrorNotImplementedException . Не очень удобно и, скорее всего, потеряно, если вы ведете какую-либо форму централизованного ведения журнала. Вы можете использовать оператор doOnError() для ведения журнала, но он по-прежнему передает исключение в нисходящем направлении, и RxJava также считает его необработанным, еще раз OnErrorNotImplementedException . Итак, давайте реализуем onError вызов onError в subscribe() :
|
01
02
03
04
05
06
07
08
09
10
|
Flowable<String> eager3() throws InterruptedException { final Flowable<String> cached = Flowable .fromCallable(this::slow) .cache(); cached.subscribe( x -> {/* ignore */}, e -> logger.error("Prepopulation error", e)); return cached;} |
Мы не хотим обрабатывать реальные события, просто ошибки в subscribe() . На данный момент вы можете безопасно вернуть такой Flowable . Это нетерпеливо, и есть вероятность, что когда бы вы ни подписались на него, данные уже будут доступны. Обратите внимание, что, например, метод observe() из Hystrix также стремятся, в отличие от toObservable() , который ленив. Выбор ваш.
| Ссылка: | Активная подписка — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседях . |