Статьи

1.x в 2.x миграция: наблюдаемая и наблюдаемая: RxJava FAQ

Название не является ошибкой. rx.Observable из RxJava 1.x — совершенно другой зверь, чем io.reactivex.Observable из 2.x. Слепое обновление зависимости rx и переименование всех импортов в вашем проекте будет компилироваться (с небольшими изменениями), но не гарантирует того же поведения. В самые первые дни проекта Observable в 1.x не имел понятия противодавления, но позднее было включено противодавление. Что это на самом деле означает? Давайте представим, что у нас есть поток, который генерирует одно событие каждую 1 миллисекунду, но для обработки одного такого элемента требуется 1 секунда . Вы видите, что это не может работать таким образом в долгосрочной перспективе:

1
2
3
4
5
6
7
8
import rx.Observable;  //RxJava 1.x
import rx.schedulers.Schedulers;
  
Observable
        .interval(1, MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(
                x -> sleep(Duration.ofSeconds(1)));

MissingBackpressureException закрадывается в течение нескольких сотен миллисекунд. Но что означает это исключение? Ну, в основном это система безопасности (или проверка работоспособности, если хотите), которая предотвращает нанесение вреда вашему приложению. RxJava автоматически обнаруживает, что производитель переполняет потребителя, и активно прерывает поток, чтобы избежать дальнейшего повреждения. Так что, если мы просто найдем и заменим несколько импортов здесь и там?

1
2
3
4
5
6
7
8
import io.reactivex.Observable;     //RxJava 2.x
import io.reactivex.schedulers.Schedulers;
  
Observable
        .interval(1, MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(
                x -> sleep(Duration.ofSeconds(1)));

Исключение прошло! Такова наша пропускная способность … Приложение через некоторое время останавливается, оставаясь в бесконечном цикле GC. Видите ли, Observable в RxJava 1.x имеет утверждения (ограниченные очереди, проверки и т. Д.) Повсюду, следя за тем, чтобы вы нигде не были переполнены. Например, оператор наблюдаем в 1.x имеет очередь, ограниченную 128 элементами по умолчанию. Когда обратное давление должным образом реализовано во всем стеке, оператор observeOn() запрашивает в восходящем направлении не более 128 элементов для заполнения своего внутреннего буфера. Затем отдельные потоки (рабочие) из этого планировщика собирают события из этой очереди. Когда очередь становится почти пустой, оператор observeOn() ( request() метод)) больше. Этот механизм разваливается, когда производитель не уважает запросы противодавления и отправляет больше данных, чем это было разрешено, что фактически переполняет потребителя. Внутренняя очередь внутри оператора observeOn() заполнена, но оператор interval() продолжает выдавать новые события — потому что это то, что должен делать interval() .

Observable в 1.x обнаруживает такое переполнение и быстро завершается с MissingBackpressureException . Это буквально означает: я так старался поддерживать систему в исправном состоянии, но мой восходящий поток не учитывает противодавление — реализация противодавления отсутствует . Однако Observable в 2.x не имеет такого механизма безопасности. Это ванильный поток, который надеется, что вы будете хорошим гражданином и будете иметь либо медленных производителей, либо быстрых потребителей. Когда система исправна, оба Observable ведут себя одинаково. Однако под нагрузкой 1.x терпит неудачу быстро, 2.x терпит неудачу медленно и мучительно.

Означает ли это, что RxJava 2.x — шаг назад? Наоборот! В 2.x было сделано важное различие:

  • Observable не заботится о противодавлении, что значительно упрощает его конструкцию и реализацию. Он должен использоваться для моделирования потоков, которые не могут поддерживать противодавление по определению, например, события пользовательского интерфейса
  • Flowable поддерживает противодавление и имеет все меры безопасности на месте. Другими словами, все шаги в конвейере вычислений гарантируют, что вы не переполняете потребителя.

2.x проводит важное различие между потоками, которые могут поддерживать противодавление (« могут замедляться при необходимости » простыми словами), и теми, которые этого не делают. С точки зрения системы типов становится ясно, с каким источником мы имеем дело и каковы его гарантии. Так как же нам перенести наш пример interval() в RxJava 2.x? Проще, чем вы думаете:

1
2
3
4
5
Flowable
        .interval(1, MILLISECONDS)
        .observeOn(Schedulers.computation())
        .subscribe(
                x -> sleep(Duration.ofSeconds(1)));

Так просто. Вы можете задать себе вопрос: почему у Flowable может быть оператор interval() который по определению не может поддерживать противодавление? Предполагается, что после того, как весь interval() доставляет события с постоянной скоростью, он не может замедляться! Хорошо, если вы посмотрите на объявление interval() вы заметите:

1
@BackpressureSupport(BackpressureKind.ERROR)

Проще говоря, это говорит нам о том, что всякий раз, когда обратное давление больше не может быть гарантировано, RxJava позаботится об этом и MissingBackpressureException . Именно так и происходит, когда мы запускаем программу Flowable.interval() — она ​​быстро дает сбой, в отличие от дестабилизации всего приложения.

Итак, в заключение, всякий раз, когда вы видите Observable из 1.x, то, что вы, вероятно, хотите, это Flowable из 2.x. По крайней мере, если ваш поток по определению не поддерживает противодавление. Несмотря на то же имя, Observable s в этих двух главных выпусках довольно различны. Но как только вы выполните поиск и замену с Observable на Flowable вы заметите, что миграция не так проста. Речь идет не об изменениях API, различия более глубокие.

В Flowable.create() нет простого Flowable.create() прямо эквивалентного Observable.create() Я сам сделал ошибку, чтобы в прошлом чрезмерно использовать фабричный метод Observable.create() . create() позволяет генерировать события с произвольной скоростью, полностью игнорируя противодавление. 2.x имеет несколько дружественных средств для обработки запросов противодавления, но они требуют тщательного проектирования ваших потоков. Это будет рассмотрено в следующем FAQ.