Как параллельное выполнение блокирующих задач «только с побочными эффектами» (иначе говоря, void) стало легче с абстракцией Completable представленной в RxJava 1.1.1 «

rx.Completable . Подобно многим моим записям в блоге, это также отражение реального события, с которым я столкнулся, работая над реальными задачами и вариантами использования.
Задача сделать
Представьте себе систему с довольно сложной обработкой асинхронных событий, поступающих из разных источников. Фильтрация, объединение, преобразование, группировка, обогащение и многое другое. RxJava здесь очень хорошо подходит, особенно если мы хотим быть реактивными. Давайте предположим, что мы уже реализовали это (выглядит и работает хорошо), и осталась только одна вещь. Прежде чем мы начнем обработку, необходимо сообщить 3 внешним системам, что мы готовы получать сообщения. 3 синхронных вызова в унаследованные системы (через RMI, JMX или SOAP). Каждый из них может длиться несколько секунд, и нам нужно дождаться их всех, прежде чем мы начнем. К счастью, они уже реализованы, и мы относимся к ним как к черным ящикам, которые могут преуспеть (или потерпеть неудачу за исключением). Нам просто нужно позвонить им (желательно одновременно) и дождаться окончания.
rx.Observable — подход 1
Имея RxJava под рукой, это выглядит как очевидный подход. Во-первых, выполнение задания можно обернуть с помощью Observable :
|
1
2
3
4
5
6
|
private Observable<Void> rxJobExecute(Job job) { return Observable.fromCallable(() -> { job.execute(); return null; });} |
К сожалению (в нашем случае) Observable ожидает возврата некоторых элементов. Нам нужно использовать Void и неловко return null (вместо просто ссылки на метод job::execute .
Затем мы можем использовать метод subscribeOn() чтобы использовать другой поток для выполнения нашей работы (а не блокировать основной / текущий поток — мы не хотим выполнять наши задания последовательно). Schedulers.io() предоставляет планировщику набор потоков, предназначенных для работы с IO.
|
1
2
|
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); |
Наконец, нам нужно дождаться завершения всех из них (все Obvervable завершены). Для этого можно настроить функцию zip. Он объединяет элементы, испускаемые Obserbable по порядковому номеру. В нашем случае нас интересует только первый псевдоэлемент из каждого задания Observable (мы выдаем только null чтобы удовлетворить API) и ждем их блокирующим способом. Функция zip в операторе zip должна что-то возвращать, поэтому нам нужно повторить обходной путь с null .
|
1
2
3
|
Observable.zip(run1, run2, (r1, r2) -> return null) .toBlocking() .single(); |
Довольно очевидно, что Observable был разработан для работы с потоками значений, и для его настройки требуется только дополнительная операция (ничего не возвращающая). Ситуация становится еще хуже, когда нам нужно объединить (например, объединить) нашу операцию только с побочными эффектами с другими, возвращающими некоторые значения — требуется более уродливое приведение. Смотрите реальный пример использования из RxNetty API.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public void execute() { Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); Observable.zip(run1, run2, (r1, r2) -> null) .toBlocking() .single();}private Observable<Void> rxJobExecute(Job job) { return Observable.fromCallable(() -> { job.execute(); return null; });} |
rx.Observable — подход 2
Может быть использован другой подход. Вместо создания искусственного элемента, пустое Observable с нашей задачей может быть выполнено как действие onComplete . Это заставляет нас перейти от операции zip к merge . В результате нам нужно предоставить действие onNext (которое никогда не выполняется для пустого Observable ), которое подтверждает нас в убеждении, что мы пытаемся взломать систему.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public void execute() { Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io()); Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io()); Observable.merge(run1, run2) .toBlocking() .subscribe(next -> {});}private Observable<Object> rxJobExecute(Job job) { return Observable.empty() .doOnCompleted(job::execute);} |
rx.Completable
Лучшая поддержка Observable, которая не возвращает никакого значения, была рассмотрена в RxJava 1.1.1. Completable может рассматриваться как удаленная версия Observable которая может либо успешно завершиться (событие onCompleted onError ), либо завершиться неудачей ( onError ). Самым простым способом создания экземпляра Completable является использование метода fromAction который принимает Action0 который не возвращает никакого значения (например, Runnable ).
|
1
2
3
4
|
Completable completable1 = Completable.fromAction(job1::execute) .subscribeOn(Schedulers.io());Completable completable2 = Completable.fromAction(job2::execute) .subscribeOn(Schedulers.io()); |
Затем мы можем использовать метод merge() который возвращает экземпляр Completable который подписывается на все последующие Completable s сразу и завершается, когда все они завершаются (или один из них завершается неудачно). Поскольку мы использовали метод subscribeOn с внешним планировщиком, все задания выполняются параллельно (в разных потоках).
|
1
2
|
Completable.merge(completable1, completable2) .await(); |
Метод await() блокируется до завершения всех заданий (в случае ошибки исключение будет переброшено). Чисто и просто.
|
1
2
3
4
5
6
7
8
9
|
public void execute() { Completable completable1 = Completable.fromAction(job1::execute) .subscribeOn(Schedulers.io()); Completable completable2 = Completable.fromAction(job2::execute) .subscribeOn(Schedulers.io()); Completable.merge(completable1, completable2) .await();} |
java.util.concurrent.CompletableFuture
Некоторые из вас могут спросить: почему бы просто не использовать CompletableFuture ? Это был бы хороший вопрос. В то время как чистое Future представленное в Java 5, может потребовать дополнительной работы с нашей стороны, ListenableFuture (из Guava) и CompletableFuture (из Java 8) делают его довольно тривиальным.
Во-первых, нам нужно запустить / запланировать выполнение наших заданий. Затем, используя метод CompletableFuture.allOf() мы можем создать новый CompletableFuture который завершается в момент завершения всех заданий (разве мы не видели эту концепцию раньше?). Метод get() просто блокирует ожидание этого.
|
01
02
03
04
05
06
07
08
09
10
11
12
|
public void execute() { try { CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute); CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute); CompletableFuture.allOf(run1, run2) .get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Jobs execution failed", e); }} |
Нам нужно что-то делать с проверенными исключениями (очень часто мы не хотим загрязнять наш API ими), но в целом это выглядит разумно. Однако стоит помнить, что CompletableFuture не CompletableFuture когда требуется более сложная цепная обработка. Помимо того, что RxJava уже используется в нашем проекте, часто полезно использовать тот же (или похожий) API вместо того, чтобы вводить что-то совершенно новое.
Резюме
Благодаря rx.Completable выполнять задачи с побочными эффектами (ничего не возвращая) с помощью RxJava. В кодовой базе, уже использующей RxJava, она может быть предпочтительнее, чем CompletableFuture даже для простых случаев. Тем не менее, Completable предоставляет множество дополнительных возможностей для операторов и методов, и, кроме того, его можно легко смешать с Observable что делает его еще более мощным.

Completable вы можете увидеть примечания к выпуску . Для тех, кто хочет глубже понять тему, есть подробное введение в Completable API в блоге Advanced RxJava ( часть 1 и 2 ).
- Исходный код примеров кода доступен на GitHub .
Кстати, если вы заинтересованы в RxJava в целом, я могу с чистой совестью порекомендовать вам книгу, которую в настоящее время пишут Томаш Нуркевич и Бен Кристенсен — Реактивное программирование с помощью RxJava .
| Ссылка: | Параллельное выполнение задач блокировки с помощью RxJava и Completable от нашего партнера по JCG Марцина Заячковского в блоге Solid Soft . |