Простой, эффективный и безопасный параллелизм был одним из принципов проектирования RxJava. И все же, по иронии судьбы, это, вероятно, один из самых неправильно понятых аспектов этой библиотеки. Давайте рассмотрим простой пример: представьте, что у нас есть набор UUID и для каждого из них мы должны выполнить набор задач. Первая проблема — выполнить интенсивную операцию ввода-вывода для каждого UUID , например, загрузить объект из базы данных:
|
1
2
3
4
5
6
|
Flowable<UUID> ids = Flowable .fromCallable(UUID::randomUUID) .repeat() .take(100); ids.subscribe(id -> slowLoadBy(id)); |
Сначала я генерирую 100 случайных UUID только для тестирования. Затем для каждого UUID я хотел бы загрузить запись, используя следующий метод:
|
1
2
3
|
Person slowLoadBy(UUID id) { //...} |
Реализация slowLoadBy() имеет значения, просто имейте в виду, что она медленная и блокирующая. Использование subscribe() для вызова slowLoadBy() имеет много недостатков:
-
subscribe()является однопоточным по дизайну, и нет никакого способа обойти это. КаждыйUUIDзагружается последовательно - когда вы вызываете
subscribe()вы не можете трансформировать объектPersonдальше. Это терминальная операция
Более надежный и еще более ошибочный подход состоит в map() каждого UUID :
|
1
2
|
Flowable<Person> people = ids .map(id -> slowLoadBy(id)); //BROKEN |
Это очень читабельно, но, к сожалению, сломано. Операторы, как и подписчики, являются однопоточными. Это означает, что в любой момент времени может быть отображен только один UUID , параллелизм здесь также не разрешен. Что еще хуже, мы наследуем поток / работника от апстрима. Это имеет несколько недостатков. Если восходящий поток генерирует события с использованием какого-то выделенного планировщика, мы будем захватывать потоки из этого планировщика. Например, многие операторы, такие как interval() , прозрачно используют пул потоков Schedulers.computation() . Мы неожиданно начинаем выполнять интенсивные операции ввода-вывода в пуле, который совершенно не подходит для этого. Более того, мы замедляем весь конвейер с помощью одного последовательного шага блокировки. Очень очень плохо.
Возможно, вы слышали об этом операторе subscribeOn() и о том, как он обеспечивает параллелизм. Действительно, но вы должны быть очень осторожны при его применении. Следующий пример (снова) неверен :
|
1
2
3
4
5
6
|
import io.reactivex.schedulers.Schedulers; Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .map(id -> slowLoadBy(id)); //BROKEN |
Выше приведен фрагмент кода. observeOn() subscribeOn() (и в этом случае observeOn() ) едва переключают выполнение на другого работника (поток), не вводя параллелизма. Поток по-прежнему последовательно обрабатывает все события, но в другом потоке. Другими словами — вместо последовательного потребления событий в потоке, унаследованном от восходящего потока, теперь мы используем их последовательно в потоке io() . Так что насчет этого мифического оператора flatMap() ?
flatMap() на помощь
flatMap() включает параллелизм, разделяя поток событий на поток подпотоков. Но сначала еще один неработающий пример:
|
1
2
3
4
5
6
7
|
Flowable<Person> asyncLoadBy(UUID id) { return Flowable.fromCallable(() -> slowLoadBy(id));} Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .flatMap(id -> asyncLoadBy(id)); //BROKEN |
О боже, это все еще сломано ! flatMap() логически делает две вещи:
- применяя преобразование (
id -> asyncLoadBy(id)) к каждому вышестоящему событию — это создаетFlowable<Flowable<Person>>. Это имеет смысл, для каждого восходящегоUUIDмы получаемFlowable<Person>поэтому мы получаем поток потоков объектовPerson - затем
flatMap()пытается подписаться на все эти внутренние подпотоки одновременно. Всякий раз, когда какой-либо из подпотоков испускает событиеPerson, оно прозрачно передается как результат внешнегоFlowable.
Технически, flatMap() создает и подписывается только на первые 128 (по умолчанию, необязательный параметр maxConcurrency ) подпотоков. Также, когда завершается последний подпоток, также завершается внешний поток Person . Теперь, почему на земле это сломано? RxJava не вводит пул потоков, если это явно не требуется. Например, этот кусок кода все еще блокирует:
|
01
02
03
04
05
06
07
08
09
10
11
|
log.info("Setup");Flowable<String> blocking = Flowable .fromCallable(() -> { log.info("Starting"); TimeUnit.SECONDS.sleep(1); log.info("Done"); return "Hello, world!"; });log.info("Created");blocking.subscribe(s -> log.info("Received {}", s));log.info("Done"); |
Внимательно посмотрите на вывод, особенно в порядке событий и потоков:
|
1
2
3
4
5
6
|
19:57:28.847 | INFO | main | Setup19:57:28.943 | INFO | main | Created19:57:28.949 | INFO | main | Starting19:57:29.954 | INFO | main | Done19:57:29.955 | INFO | main | Received Hello, world!19:57:29.957 | INFO | main | Done |
Никакого параллелизма, никаких дополнительных потоков. Простая упаковка кода блокировки в Flowable волшебным образом не добавляет параллелизма. Вы должны явно использовать… subscribeOn() :
|
01
02
03
04
05
06
07
08
09
10
11
12
|
log.info("Setup");Flowable<String> blocking = Flowable .fromCallable(() -> { log.info("Starting"); TimeUnit.SECONDS.sleep(1); log.info("Done"); return "Hello, world!"; }) .subscribeOn(Schedulers.io());log.info("Created");blocking.subscribe(s -> log.info("Received {}", s));log.info("Done"); |
Вывод на этот раз более перспективный:
|
1
2
3
4
5
6
|
19:59:10.547 | INFO | main | Setup19:59:10.653 | INFO | main | Created19:59:10.662 | INFO | main | Done19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world! |
Но в прошлый раз мы использовали subscribeOn() , что происходит? Ну, метод subscribeOn() на уровне внешнего потока в основном говорит, что все события должны обрабатываться последовательно, в этом потоке, в другом потоке. Мы не говорили, что должно быть много подпотоков, работающих одновременно. И поскольку все подпотоки блокируются, когда RxJava пытается подписаться на все из них, он эффективно подписывается последовательно один за другим. asyncLoadBy() самом деле не асинхронен , поэтому он блокируется, когда оператор flatMap() пытается подписаться на него. Исправить легко. Обычно вы бы помещали subscribeOn() в asyncLoadBy() но в образовательных целях я поместил бы его прямо в основной конвейер:
|
1
2
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io())); |
Теперь это работает как шарм! По умолчанию RxJava примет первые 128 восходящих событий ( UUID ), превратит их в подпотоки и подпишется на все из них. Если подпотоки являются асинхронными и в высокой степени распараллеливаемыми (например, сетевые вызовы), мы получаем 128 одновременных вызовов asyncLoadBy() . Уровень параллелизма (128) настраивается с помощью параметра maxConcurrency :
|
1
2
3
4
5
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()), 10 //maxConcurrency ); |
Это было много работы, вы не думаете? Разве параллелизм не должен быть еще более декларативным? Мы больше не имеем дело с Executor и фьючерсами, но все же кажется, что этот подход слишком подвержен ошибкам. Разве это не может быть так просто, как parallel() в потоках Java 8?
Введите ParallelFlowable
Давайте сначала снова посмотрим на наш пример и сделаем его еще более сложным, добавив filter() :
|
1
2
3
|
Flowable<Person> people = ids .map(this::slowLoadBy) //BROKEN .filter(this::hasLowRisk); //BROKEN |
где hasLowRisk() — это медленный предикат:
|
1
2
3
|
boolean hasLowRisk(Person p) { //slow...} |
Мы уже знаем, что идиоматический подход к этой проблеме заключается в использовании flatMap() , дважды:
|
1
2
3
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(io())) .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io())); |
asyncHasLowRisk() довольно неясен — он возвращает либо одноэлементный поток при asyncHasLowRisk() предиката, либо пустой поток при сбое. Вот как вы эмулируете filter() с помощью flatMap() . Можем ли мы сделать лучше? Начиная с RxJava 2.0.5, появился новый оператор, который называется… parallel() ! Это довольно удивительно, потому что оператор с тем же именем был удален до того, как RxJava стал 1.0 из-за многих заблуждений и неправильного использования. В 2.x parallel() 2.x), кажется, наконец решается проблема идиоматического параллелизма безопасным и декларативным способом. Во-первых, давайте посмотрим на красивый код!
|
1
2
3
4
5
6
|
Flowable<Person> people = ids .parallel(10) .runOn(Schedulers.io()) .map(this::slowLoadBy) .filter(this::hasLowRisk) .sequential(); |
Просто так! Блок кода между parallel() и sequential() выполняется … параллельно. Что мы имеем здесь? Прежде всего, новый оператор parallel() превращает Flowable<UUID> в ParallelFlowable<UUID> который имеет гораздо меньший API, чем Flowable. Через секунду вы увидите, почему. Необязательный параметр int (в нашем случае 10) определяет параллелизм, или (как указано в документации), сколько параллельных «рельсов» создано. Поэтому для нас мы разделили один Flowable<Person> на 10 параллельных независимых рельсов (думаю: потоков ). События из исходного потока UUID разделены ( modulo 10 ) на разные рельсы, подпотоки, которые не зависят друг от друга. Думайте о них как об отправке вышестоящих событий в 10 отдельных потоков. Но сначала мы должны определить, откуда эти потоки — используя удобный оператор runOn() . Это намного лучше, чем parallel() в потоках Java 8, где вы не можете контролировать уровень параллелизма.
На данный момент у нас есть ParallelFlowable . Когда событие появляется в восходящем потоке ( UUID ), оно делегируется одному из 10 «рельсов», параллельных, независимых конвейеров. Конвейер предоставляет ограниченное подмножество операторов, которые безопасны для одновременной работы, например map() и filter() , но также и reduce() . Нет никаких buffer() , take() и т. Д., Так как их семантика неясна при вызове сразу для нескольких подпотоков. Наша блокировка slowLoadBy() а также hasLowRisk() по-прежнему вызывается последовательно, но только в пределах одного «рельса». Поскольку у нас теперь есть 10 одновременных «рельсов», мы эффективно распараллеливали их без особых усилий.
Когда события достигают конца подпотока («рельса»), они сталкиваются с оператором sequential() . Этот оператор превращает ParallelFlowable обратно в Flowable . Поскольку наши преобразователи и фильтры являются поточно-ориентированными, пара parallel() / sequential() обеспечивает очень простой способ распараллеливания потоков. Одна небольшая оговорка — вы неизбежно получите переупорядоченные сообщения. Последовательные map() и filter() всегда сохраняют порядок (как и большинство операторов). Но как только вы запускаете их в блоке parallel() , порядок теряется. Это учитывает больший параллелизм, но вы должны иметь это в виду.
Стоит ли использовать parallel() вместо вложенного flatMap() для распараллеливания вашего кода? Это зависит от вас, но parallel() кажется намного проще для чтения и понимания.
| Ссылка: | Парадигматический параллелизм: flatMap () и parallel () — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге по Java и соседству . |