Статьи

Параллельное выполнение задач блокировки с помощью RxJava и Completable

Как параллельное выполнение блокирующих задач «только с побочными эффектами» (иначе говоря, void) стало легче с абстракцией Completable представленной в RxJava 1.1.1 «

rx_logo_512 Как вы, наверное, заметили, читая мой блог, я в основном специализируюсь на Software Craftsmanship и автоматическом тестировании кода. Однако, кроме того, я энтузиаст непрерывной доставки и широкого определения параллелизма. Последний пункт варьируется от чистых потоков и семафоров в C до более высокоуровневых решений, таких как ReactiveX и модель актера. На этот раз пример использования для очень удобной (в определенных случаях) функции, представленной в новом RxJava 1.1.1 — rx.Completable . Подобно многим моим записям в блоге, это также отражение реального события, с которым я столкнулся, работая над реальными задачами и вариантами использования.

Задача сделать

Представьте себе систему с довольно сложной обработкой асинхронных событий, поступающих из разных источников. Фильтрация, объединение, преобразование, группировка, обогащение и многое другое. RxJava здесь очень хорошо подходит, особенно если мы хотим быть реактивными. Давайте предположим, что мы уже реализовали это (выглядит и работает хорошо), и осталась только одна вещь. Прежде чем мы начнем обработку, необходимо сообщить 3 внешним системам, что мы готовы получать сообщения. 3 синхронных вызова в унаследованные системы (через RMI, JMX или SOAP). Каждый из них может длиться несколько секунд, и нам нужно дождаться их всех, прежде чем мы начнем. К счастью, они уже реализованы, и мы относимся к ним как к черным ящикам, которые могут преуспеть (или потерпеть неудачу за исключением). Нам просто нужно позвонить им (желательно одновременно) и дождаться окончания.

rx.Observable — подход 1

Имея RxJava под рукой, это выглядит как очевидный подход. Во-первых, выполнение задания можно обернуть с помощью Observable :

1
2
3
4
5
6
private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> {
        job.execute();
        return null;
    });
}

К сожалению (в нашем случае) Observable ожидает возврата некоторых элементов. Нам нужно использовать Void и неловко return null (вместо просто ссылки на метод job::execute .

Затем мы можем использовать метод subscribeOn() чтобы использовать другой поток для выполнения нашей работы (а не блокировать основной / текущий поток — мы не хотим выполнять наши задания последовательно). Schedulers.io() предоставляет планировщику набор потоков, предназначенных для работы с IO.

1
2
Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

Наконец, нам нужно дождаться завершения всех из них (все Obvervable завершены). Для этого можно настроить функцию zip. Он объединяет элементы, испускаемые Obserbable по порядковому номеру. В нашем случае нас интересует только первый псевдоэлемент из каждого задания Observable (мы выдаем только null чтобы удовлетворить API) и ждем их блокирующим способом. Функция zip в операторе zip должна что-то возвращать, поэтому нам нужно повторить обходной путь с null .

1
2
3
Observable.zip(run1, run2, (r1, r2) -> return null)
         .toBlocking()
         .single();

Довольно очевидно, что Observable был разработан для работы с потоками значений, и для его настройки требуется только дополнительная операция (ничего не возвращающая). Ситуация становится еще хуже, когда нам нужно объединить (например, объединить) нашу операцию только с побочными эффектами с другими, возвращающими некоторые значения — требуется более уродливое приведение. Смотрите реальный пример использования из RxNetty API.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public void execute() {
    Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());
 
    Observable.zip(run1, run2, (r1, r2) -> null)
        .toBlocking()
        .single();
}
 
private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> {
        job.execute();
        return null;
    });
}

rx.Observable — подход 2

Может быть использован другой подход. Вместо создания искусственного элемента, пустое Observable с нашей задачей может быть выполнено как действие onComplete . Это заставляет нас перейти от операции zip к merge . В результате нам нужно предоставить действие onNext (которое никогда не выполняется для пустого Observable ), которое подтверждает нас в убеждении, что мы пытаемся взломать систему.

01
02
03
04
05
06
07
08
09
10
11
12
13
public void execute() {
    Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());
 
    Observable.merge(run1, run2)
            .toBlocking()
            .subscribe(next -> {});
}
 
private Observable<Object> rxJobExecute(Job job) {
    return Observable.empty()
            .doOnCompleted(job::execute);
}

rx.Completable

Лучшая поддержка Observable, которая не возвращает никакого значения, была рассмотрена в RxJava 1.1.1. Completable может рассматриваться как удаленная версия Observable которая может либо успешно завершиться (событие onCompleted onError ), либо завершиться неудачей ( onError ). Самым простым способом создания экземпляра Completable является использование метода fromAction который принимает Action0 который не возвращает никакого значения (например, Runnable ).

1
2
3
4
Completable completable1 = Completable.fromAction(job1::execute)
        .subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute)
        .subscribeOn(Schedulers.io());

Затем мы можем использовать метод merge() который возвращает экземпляр Completable который подписывается на все последующие Completable s сразу и завершается, когда все они завершаются (или один из них завершается неудачно). Поскольку мы использовали метод subscribeOn с внешним планировщиком, все задания выполняются параллельно (в разных потоках).

1
2
Completable.merge(completable1, completable2)
        .await();

Метод await() блокируется до завершения всех заданий (в случае ошибки исключение будет переброшено). Чисто и просто.

1
2
3
4
5
6
7
8
9
public void execute() {
    Completable completable1 = Completable.fromAction(job1::execute)
            .subscribeOn(Schedulers.io());
    Completable completable2 = Completable.fromAction(job2::execute)
            .subscribeOn(Schedulers.io());
 
    Completable.merge(completable1, completable2)
        .await();
}

java.util.concurrent.CompletableFuture

Некоторые из вас могут спросить: почему бы просто не использовать CompletableFuture ? Это был бы хороший вопрос. В то время как чистое Future представленное в Java 5, может потребовать дополнительной работы с нашей стороны, ListenableFuture (из Guava) и CompletableFuture (из Java 8) делают его довольно тривиальным.

Во-первых, нам нужно запустить / запланировать выполнение наших заданий. Затем, используя метод CompletableFuture.allOf() мы можем создать новый CompletableFuture который завершается в момент завершения всех заданий (разве мы не видели эту концепцию раньше?). Метод get() просто блокирует ожидание этого.

01
02
03
04
05
06
07
08
09
10
11
12
public void execute() {
    try {
        CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);
        CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);
 
        CompletableFuture.allOf(run1, run2)
            .get();
 
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException("Jobs execution failed", e);
    }
}

Нам нужно что-то делать с проверенными исключениями (очень часто мы не хотим загрязнять наш API ими), но в целом это выглядит разумно. Однако стоит помнить, что CompletableFuture не CompletableFuture когда требуется более сложная цепная обработка. Помимо того, что RxJava уже используется в нашем проекте, часто полезно использовать тот же (или похожий) API вместо того, чтобы вводить что-то совершенно новое.

Резюме

Благодаря rx.Completable выполнять задачи с побочными эффектами (ничего не возвращая) с помощью RxJava. В кодовой базе, уже использующей RxJava, она может быть предпочтительнее, чем CompletableFuture даже для простых случаев. Тем не менее, Completable предоставляет множество дополнительных возможностей для операторов и методов, и, кроме того, его можно легко смешать с Observable что делает его еще более мощным.

rxjava книга-маленький Чтобы узнать больше о Completable вы можете увидеть примечания к выпуску . Для тех, кто хочет глубже понять тему, есть подробное введение в Completable API в блоге Advanced RxJava ( часть 1 и 2 ).

  • Исходный код примеров кода доступен на GitHub .

Кстати, если вы заинтересованы в RxJava в целом, я могу с чистой совестью порекомендовать вам книгу, которую в настоящее время пишут Томаш Нуркевич и Бен Кристенсен — Реактивное программирование с помощью RxJava .