Статьи

Фиксированная ставка против фиксированной задержки — RxJava FAQ

Если вы используете простую Java, начиная с версии 5 у нас есть удобный класс планировщика, который позволяет запускать задачи с фиксированной скоростью или с фиксированной задержкой:

1
2
3
4
5
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
  
ScheduledExecutorService scheduler =
        Executors.newScheduledThreadPool(10);

В основном это поддерживает два типа операций:

1
2
scheduler.scheduleAtFixedRate(() -> doStuff(), 2, 1, SECONDS);
scheduler.scheduleWithFixedDelay(() -> doStuff(), 2, 1, SECONDS);

scheduleAtFixedRate() будет гарантировать, что doStuff() вызывается точно каждую секунду с начальной задержкой в ​​две секунды. Конечно, сборка мусора, переключение контекста и т. Д. Все еще могут повлиять на точность. scheduleWithFixedDelay() по-видимому, аналогичен, однако учитывает doStuff() обработки doStuff() . Например, если doStuff() работает в течение doStuff() мс, фиксированная скорость будет ждать только 800 мс до следующей попытки. scheduleWithFixedDelay() с другой стороны, всегда ожидает одинаковое количество времени (в нашем случае 1 секунда) между повторными попытками. Оба поведения, конечно, желательны при разных обстоятельствах. Только помните, что когда doStuff() медленнее, чем 1 секунда, scheduleAtFixedRate() не сохранит желаемую частоту. Даже несмотря на то, что наш ScheduledExecutorService имеет 10 потоков, doStuff() никогда не будет вызываться одновременно и перекрываться с предыдущим выполнением. Следовательно, в этом случае скорость будет на самом деле меньше настроенной.
<h1 ″> Планирование в RxJava

Имитация scheduleAtFixedRate() с помощью RxJava очень проста с помощью оператора interval() . С несколькими оговорками:

1
2
3
Flowable
        .interval(2, 1, SECONDS)
        .subscribe(i -> doStuff());

Если doStuff() медленнее, чем 1 секунда, происходят плохие вещи. Прежде всего, мы используем пул потоков Schedulers.computation() , по умолчанию один унаследован от оператора interval() . Это плохая идея, этот пул потоков следует использовать только для задач, интенсивно использующих процессор, и он используется для всего RxJava. Лучшая идея — использовать свой собственный планировщик (или хотя бы io() ):

1
2
3
4
Flowable
        .interval(2, 1, SECONDS)
        .observeOn(Schedulers.io())
        .subscribe(i -> doStuff());

observeOn() переключается от планировщика computation() используемого в interval() к планировщику io() . Поскольку метод subscribe() никогда не вызывается одновременно по замыслу, doStuff() никогда не вызывается одновременно, как и в scheduleAtFixedRate() . Однако оператор interval() очень старается сохранить постоянную частоту. Это означает, что если doStuff() будет медленнее, чем 1 секунда, через некоторое время мы должны ожидать MissingBackpressureException … RxJava в основном говорит нам, что наш подписчик слишком медленный, но interval() (по замыслу) не может замедляться. Если вы допускаете (или даже ожидаете) перекрывающиеся параллельные выполнения doStuff() , это очень просто исправить. Во-первых, вы должны обернуть блокирующий doStuff() неблокирующим Completable . Технически, Flowable Single или Maybe будут работать так же хорошо, но, поскольку doStuff() является void , Completable звучит хорошо:

01
02
03
04
05
06
07
08
09
10
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
  
Completable doStuffAsync() {
    return Completable
            .fromRunnable(this::doStuff)
            .subscribeOn(Schedulers.io())
            .doOnError(e -> log.error("Stuff failed", e))
            .onErrorComplete();
}

Важно отлавливать и проглатывать исключения, в противном случае одна ошибка приведет к прерыванию всего interval() . doOnError() позволяет вести журнал, но передает исключение через нисходящий doOnError() . doOnComplete() с другой стороны, просто проглатывает исключение. Теперь мы можем просто запустить эту операцию при каждом интервальном событии

1
2
3
4
Flowable
        .interval(2, 1, SECONDS)
        .flatMapCompletable(i -> doStuffAsync())
        .subscribe();

Если вы не subscribe() цикл никогда не запустится — но это RxJava 101. Обратите внимание, что если doStuffAsync() занимает больше одной секунды, мы получим перекрывающиеся параллельные выполнения. В этом нет ничего плохого, вы просто должны знать об этом. Но что, если вам действительно нужна фиксированная задержка?

Исправлены задержки в RxJava

В некоторых случаях вам требуется фиксированная задержка: задачи не должны пересекаться, и мы должны немного дышать между выполнениями. Независимо от того, насколько медленной является периодическая задача, всегда должна быть постоянная пауза времени. Оператор interval() не подходит для реализации этого требования. Однако, если получится, решение в RxJava смущающе простое. Подумайте об этом: вам нужно поспать некоторое время, выполнить какое-то задание, и когда оно завершится, повторите. Позвольте мне сказать это снова:

  • немного поспать (есть какой-то timer() )
  • запустить какое-то задание и дождаться его complete()
  • repeat()

Это оно!

1
2
3
4
5
Flowable
        .timer(1, SECONDS)
        .flatMapCompletable(i -> doStuffAsync())
        .repeat()
        .subscribe();

Оператор timer() выдает одно событие ( 0 типа Long ) через секунду. Мы используем это событие для запуска doStuffAsync() . Когда наш материал готов, весь поток завершается, но мы хотели бы повторить! Хорошо, оператор repeat() делает именно это: когда он получает уведомление о завершении от восходящего потока, он повторно подписывается. Повторная подписка в основном означает: подождать еще 1 секунду, запустить doStuffAsync() и так далее.