Статьи

Создание потоков с поддержкой обратного давления с помощью Flowable.generate () – RxJava FAQ

RxJava отсутствует фабрика для создания бесконечного потока натуральных чисел. Такой поток полезен, например, когда вы хотите назначить уникальные порядковые номера для возможно бесконечного потока событий, сжав оба из них:

1
2
3
4
5
6
7
8
Flowable<Long> naturalNumbers = //???
 
Flowable<Event> someInfiniteEventStream = //...
Flowable<Pair<Long, Event>> sequenced = Flowable.zip(
        naturalNumbers,
        someInfiniteEventStream,
        Pair::of
);

Реализация naturalNumbers на удивление сложна. В RxJava 1.x вы могли бы ненадолго обойтись без Observable которое не учитывает противодавление:

1
2
3
4
5
6
7
8
9
import rx.Observable;  //RxJava 1.x
 
Observable<Long> naturalNumbers = Observable.create(subscriber -> {
    long state = 0;
    //poor solution 🙁
    while (!subscriber.isUnsubscribed()) {
        subscriber.onNext(state++);
    }
});

Что это означает, что такой поток не реагирует на противодавление? Ну, в основном поток генерирует события (постоянно увеличивающиеся переменные state ) так быстро, как позволяет ядро ​​ЦП, миллионы в секунду, легко. Однако, когда потребители не могут потреблять события так быстро, начинает появляться растущее отставание от необработанных событий:

1
2
3
4
5
6
7
naturalNumbers
//      .observeOn(Schedulers.io())
        .subscribe(
                x -> {
                    //slooow, 1 millisecond
                }
        );

Вышеприведенная программа (с observeOn() оператором observeOn() ) работает нормально, так как имеет случайное противодавление. По умолчанию все в RxJava однопоточное, поэтому производитель и потребитель работают в одном потоке. Вызов subscriber.onNext() фактически блокирует, поэтому цикл while автоматически регулируется. Но попробуйте раскомментировать observeOn() и через несколько миллисекунд произойдет observeOn() . Обратный вызов подписки является однопоточным. Для каждого элемента требуется не менее 1 миллисекунды, поэтому этот поток может обрабатывать не более 1000 событий в секунду. Нам несколько везет. RxJava быстро обнаруживает это катастрофическое состояние и быстро терпит неудачу с MissingBackpressureException

Нашей самой большой ошибкой было создание событий без учета того, насколько медленным является потребитель. Кстати, это основная идея реактивных потоков : производителю не разрешено генерировать больше событий, чем запрошено потребителем. В RxJava 1.x реализация даже самого простого потока, который учитывал противодавление с нуля, была нетривиальной задачей. В RxJava 2.x появилось несколько удобных операторов, основанных на опыте предыдущих версий. Прежде всего, RxJava 2.x не позволяет вам реализовывать Flowable (с поддержкой обратного давления) так же, как вы можете использовать Observable . Невозможно создать Flowable который перегружает потребителя сообщениями:

1
2
3
4
5
6
Flowable<Long> naturalNumbers = Flowable.create(subscriber -> {
    long state = 0;
    while (!subscriber.isCancelled()) {
        subscriber.onNext(state++);
    }
}, BackpressureStrategy.DROP);

Вы заметили этот дополнительный параметр DROP? Прежде чем мы это объясним, давайте посмотрим на вывод, когда мы подписываемся на медленного потребителя:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
0
1
2
3
//...continuous numbers...
126
127
101811682
//...where did my 100M events go?!?
101811683
101811684
101811685
//...continuous numbers...
101811776
//...17M events disappeared again...
101811777
//...

Ваш пробег может варьироваться. Что случается? Оператор observeOn() переключается между планировщиками (пулами потоков). Пул потоков, которые гидратируются из очереди ожидающих событий. Эта очередь конечна и имеет емкость 128 элементов. observeOn() , зная об этом ограничении, запрашивает только 128 элементов из восходящего потока (наш собственный Flowable ). На этом этапе он позволяет нашему подписчику обрабатывать события, 1 на миллисекунду. Таким образом, примерно через 100 миллисекунд observeOn() обнаруживает, что его внутренняя очередь почти пуста и запрашивает больше. Это получает 128, 129, 130 …? Нет! В течение этого 0,1-секундного периода наш Flowable события, похожие на сумасшедшие, и за это время (удивительно) удалось сгенерировать более 100 миллионов чисел. Куда они делись? Ну что ж, observeOn() за ними observeOn() не запрашивалось, поэтому стратегия DROP (обязательный параметр) просто отбрасывала нежелательные события.

BackpressureStrategy

Это не звучит правильно, есть ли другие стратегии? Да много:

  • BackpressureStrategy.BUFFER : Если восходящий BackpressureStrategy.BUFFER генерирует слишком много событий, они помещаются в буфер в неограниченной очереди. События не потеряны, но, скорее всего, все ваше приложение. Если вам повезет, OutOfMemoryError спасет вас. Я застрял на 5+ секундных GC паузах.
  • BackpressureStrategy.ERROR : если обнаружено MissingBackpressureException событий, будет MissingBackpressureException . Это вменяемая (и безопасная) стратегия.
  • BackpressureStrategy.LATEST : аналогично DROP , но запоминает последнее BackpressureStrategy.LATEST событие. На всякий случай приходит запрос дополнительных данных, но мы просто отбрасываем все – по крайней мере, у нас есть последнее увиденное значение.
  • BackpressureStrategy.MISSING : Нет мер безопасности, BackpressureStrategy.MISSING с этим. Скорее всего, один из нижестоящих операторов (например, observeOn() ) сгенерирует MissingBackpressureException .
  • BackpressureStrategy.DROP : удаляет события, которые не были запрошены.

Кстати, когда вы превращаете Observable в Flowable вы также должны предоставить BackpressureStrategy . RxJava должен знать, как ограничить перепроизводство Observable . Итак, какова правильная реализация такого простого потока последовательных натуральных чисел?

Встречайте Flowable.generate()

Разница между create() и generate() заключается в ответственности. Предполагается, что Flowable.create() генерирует поток полностью, без учета противодавления. Он просто создает события, когда хочет. Flowable.generate() с другой стороны, может генерировать только одно событие за раз (или завершать поток). Механизм противодавления прозрачно вычисляет, сколько событий ему нужно в данный момент. observeOn() generate() вызывается соответствующее количество раз, например 128 раз, в случае observeOn() .

Поскольку этот оператор генерирует события по одному, обычно ему требуется какое-то состояние, чтобы выяснить, где он был в последний раз 1 . Вот что значит generate() : держатель (im) изменяемого состояния и функция, которая на его основе генерирует следующее событие:

1
2
3
4
5
Flowable<Long> naturalNumbers =
    Flowable.generate(() -> 0L, (state, emitter) -> {
        emitter.onNext(state);
        return state + 1;
    });

Первый аргумент метода generate() – это начальное состояние (фабрика), в нашем случае 0L . Теперь каждый раз, когда подписчик или любой последующий оператор запрашивает некоторое количество событий, вызывается лямбда-выражение. Его обязанность – вызывать onNext() не более одного раза (генерировать не более одного события) каким-либо образом на основе предоставленного состояния. Когда лямбда вызывается впервые, state равно начальному значению 0L . Однако нам разрешено изменять состояние и возвращать его новое значение. В этом примере мы увеличиваем long так, чтобы последующий вызов лямбда-выражения получал state = 1L . Очевидно, что это продолжается и продолжается, производя последовательные натуральные числа.

Такая модель программирования, очевидно, сложнее, чем цикл while. Это также в корне меняет способ реализации ваших источников событий. Вместо того, чтобы выдвигать события всякий раз, когда вам это нравится, вы просто пассивно ждете запросов. Операторы и подписчики нисходящего потока извлекают данные из вашего потока. Этот сдвиг обеспечивает противодавление на всех уровнях вашего трубопровода.

generate() имеет несколько разновидностей. Прежде всего, если ваше состояние является изменяемым объектом, вы можете использовать перегруженную версию, которая не требует возврата нового значения состояния. Несмотря на то, что изменяемое состояние является менее функциональным, оно обычно производит меньше мусора. Это предполагает, что ваше состояние постоянно изменяется, и каждый раз передается один и тот же экземпляр объекта состояния. Например, вы можете легко превратить Iterator (также на основе pull!) В поток со всеми чудесами обратного давления:

1
2
3
4
5
6
7
8
9
Iterator<Integer> iter = //...
 
Flowable<String> strings = Flowable.generate(() -> iter, (iterator, emitter) -> {
    if (iterator.hasNext()) {
        emitter.onNext(iterator.next().toString());
    } else {
        emitter.onComplete();
    }
});

Обратите внимание, что тип потока ( <String> ) не обязательно должен совпадать с типом состояния ( Iterator<Integer> ). Конечно, если у вас есть Collection Java и вы хотите превратить ее в поток, вам не нужно сначала создавать итератор. Для этого достаточно использовать Flowable.fromIterable() . Еще более простая версия generate() предполагает, что у вас нет состояния вообще. Например поток случайных чисел:

1
2
Flowable<Double> randoms = Flowable
        .generate(emitter -> emitter.onNext(Math.random()));

Но, честно говоря, вам, вероятно, понадобится экземпляр Random :

1
2
3
Flowable.generate(Random::new, (random, emitter) -> {
    emitter.onNext(random.nextBoolean());
});

Резюме

Как вы можете видеть, Observable.create() в RxJava 1.x и Flowable.create() имеют некоторые недостатки. Если вы действительно заботитесь о масштабируемости и работоспособности своей системы с высокой степенью параллелизма (и в противном случае вы бы этого не читали!), Вы должны знать о противодавлении. Если вам действительно нужно создавать потоки с нуля, а не использовать методы из семейства from*() или различные библиотеки, которые выполняют тяжелую работу – ознакомьтесь с методом generate() . По сути, вы должны научиться моделировать определенные типы источников данных как причудливые итераторы. Ожидайте больше статей, объясняющих, как реализовать больше реальных потоков.

Это похоже на HTTP-протокол без сохранения состояния, который использует небольшие части состояния, называемые сеансом * на сервере, для отслеживания прошлых запросов.