Статьи

Стремительная подписка — RxJava FAQ

Во время обучения и наставничества в RxJava, а также после написания книги я заметил, что некоторые области особенно проблематичны. Я решил опубликовать несколько коротких советов, которые касаются самых распространенных ошибок. Это первая часть.
Observable s и Flowable s по своей природе ленивы. Это означает, что независимо от того, какую тяжелую или длительную логику вы поместите в свой Flowable , она будет оцениваться только тогда, когда кто-то подписывается. А также столько раз, сколько кто-то подписывается. Это иллюстрируется следующим фрагментом кода:

01
02
03
04
05
06
07
08
09
10
11
12
13
private static String slow() throws InterruptedException {
    logger.info("Running");
    TimeUnit.SECONDS.sleep(1);
    return "abc";
}
  
//...
  
Flowable flo = Flowable.fromCallable(this::slow);
logger.info("Created");
flo.subscribe();
flo.subscribe();
logger.info("Done");

Такой Observable или Flowable неизбежно напечатает:

1
2
3
4
19:37:57.368 [main] - Created
19:37:57.379 [main] - Running
19:37:58.383 [main] - Running
19:37:59.388 [main] - Done

Обратите внимание, что вы платите цену за sleep() дважды (двойная подписка). Более того, вся логика выполняется в клиентском ( main ) потоке, в RxJava неявная многопоточность не существует, если только она не запрошена с subscribeOn() или неявно доступна с асинхронными потоками. Вопрос в следующем: можем ли мы принудительно запустить логику подписки, чтобы всякий раз, когда кто-то подписывается, поток уже был предварительно вычислен или, по крайней мере, началось вычисление?

Полностью жаждущий оценки

Наиболее очевидное, но ошибочное решение состоит в том, чтобы быстро вычислить все, что возвращает поток, и просто обернуть его фиксированным Flowable :

1
2
3
4
Flowable<String> eager() {
    final String slow = slow();
    return Flowable.just(slow);
}

К сожалению, это существенно противоречит цели RxJava. Прежде всего, такие операторы, как subscribeOn() больше не работают, и становится невозможным перенести вычисления в другой поток. Еще хуже, даже если eager() возвращает Flowable он всегда по определению блокирует поток клиента. Труднее рассуждать, составлять и управлять такими потоками. Как правило, вы должны избегать такой схемы и предпочитать ленивую загрузку, даже когда необходима тщательная оценка.

Использование оператора cache()

Следующий пример делает это с помощью оператора cache() :

1
2
3
4
5
6
7
8
Flowable<String> eager3() throws InterruptedException {
    final Flowable<String> cached =
        Flowable
            .fromCallable(this::slow)
            .cache();
    cached.subscribe();
    return cached;
}

Идея проста: обернуть вычисления ленивым Flowable и сделать его кэшированным. Оператор cache() выполняет запоминание всех отправленных событий при первой подписке, поэтому при появлении второго Subscriber он получает ту же кэшированную последовательность событий. Однако оператор cache() (как и большинство других) является ленивым, поэтому мы должны принудительно подписаться в первый раз. Вызов subscribe() предварительно заполнит кэш, более того, если второй подписчик появится до того, как вычисление slow() завершится, он также будет ожидать его, а не запускать во второй раз.

Это решение работает, но имейте в виду, что subscribe() будет фактически блокироваться, потому что не был задействован Scheduler Если вы хотите предварительно заполнить свой Flowable в фоновом режиме, попробуйте subscribeOn() :

1
2
3
4
5
6
7
8
9
Flowable<String> eager3() throws InterruptedException {
    final Flowable<String> cached =
        Flowable
            .fromCallable(this::slow)
            .subscribeOn(justDontAlwaysUse_Schedulers.io())
            .cache();
    cached.subscribe();
    return cached;
}

Да, использование Schedulers.io() проблематично и трудно поддерживать в производственных системах, поэтому, пожалуйста, избегайте его в пользу пользовательских пулов потоков.

Обработка ошибок

К сожалению, удивительно легко проглотить исключения в RxJava. Вот что может произойти в нашем последнем примере, если метод slow() завершится неудачно. Исключение не полностью проглочено, но по умолчанию, если никто не заинтересован, его трассировка стека печатается на System.err . Также необработанное исключение заключено в OnErrorNotImplementedException . Не очень удобно и, скорее всего, потеряно, если вы ведете какую-либо форму централизованного ведения журнала. Вы можете использовать оператор doOnError() для ведения журнала, но он по-прежнему передает исключение в нисходящем направлении, и RxJava также считает его необработанным, еще раз OnErrorNotImplementedException . Итак, давайте реализуем onError вызов onError в subscribe() :

01
02
03
04
05
06
07
08
09
10
Flowable<String> eager3() throws InterruptedException {
    final Flowable<String> cached =
        Flowable
            .fromCallable(this::slow)
            .cache();
    cached.subscribe(
            x -> {/* ignore */},
            e -> logger.error("Prepopulation error", e));
    return cached;
}

Мы не хотим обрабатывать реальные события, просто ошибки в subscribe() . На данный момент вы можете безопасно вернуть такой Flowable . Это нетерпеливо, и есть вероятность, что когда бы вы ни подписались на него, данные уже будут доступны. Обратите внимание, что, например, метод observe() из Hystrix также стремятся, в отличие от toObservable() , который ленив. Выбор ваш.