Статьи

Методы побочных эффектов RxJava

Класс Observable в RxJava имеет множество методов, которые можно использовать для преобразования потока излучаемых элементов в тот тип данных, который вам необходим. Эти методы лежат в основе RxJava и составляют большую часть его привлекательности.

Но есть и другие методы, которые никак не изменяют поток элементов — я называю эти методы методами побочного эффекта.

Что я имею в виду под побочными эффектами?

Методы побочных эффектов не влияют на ваш поток сам по себе. Вместо этого они вызываются, когда происходят определенные события, чтобы позволить вам реагировать на эти события.

Например: если вы заинтересованы в том, чтобы делать что-то вне ваших  Subscriberобратных вызовов при возникновении какой-либо ошибки, вы должны использовать  doOnError() метод и передавать ему  функциональный интерфейс,  который будет использоваться при возникновении ошибки:

someObservable
      .doOnError(new Action1() {
         @Override
         public void call(Throwable t) {
            // use this callback to clean up resources,
            // log the event or or report the
            // problem to the user
         }
      })
      //…

Наиболее важной частью является  call() метод. Код этого метода будет выполнен до  Subscriberвызова  onError() метода.

В дополнение к исключениям RxJava предлагает еще много событий, на которые вы можете реагировать:


События и соответствующие им побочные эффекты
метод Функциональный интерфейс Мероприятие
doOnSubscribe () Action0 Подписчик подписывается на Observable
doOnUnsubscribe () Action0 Подписчик отписывается от подписки
doOnNext () Action1 <Т> Следующий элемент выбрасывается
doOnCompleted () Action0 The Observable больше не будет излучать предметы
doOnError () Action1 <Т> Произошла ошибка
doOnTerminate () Action0 Либо произошла ошибка, либо Observable больше не будет излучать элементы
doOnEach () Action1 <Уведомление <T >> Либо элемент был выпущен, наблюдаемое завершено, либо произошла ошибка. Объект Notification содержит информацию о типе события
doOnRequest () Action1 <Long> Нижестоящий оператор запрашивает выделение большего количества элементов

<T> относится либо к типу испускаемого предмета, либо, в случае  onError()метода, к типу броска Throwable.

Все функциональные интерфейсы имеют тип  Action0  или  Action1 . Это означает, что отдельные методы этих интерфейсов ничего не возвращают и принимают либо нулевые аргументы, либо один аргумент, в зависимости от конкретного события.

Поскольку эти методы ничего не возвращают, они не могут использоваться для изменения испускаемых элементов и, таким образом, никоим образом не изменяют поток элементов. Вместо этого эти методы предназначены для создания побочных эффектов, таких как запись чего-либо на диск, очистка состояния или чего-либо еще, что манипулирует состоянием самой системы вместо потока событий.

Примечание:  Методы побочного эффекта сам (doOnNext (), doOnCompleted () и так далее)  сделать возвращать Observable. Это должно держать интерфейс свободно. Но возвращаемая наблюдаемая имеет тот же тип и испускает те же элементы, что и исходная наблюдаемая.

Для чего они полезны?

Теперь, поскольку они не меняют поток предметов, для них должно быть другое использование. Я привожу здесь три примера того, чего вы можете достичь с помощью этих методов:

  • Использовать  doOnNext() для отладки
  • Использовать  doOnError() внутри  flatMap() для обработки ошибок
  • Используйте  doOnNext() для сохранения / кэширования результатов сети

Итак, давайте посмотрим на эти примеры в деталях.

Используйте doOnNext () для отладки

С RxJava вы иногда удивляетесь, почему ваш  Observable работает не так, как ожидалось. Особенно, когда вы только начинаете. Поскольку вы используете свободный API для преобразования какого-либо источника во что-то, на что вы хотите подписаться, вы увидите только то, что получите в конце этого конвейера преобразования.

Когда я изучал RxJava, у меня был некоторый начальный опыт работы с потоками Java. По сути, у вас там такая же проблема. У вас есть плавный API для перемещения из одного потока одного типа в другой поток другого типа. Но что, если это не сработает, как ожидалось?

С Java 8 Streams у вас есть  peek() метод. Поэтому, когда я начинал с RxJava, я задавался вопросом, доступно ли что-то сопоставимое. Ну, есть. На самом деле, RxJava предлагает гораздо больше!

Вы можете использовать  doOnNext() метод в любом месте вашего конвейера обработки, чтобы увидеть, что происходит и каков промежуточный результат.

Вот пример этого:

Observable someObservable = Observable
            .from(Arrays.asList(new Integer[]{2, 3, 5, 7, 11}))
            .doOnNext(System.out::println)
            .filter(prime -> prime % 2 == 0)
            .doOnNext(System.out::println)
            .count()
            .doOnNext(System.out::println)
            .map(number -> String.format(“Contains %d elements”, number));

Subscription subscription = o.subscribe(
            System.out::println,
            System.out::println,
            () -> System.out.println(“Completed!”));

И вот вывод этого кода:

2
3
3
5
5
7
7
11
11
4
Contains 4 elements
Completed!

Таким образом, вы можете получить ценную информацию о том, что происходит, когда ваш Observable ведет себя не так, как вы ожидали.

Эти  doOnError() и  doOnCompleted() методы могут быть также полезны для отладки состояния вашего трубопровода.

Примечание.  Если вы используете RxJava при разработке для Android, ознакомьтесь с публикацией  Фродо  и Фернандо Цеха, в которой объясняется  мотивация и использование Фродо . С Frodo вы можете использовать аннотации для отладки ваших Observables и Подписчиков. 

The shown way of using doOnNext() and doOnError() does not change much of the system state – apart from bloating your log and slowing everything down. 

But there are other uses for these operators. And in those cases you use those methods to actually change the state of your system. Let’s have a look at them.

Use doOnError() within flatMap()

Say you’re using Retrofit to access some resource over the network. Since Retrofit supports observables, you can easily use those calls within your processing chain usingflatMap().

Alas, network related calls can go wrong in many ways – especially on mobiles. In this case you might not want the Observable to stop working, which it would if you were to rely on your subscriber’s onError() callback alone.

But keep in mind that you have an Observable within your flatMap() method. Thus you could use the doOnError() method to change the UI in some way, yet still have a working Observable stream for future events.

So what this looks like is this:

flatMap(id -> service.getPost()
       .doOnError(t -> {
          // report problem to UI
       })
       .onErrorResumeNext(Observable.empty())
)

This method is especially useful if you query your remote resource as a result of potentially recurring UI events.

Use doOnNext() to save/cache network results

If at some point in your chain you make network calls, you could use doOnNext() to store the incoming results to your local database or put them in some cache.

It would be as simple as the following lines:

// getOrderById is getting a fresh order
// from the net and returns an observable of orders
// Observable<Order> getOrderById(long id) {…}

Observable.from(aListWithIds)
         .flatMap(id -> getOrderById(id)
                              .doOnNext(order -> cacheOrder(order))
         // carry on with more processing

See this pattern applied in more detail in Daniel Lew’s excellent blog post aboutaccessing multiple sources.

Wrap Up

As you’ve seen, you can use the side effect methods of RxJava in multiple ways. Even though they do not change the stream of emitted items, they change the state of your overall system. This can be something as simple as logging the current items of yourObservable at a certain point within your processing pipeline, up to writing objects to your database as a result of a network call.

In my next post I am going to show you how to use RxJava’s hooks to get further insights. Stay tuned!