Простой, эффективный и безопасный параллелизм был одним из принципов проектирования RxJava. И все же, по иронии судьбы, это, вероятно, один из самых неправильно понятых аспектов этой библиотеки. Давайте рассмотрим простой пример: представьте, что у нас есть набор UUID
и для каждого из них мы должны выполнить набор задач. Первая проблема — выполнить интенсивную операцию ввода-вывода для каждого UUID
, например, загрузить объект из базы данных:
1
2
3
4
5
6
|
Flowable<UUID> ids = Flowable .fromCallable(UUID::randomUUID) .repeat() .take( 100 ); ids.subscribe(id -> slowLoadBy(id)); |
Сначала я генерирую 100 случайных UUID только для тестирования. Затем для каждого UUID я хотел бы загрузить запись, используя следующий метод:
1
2
3
|
Person slowLoadBy(UUID id) { //... } |
Реализация slowLoadBy()
имеет значения, просто имейте в виду, что она медленная и блокирующая. Использование subscribe()
для вызова slowLoadBy()
имеет много недостатков:
-
subscribe()
является однопоточным по дизайну, и нет никакого способа обойти это. КаждыйUUID
загружается последовательно - когда вы вызываете
subscribe()
вы не можете трансформировать объектPerson
дальше. Это терминальная операция
Более надежный и еще более ошибочный подход состоит в map()
каждого UUID
:
1
2
|
Flowable<Person> people = ids .map(id -> slowLoadBy(id)); //BROKEN |
Это очень читабельно, но, к сожалению, сломано. Операторы, как и подписчики, являются однопоточными. Это означает, что в любой момент времени может быть отображен только один UUID
, параллелизм здесь также не разрешен. Что еще хуже, мы наследуем поток / работника от апстрима. Это имеет несколько недостатков. Если восходящий поток генерирует события с использованием какого-то выделенного планировщика, мы будем захватывать потоки из этого планировщика. Например, многие операторы, такие как interval()
, прозрачно используют пул потоков Schedulers.computation()
. Мы неожиданно начинаем выполнять интенсивные операции ввода-вывода в пуле, который совершенно не подходит для этого. Более того, мы замедляем весь конвейер с помощью одного последовательного шага блокировки. Очень очень плохо.
Возможно, вы слышали об этом операторе subscribeOn()
и о том, как он обеспечивает параллелизм. Действительно, но вы должны быть очень осторожны при его применении. Следующий пример (снова) неверен :
1
2
3
4
5
6
|
import io.reactivex.schedulers.Schedulers; Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .map(id -> slowLoadBy(id)); //BROKEN |
Выше приведен фрагмент кода. observeOn()
subscribeOn()
(и в этом случае observeOn()
) едва переключают выполнение на другого работника (поток), не вводя параллелизма. Поток по-прежнему последовательно обрабатывает все события, но в другом потоке. Другими словами — вместо последовательного потребления событий в потоке, унаследованном от восходящего потока, теперь мы используем их последовательно в потоке io()
. Так что насчет этого мифического оператора flatMap()
?
flatMap()
на помощь
flatMap()
включает параллелизм, разделяя поток событий на поток подпотоков. Но сначала еще один неработающий пример:
1
2
3
4
5
6
7
|
Flowable<Person> asyncLoadBy(UUID id) { return Flowable.fromCallable(() -> slowLoadBy(id)); } Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .flatMap(id -> asyncLoadBy(id)); //BROKEN |
О боже, это все еще сломано ! flatMap()
логически делает две вещи:
- применяя преобразование (
id -> asyncLoadBy(id)
) к каждому вышестоящему событию — это создаетFlowable<Flowable<Person>>
. Это имеет смысл, для каждого восходящегоUUID
мы получаемFlowable<Person>
поэтому мы получаем поток потоков объектовPerson
- затем
flatMap()
пытается подписаться на все эти внутренние подпотоки одновременно. Всякий раз, когда какой-либо из подпотоков испускает событиеPerson
, оно прозрачно передается как результат внешнегоFlowable
.
Технически, flatMap()
создает и подписывается только на первые 128 (по умолчанию, необязательный параметр maxConcurrency
) подпотоков. Также, когда завершается последний подпоток, также завершается внешний поток Person
. Теперь, почему на земле это сломано? RxJava не вводит пул потоков, если это явно не требуется. Например, этот кусок кода все еще блокирует:
01
02
03
04
05
06
07
08
09
10
11
|
log.info( "Setup" ); Flowable<String> blocking = Flowable .fromCallable(() -> { log.info( "Starting" ); TimeUnit.SECONDS.sleep( 1 ); log.info( "Done" ); return "Hello, world!" ; }); log.info( "Created" ); blocking.subscribe(s -> log.info( "Received {}" , s)); log.info( "Done" ); |
Внимательно посмотрите на вывод, особенно в порядке событий и потоков:
1
2
3
4
5
6
|
19:57:28.847 | INFO | main | Setup 19:57:28.943 | INFO | main | Created 19:57:28.949 | INFO | main | Starting 19:57:29.954 | INFO | main | Done 19:57:29.955 | INFO | main | Received Hello, world! 19:57:29.957 | INFO | main | Done |
Никакого параллелизма, никаких дополнительных потоков. Простая упаковка кода блокировки в Flowable
волшебным образом не добавляет параллелизма. Вы должны явно использовать… subscribeOn()
:
01
02
03
04
05
06
07
08
09
10
11
12
|
log.info( "Setup" ); Flowable<String> blocking = Flowable .fromCallable(() -> { log.info( "Starting" ); TimeUnit.SECONDS.sleep( 1 ); log.info( "Done" ); return "Hello, world!" ; }) .subscribeOn(Schedulers.io()); log.info( "Created" ); blocking.subscribe(s -> log.info( "Received {}" , s)); log.info( "Done" ); |
Вывод на этот раз более перспективный:
1
2
3
4
5
6
|
19:59:10.547 | INFO | main | Setup 19:59:10.653 | INFO | main | Created 19:59:10.662 | INFO | main | Done 19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting 19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done 19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world! |
Но в прошлый раз мы использовали subscribeOn()
, что происходит? Ну, метод subscribeOn()
на уровне внешнего потока в основном говорит, что все события должны обрабатываться последовательно, в этом потоке, в другом потоке. Мы не говорили, что должно быть много подпотоков, работающих одновременно. И поскольку все подпотоки блокируются, когда RxJava пытается подписаться на все из них, он эффективно подписывается последовательно один за другим. asyncLoadBy()
самом деле не асинхронен , поэтому он блокируется, когда оператор flatMap()
пытается подписаться на него. Исправить легко. Обычно вы бы помещали subscribeOn()
в asyncLoadBy()
но в образовательных целях я поместил бы его прямо в основной конвейер:
1
2
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io())); |
Теперь это работает как шарм! По умолчанию RxJava примет первые 128 восходящих событий ( UUID
), превратит их в подпотоки и подпишется на все из них. Если подпотоки являются асинхронными и в высокой степени распараллеливаемыми (например, сетевые вызовы), мы получаем 128 одновременных вызовов asyncLoadBy()
. Уровень параллелизма (128) настраивается с помощью параметра maxConcurrency
:
1
2
3
4
5
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()), 10 //maxConcurrency ); |
Это было много работы, вы не думаете? Разве параллелизм не должен быть еще более декларативным? Мы больше не имеем дело с Executor
и фьючерсами, но все же кажется, что этот подход слишком подвержен ошибкам. Разве это не может быть так просто, как parallel()
в потоках Java 8?
Введите ParallelFlowable
Давайте сначала снова посмотрим на наш пример и сделаем его еще более сложным, добавив filter()
:
1
2
3
|
Flowable<Person> people = ids .map( this ::slowLoadBy) //BROKEN .filter( this ::hasLowRisk); //BROKEN |
где hasLowRisk()
— это медленный предикат:
1
2
3
|
boolean hasLowRisk(Person p) { //slow... } |
Мы уже знаем, что идиоматический подход к этой проблеме заключается в использовании flatMap()
, дважды:
1
2
3
|
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(io())) .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io())); |
asyncHasLowRisk()
довольно неясен — он возвращает либо одноэлементный поток при asyncHasLowRisk()
предиката, либо пустой поток при сбое. Вот как вы эмулируете filter()
с помощью flatMap()
. Можем ли мы сделать лучше? Начиная с RxJava 2.0.5, появился новый оператор, который называется… parallel()
! Это довольно удивительно, потому что оператор с тем же именем был удален до того, как RxJava стал 1.0 из-за многих заблуждений и неправильного использования. В 2.x parallel()
2.x), кажется, наконец решается проблема идиоматического параллелизма безопасным и декларативным способом. Во-первых, давайте посмотрим на красивый код!
1
2
3
4
5
6
|
Flowable<Person> people = ids .parallel( 10 ) .runOn(Schedulers.io()) .map( this ::slowLoadBy) .filter( this ::hasLowRisk) .sequential(); |
Просто так! Блок кода между parallel()
и sequential()
выполняется … параллельно. Что мы имеем здесь? Прежде всего, новый оператор parallel()
превращает Flowable<UUID>
в ParallelFlowable<UUID>
который имеет гораздо меньший API, чем Flowable. Через секунду вы увидите, почему. Необязательный параметр int
(в нашем случае 10) определяет параллелизм, или (как указано в документации), сколько параллельных «рельсов» создано. Поэтому для нас мы разделили один Flowable<Person>
на 10 параллельных независимых рельсов (думаю: потоков ). События из исходного потока UUID
разделены ( modulo 10
) на разные рельсы, подпотоки, которые не зависят друг от друга. Думайте о них как об отправке вышестоящих событий в 10 отдельных потоков. Но сначала мы должны определить, откуда эти потоки — используя удобный оператор runOn()
. Это намного лучше, чем parallel()
в потоках Java 8, где вы не можете контролировать уровень параллелизма.
На данный момент у нас есть ParallelFlowable
. Когда событие появляется в восходящем потоке ( UUID
), оно делегируется одному из 10 «рельсов», параллельных, независимых конвейеров. Конвейер предоставляет ограниченное подмножество операторов, которые безопасны для одновременной работы, например map()
и filter()
, но также и reduce()
. Нет никаких buffer()
, take()
и т. Д., Так как их семантика неясна при вызове сразу для нескольких подпотоков. Наша блокировка slowLoadBy()
а также hasLowRisk()
по-прежнему вызывается последовательно, но только в пределах одного «рельса». Поскольку у нас теперь есть 10 одновременных «рельсов», мы эффективно распараллеливали их без особых усилий.
Когда события достигают конца подпотока («рельса»), они сталкиваются с оператором sequential()
. Этот оператор превращает ParallelFlowable
обратно в Flowable
. Поскольку наши преобразователи и фильтры являются поточно-ориентированными, пара parallel()
/ sequential()
обеспечивает очень простой способ распараллеливания потоков. Одна небольшая оговорка — вы неизбежно получите переупорядоченные сообщения. Последовательные map()
и filter()
всегда сохраняют порядок (как и большинство операторов). Но как только вы запускаете их в блоке parallel()
, порядок теряется. Это учитывает больший параллелизм, но вы должны иметь это в виду.
Стоит ли использовать parallel()
вместо вложенного flatMap()
для распараллеливания вашего кода? Это зависит от вас, но parallel()
кажется намного проще для чтения и понимания.
Ссылка: | Парадигматический параллелизм: flatMap () и parallel () — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге по Java и соседству . |