Как параллельное выполнение блокирующих задач «только с побочными эффектами» (иначе говоря, void) стало легче с абстракцией Completable
представленной в RxJava 1.1.1 «
Как вы, наверное, заметили, читая мой блог, я в основном специализируюсь на Software Craftsmanship и автоматическом тестировании кода. Однако, кроме того, я энтузиаст непрерывной доставки и широкого определения параллелизма. Последний пункт варьируется от чистых потоков и семафоров в C до более высокоуровневых решений, таких как ReactiveX и модель актера. На этот раз пример использования для очень удобной (в определенных случаях) функции, представленной в новом RxJava 1.1.1 — rx.Completable
. Подобно многим моим записям в блоге, это также отражение реального события, с которым я столкнулся, работая над реальными задачами и вариантами использования.
Задача сделать
Представьте себе систему с довольно сложной обработкой асинхронных событий, поступающих из разных источников. Фильтрация, объединение, преобразование, группировка, обогащение и многое другое. RxJava здесь очень хорошо подходит, особенно если мы хотим быть реактивными. Давайте предположим, что мы уже реализовали это (выглядит и работает хорошо), и осталась только одна вещь. Прежде чем мы начнем обработку, необходимо сообщить 3 внешним системам, что мы готовы получать сообщения. 3 синхронных вызова в унаследованные системы (через RMI, JMX или SOAP). Каждый из них может длиться несколько секунд, и нам нужно дождаться их всех, прежде чем мы начнем. К счастью, они уже реализованы, и мы относимся к ним как к черным ящикам, которые могут преуспеть (или потерпеть неудачу за исключением). Нам просто нужно позвонить им (желательно одновременно) и дождаться окончания.
rx.Observable — подход 1
Имея RxJava под рукой, это выглядит как очевидный подход. Во-первых, выполнение задания можно обернуть с помощью Observable
:
1
2
3
4
5
6
|
private Observable<Void> rxJobExecute(Job job) { return Observable.fromCallable(() -> { job.execute(); return null ; }); } |
К сожалению (в нашем случае) Observable
ожидает возврата некоторых элементов. Нам нужно использовать Void
и неловко return null
(вместо просто ссылки на метод job::execute
.
Затем мы можем использовать метод subscribeOn()
чтобы использовать другой поток для выполнения нашей работы (а не блокировать основной / текущий поток — мы не хотим выполнять наши задания последовательно). Schedulers.io()
предоставляет планировщику набор потоков, предназначенных для работы с IO.
1
2
|
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); |
Наконец, нам нужно дождаться завершения всех из них (все Obvervable
завершены). Для этого можно настроить функцию zip. Он объединяет элементы, испускаемые Obserbable
по порядковому номеру. В нашем случае нас интересует только первый псевдоэлемент из каждого задания Observable
(мы выдаем только null
чтобы удовлетворить API) и ждем их блокирующим способом. Функция zip в операторе zip должна что-то возвращать, поэтому нам нужно повторить обходной путь с null
.
1
2
3
|
Observable.zip(run1, run2, (r1, r2) -> return null ) .toBlocking() .single(); |
Довольно очевидно, что Observable
был разработан для работы с потоками значений, и для его настройки требуется только дополнительная операция (ничего не возвращающая). Ситуация становится еще хуже, когда нам нужно объединить (например, объединить) нашу операцию только с побочными эффектами с другими, возвращающими некоторые значения — требуется более уродливое приведение. Смотрите реальный пример использования из RxNetty API.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public void execute() { Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); Observable.zip(run1, run2, (r1, r2) -> null ) .toBlocking() .single(); } private Observable<Void> rxJobExecute(Job job) { return Observable.fromCallable(() -> { job.execute(); return null ; }); } |
rx.Observable — подход 2
Может быть использован другой подход. Вместо создания искусственного элемента, пустое Observable
с нашей задачей может быть выполнено как действие onComplete
. Это заставляет нас перейти от операции zip
к merge
. В результате нам нужно предоставить действие onNext
(которое никогда не выполняется для пустого Observable
), которое подтверждает нас в убеждении, что мы пытаемся взломать систему.
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public void execute() { Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); Observable.merge(run1, run2) .toBlocking() .subscribe(next -> {}); } private Observable<Object> rxJobExecute(Job job) { return Observable.empty() .doOnCompleted(job::execute); } |
rx.Completable
Лучшая поддержка Observable, которая не возвращает никакого значения, была рассмотрена в RxJava 1.1.1. Completable
может рассматриваться как удаленная версия Observable
которая может либо успешно завершиться (событие onCompleted
onError
), либо завершиться неудачей ( onError
). Самым простым способом создания экземпляра Completable
является использование метода fromAction
который принимает Action0
который не возвращает никакого значения (например, Runnable
).
1
2
3
4
|
Completable completable1 = Completable.fromAction(job1::execute) .subscribeOn(Schedulers.io()); Completable completable2 = Completable.fromAction(job2::execute) .subscribeOn(Schedulers.io()); |
Затем мы можем использовать метод merge()
который возвращает экземпляр Completable
который подписывается на все последующие Completable
s сразу и завершается, когда все они завершаются (или один из них завершается неудачно). Поскольку мы использовали метод subscribeOn
с внешним планировщиком, все задания выполняются параллельно (в разных потоках).
1
2
|
Completable.merge(completable1, completable2) .await(); |
Метод await()
блокируется до завершения всех заданий (в случае ошибки исключение будет переброшено). Чисто и просто.
1
2
3
4
5
6
7
8
9
|
public void execute() { Completable completable1 = Completable.fromAction(job1::execute) .subscribeOn(Schedulers.io()); Completable completable2 = Completable.fromAction(job2::execute) .subscribeOn(Schedulers.io()); Completable.merge(completable1, completable2) .await(); } |
java.util.concurrent.CompletableFuture
Некоторые из вас могут спросить: почему бы просто не использовать CompletableFuture
? Это был бы хороший вопрос. В то время как чистое Future
представленное в Java 5, может потребовать дополнительной работы с нашей стороны, ListenableFuture
(из Guava) и CompletableFuture
(из Java 8) делают его довольно тривиальным.
Во-первых, нам нужно запустить / запланировать выполнение наших заданий. Затем, используя метод CompletableFuture.allOf()
мы можем создать новый CompletableFuture
который завершается в момент завершения всех заданий (разве мы не видели эту концепцию раньше?). Метод get()
просто блокирует ожидание этого.
01
02
03
04
05
06
07
08
09
10
11
12
|
public void execute() { try { CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute); CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute); CompletableFuture.allOf(run1, run2) .get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException( "Jobs execution failed" , e); } } |
Нам нужно что-то делать с проверенными исключениями (очень часто мы не хотим загрязнять наш API ими), но в целом это выглядит разумно. Однако стоит помнить, что CompletableFuture
не CompletableFuture
когда требуется более сложная цепная обработка. Помимо того, что RxJava уже используется в нашем проекте, часто полезно использовать тот же (или похожий) API вместо того, чтобы вводить что-то совершенно новое.
Резюме
Благодаря rx.Completable
выполнять задачи с побочными эффектами (ничего не возвращая) с помощью RxJava. В кодовой базе, уже использующей RxJava, она может быть предпочтительнее, чем CompletableFuture
даже для простых случаев. Тем не менее, Completable
предоставляет множество дополнительных возможностей для операторов и методов, и, кроме того, его можно легко смешать с Observable
что делает его еще более мощным.
Чтобы узнать больше о Completable
вы можете увидеть примечания к выпуску . Для тех, кто хочет глубже понять тему, есть подробное введение в Completable API в блоге Advanced RxJava ( часть 1 и 2 ).
- Исходный код примеров кода доступен на GitHub .
Кстати, если вы заинтересованы в RxJava в целом, я могу с чистой совестью порекомендовать вам книгу, которую в настоящее время пишут Томаш Нуркевич и Бен Кристенсен — Реактивное программирование с помощью RxJava .
Ссылка: | Параллельное выполнение задач блокировки с помощью RxJava и Completable от нашего партнера по JCG Марцина Заячковского в блоге Solid Soft . |