Статьи

Идиоматический параллелизм: flatMap () и параллельный () — RxJava FAQ

Простой, эффективный и безопасный параллелизм был одним из принципов проектирования RxJava. И все же, по иронии судьбы, это, вероятно, один из самых неправильно понятых аспектов этой библиотеки. Давайте рассмотрим простой пример: представьте, что у нас есть набор UUID и для каждого из них мы должны выполнить набор задач. Первая проблема — выполнить интенсивную операцию ввода-вывода для каждого UUID , например, загрузить объект из базы данных:

1
2
3
4
5
6
Flowable<UUID> ids = Flowable
        .fromCallable(UUID::randomUUID)
        .repeat()
        .take(100);
  
ids.subscribe(id -> slowLoadBy(id));

Сначала я генерирую 100 случайных UUID только для тестирования. Затем для каждого UUID я хотел бы загрузить запись, используя следующий метод:

1
2
3
Person slowLoadBy(UUID id) {
    //...
}

Реализация slowLoadBy() имеет значения, просто имейте в виду, что она медленная и блокирующая. Использование subscribe() для вызова slowLoadBy() имеет много недостатков:

  • subscribe() является однопоточным по дизайну, и нет никакого способа обойти это. Каждый UUID загружается последовательно
  • когда вы вызываете subscribe() вы не можете трансформировать объект Person дальше. Это терминальная операция

Более надежный и еще более ошибочный подход состоит в map() каждого UUID :

1
2
Flowable<Person> people = ids
        .map(id -> slowLoadBy(id));  //BROKEN

Это очень читабельно, но, к сожалению, сломано. Операторы, как и подписчики, являются однопоточными. Это означает, что в любой момент времени может быть отображен только один UUID , параллелизм здесь также не разрешен. Что еще хуже, мы наследуем поток / работника от апстрима. Это имеет несколько недостатков. Если восходящий поток генерирует события с использованием какого-то выделенного планировщика, мы будем захватывать потоки из этого планировщика. Например, многие операторы, такие как interval() , прозрачно используют пул потоков Schedulers.computation() . Мы неожиданно начинаем выполнять интенсивные операции ввода-вывода в пуле, который совершенно не подходит для этого. Более того, мы замедляем весь конвейер с помощью одного последовательного шага блокировки. Очень очень плохо.

Возможно, вы слышали об этом операторе subscribeOn() и о том, как он обеспечивает параллелизм. Действительно, но вы должны быть очень осторожны при его применении. Следующий пример (снова) неверен :

1
2
3
4
5
6
import io.reactivex.schedulers.Schedulers;
  
  
Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .map(id -> slowLoadBy(id)); //BROKEN

Выше приведен фрагмент кода. observeOn() subscribeOn() (и в этом случае observeOn() ) едва переключают выполнение на другого работника (поток), не вводя параллелизма. Поток по-прежнему последовательно обрабатывает все события, но в другом потоке. Другими словами — вместо последовательного потребления событий в потоке, унаследованном от восходящего потока, теперь мы используем их последовательно в потоке io() . Так что насчет этого мифического оператора flatMap() ?

flatMap() на помощь

flatMap() включает параллелизм, разделяя поток событий на поток подпотоков. Но сначала еще один неработающий пример:

1
2
3
4
5
6
7
Flowable<Person> asyncLoadBy(UUID id) {
    return Flowable.fromCallable(() -> slowLoadBy(id));
}
  
Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .flatMap(id -> asyncLoadBy(id)); //BROKEN

О боже, это все еще сломано ! flatMap() логически делает две вещи:

  • применяя преобразование ( id -> asyncLoadBy(id) ) к каждому вышестоящему событию — это создает Flowable<Flowable<Person>> . Это имеет смысл, для каждого восходящего UUID мы получаем Flowable<Person> поэтому мы получаем поток потоков объектов Person
  • затем flatMap() пытается подписаться на все эти внутренние подпотоки одновременно. Всякий раз, когда какой-либо из подпотоков испускает событие Person , оно прозрачно передается как результат внешнего Flowable .

Технически, flatMap() создает и подписывается только на первые 128 (по умолчанию, необязательный параметр maxConcurrency ) подпотоков. Также, когда завершается последний подпоток, также завершается внешний поток Person . Теперь, почему на земле это сломано? RxJava не вводит пул потоков, если это явно не требуется. Например, этот кусок кода все еще блокирует:

01
02
03
04
05
06
07
08
09
10
11
log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        });
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

Внимательно посмотрите на вывод, особенно в порядке событий и потоков:

1
2
3
4
5
6
19:57:28.847 | INFO  | main | Setup
19:57:28.943 | INFO  | main | Created
19:57:28.949 | INFO  | main | Starting
19:57:29.954 | INFO  | main | Done
19:57:29.955 | INFO  | main | Received Hello, world!
19:57:29.957 | INFO  | main | Done

Никакого параллелизма, никаких дополнительных потоков. Простая упаковка кода блокировки в Flowable волшебным образом не добавляет параллелизма. Вы должны явно использовать… subscribeOn() :

01
02
03
04
05
06
07
08
09
10
11
12
log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        })
        .subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

Вывод на этот раз более перспективный:

1
2
3
4
5
6
19:59:10.547 | INFO  | main | Setup
19:59:10.653 | INFO  | main | Created
19:59:10.662 | INFO  | main | Done
19:59:10.664 | INFO  | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO  | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO  | RxCachedThreadScheduler-1 | Received Hello, world!

Но в прошлый раз мы использовали subscribeOn() , что происходит? Ну, метод subscribeOn() на уровне внешнего потока в основном говорит, что все события должны обрабатываться последовательно, в этом потоке, в другом потоке. Мы не говорили, что должно быть много подпотоков, работающих одновременно. И поскольку все подпотоки блокируются, когда RxJava пытается подписаться на все из них, он эффективно подписывается последовательно один за другим. asyncLoadBy() самом деле не асинхронен , поэтому он блокируется, когда оператор flatMap() пытается подписаться на него. Исправить легко. Обычно вы бы помещали subscribeOn() в asyncLoadBy() но в образовательных целях я поместил бы его прямо в основной конвейер:

1
2
Flowable<Person> people = ids
    .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));

Теперь это работает как шарм! По умолчанию RxJava примет первые 128 восходящих событий ( UUID ), превратит их в подпотоки и подпишется на все из них. Если подпотоки являются асинхронными и в высокой степени распараллеливаемыми (например, сетевые вызовы), мы получаем 128 одновременных вызовов asyncLoadBy() . Уровень параллелизма (128) настраивается с помощью параметра maxConcurrency :

1
2
3
4
5
Flowable<Person> people = ids
    .flatMap(id ->
                asyncLoadBy(id).subscribeOn(Schedulers.io()),
                10  //maxConcurrency
    );

Это было много работы, вы не думаете? Разве параллелизм не должен быть еще более декларативным? Мы больше не имеем дело с Executor и фьючерсами, но все же кажется, что этот подход слишком подвержен ошибкам. Разве это не может быть так просто, как parallel() в потоках Java 8?

Введите ParallelFlowable

Давайте сначала снова посмотрим на наш пример и сделаем его еще более сложным, добавив filter() :

1
2
3
Flowable<Person> people = ids
        .map(this::slowLoadBy)     //BROKEN
        .filter(this::hasLowRisk); //BROKEN

где hasLowRisk() — это медленный предикат:

1
2
3
boolean hasLowRisk(Person p) {
    //slow...
}

Мы уже знаем, что идиоматический подход к этой проблеме заключается в использовании flatMap() , дважды:

1
2
3
Flowable<Person> people = ids
        .flatMap(id -> asyncLoadBy(id).subscribeOn(io()))
        .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));

asyncHasLowRisk() довольно неясен — он возвращает либо одноэлементный поток при asyncHasLowRisk() предиката, либо пустой поток при сбое. Вот как вы эмулируете filter() с помощью flatMap() . Можем ли мы сделать лучше? Начиная с RxJava 2.0.5, появился новый оператор, который называется… parallel() ! Это довольно удивительно, потому что оператор с тем же именем был удален до того, как RxJava стал 1.0 из-за многих заблуждений и неправильного использования. В 2.x parallel() 2.x), кажется, наконец решается проблема идиоматического параллелизма безопасным и декларативным способом. Во-первых, давайте посмотрим на красивый код!

1
2
3
4
5
6
Flowable<Person> people = ids
        .parallel(10)
        .runOn(Schedulers.io())
        .map(this::slowLoadBy)
        .filter(this::hasLowRisk)
        .sequential();

Просто так! Блок кода между parallel() и sequential() выполняется … параллельно. Что мы имеем здесь? Прежде всего, новый оператор parallel() превращает Flowable<UUID> в ParallelFlowable<UUID> который имеет гораздо меньший API, чем Flowable. Через секунду вы увидите, почему. Необязательный параметр int (в нашем случае 10) определяет параллелизм, или (как указано в документации), сколько параллельных «рельсов» создано. Поэтому для нас мы разделили один Flowable<Person> на 10 параллельных независимых рельсов (думаю: потоков ). События из исходного потока UUID разделены ( modulo 10 ) на разные рельсы, подпотоки, которые не зависят друг от друга. Думайте о них как об отправке вышестоящих событий в 10 отдельных потоков. Но сначала мы должны определить, откуда эти потоки — используя удобный оператор runOn() . Это намного лучше, чем parallel() в потоках Java 8, где вы не можете контролировать уровень параллелизма.

На данный момент у нас есть ParallelFlowable . Когда событие появляется в восходящем потоке ( UUID ), оно делегируется одному из 10 «рельсов», параллельных, независимых конвейеров. Конвейер предоставляет ограниченное подмножество операторов, которые безопасны для одновременной работы, например map() и filter() , но также и reduce() . Нет никаких buffer() , take() и т. Д., Так как их семантика неясна при вызове сразу для нескольких подпотоков. Наша блокировка slowLoadBy() а также hasLowRisk() по-прежнему вызывается последовательно, но только в пределах одного «рельса». Поскольку у нас теперь есть 10 одновременных «рельсов», мы эффективно распараллеливали их без особых усилий.

Когда события достигают конца подпотока («рельса»), они сталкиваются с оператором sequential() . Этот оператор превращает ParallelFlowable обратно в Flowable . Поскольку наши преобразователи и фильтры являются поточно-ориентированными, пара parallel() / sequential() обеспечивает очень простой способ распараллеливания потоков. Одна небольшая оговорка — вы неизбежно получите переупорядоченные сообщения. Последовательные map() и filter() всегда сохраняют порядок (как и большинство операторов). Но как только вы запускаете их в блоке parallel() , порядок теряется. Это учитывает больший параллелизм, но вы должны иметь это в виду.

Стоит ли использовать parallel() вместо вложенного flatMap() для распараллеливания вашего кода? Это зависит от вас, но parallel() кажется намного проще для чтения и понимания.