Статьи

Java 8: полное руководство по CompletableFuture

Java 8 идет, поэтому пришло время изучить новые функции. Хотя Java 7 и Java 6 были довольно незначительными выпусками, версия 8 станет большим шагом вперед. Может быть, даже слишком большой? Сегодня я дам вам подробное объяснение новой абстракции в JDK 8 — CompletableFuture<T> . Как вы все знаете, я надеюсь, что Java 8 выйдет менее чем через год, поэтому эта статья основана на JDK 8 build 88 с поддержкой лямбды . CompletableFuture<T> расширяет Future<T> , предоставляя функциональные монадические (!) Операции и продвигая модель асинхронного, управляемого событиями программирования, в отличие от блокировки в более старой Java. Если вы открыли JavaDoc CompletableFuture<T> вы наверняка перегружены. Около пятидесяти методов (!), Некоторые из них очень загадочные и экзотические, например:

1
2
3
4
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletableFuture<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn,
    Executor executor)

Не волнуйся, но продолжай читать. CompletableFuture собирает все функции ListenableFuture в Гуаве с SettableFuture . Более того, встроенная лямбда-поддержка приближает его к фьючерсам Scala / Akka . Звучит слишком хорошо, чтобы быть правдой, но продолжайте читать. CompletableFuture имеет две основные области, превосходящие доброе старое Future<T> асинхронного обратного вызова / преобразований и возможность устанавливать значение CompletableFuture из любого потока в любой момент времени.

Извлечь / изменить упакованное значение

Обычно фьючерсы представляют собой фрагмент кода, выполняемый другим потоком. Но это не всегда так. Иногда вы хотите создать Future представляющее какое-то событие, которое, как вы знаете, произойдет, например, прибытие сообщения JMS Итак, у вас есть Future<Message> но в этом будущем нет асинхронной работы. Вы просто хотите завершить (разрешить) это будущее, когда придет сообщение JMS, и это обусловлено событием. В этом случае вы можете просто создать CompletableFuture , вернуть его своему клиенту и, когда вы считаете, что ваши результаты доступны, просто complete() будущее и разблокировать всех клиентов, ожидающих этого будущего.

Для начала вы можете просто создать новое CompletableFuture из воздуха и передать его своему клиенту:

1
2
3
4
5
public CompletableFuture<String> ask() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    //...
    return future;
}

Обратите внимание, что это будущее не связано ни с Callable<String> , ни с пулом потоков, ни с асинхронным заданием. Если теперь клиентский код вызывает ask().get() он будет заблокирован навсегда. Если он регистрирует некоторые обратные вызовы завершения, они никогда не сработают. Так какой в ​​этом смысл? Теперь вы можете сказать:

1
future.complete("42")

… И в этот самый момент все клиенты, заблокированные в Future.get() , получат строку результата. Также обратные вызовы завершения будут срабатывать немедленно. Это очень удобно, когда вы хотите представить задачу в будущем, но не обязательно вычислительную задачу, выполняемую в каком-либо потоке выполнения. CompletableFuture.complete() может быть вызван только один раз, последующие вызовы игнорируются. Но есть задняя дверь под названием CompletableFuture.obtrudeValue(...) которая заменяет предыдущее значение Future новым. Используйте с осторожностью.

Иногда вы хотите сообщить о неудаче. Как вы знаете, объекты Future могут обрабатывать либо завернутый результат, либо исключение. Если вы хотите передать какое-то исключение дальше, есть CompletableFuture.completeExceptionally(ex)obtrudeException(ex) злой брат, который переопределяет предыдущее исключение). completeExceptionally() также разблокирует всех ожидающих клиентов, но на этот раз выдает исключение из get() . Говоря о get() , есть также метод CompletableFuture.join() с небольшими изменениями в обработке ошибок. Но в целом они одинаковы. И наконец, есть также метод CompletableFuture.getNow(valueIfAbsent) , который не блокирует, но, если Future еще не завершен, возвращает значение по умолчанию. Полезно при создании надежных систем, где мы не хотим слишком долго ждать.

Последний метод static утилиты — completedFuture(value) который возвращает уже завершенный объект Future . Может быть полезно для тестирования или при написании некоторого слоя адаптера.

Создание и получение CompletableFuture

Хорошо, так что создание CompletableFuture вручную — это наш единственный вариант? Не совсем. Как и в случае с обычными Future мы можем обернуть существующую задачу CompletableFuture используя следующее семейство фабричных методов:

1
2
3
4
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

Методы, которые не принимают Executor в качестве аргумента, но заканчиваются ...Async будет использовать ForkJoinPool.commonPool() (глобальный пул общего назначения, представленный в JDK 8). Это относится к большинству методов класса CompletableFuture . runAsync() прост для понимания, обратите внимание, что он принимает Runnable , поэтому он возвращает CompletableFuture<Void> поскольку Runnable ничего не возвращает. Если вам нужно что-то обработать асинхронно и вернуть результат, используйте Supplier<U> :

1
2
3
4
5
6
7
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...long running...
        return "42";
    }
}, executor);

Но эй, у нас есть лямбды в Java 8!

1
2
3
4
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...long running...
    return "42";
}, executor);

или даже:

1
2
final CompletableFuture<String> future =
    CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

Эта статья не о проекте Lambda, но я буду использовать лямбды довольно широко.

Преобразование и действие на одном CompletableFuture (затем thenApply )

Поэтому я сказал, что CompletableFuture превосходит Future но вы еще не видели почему? Проще говоря, это потому, что CompletableFuture является монадой и функтором. Не помогает, я думаю? И Scala, и JavaScript позволяют регистрировать асинхронные обратные вызовы, когда будущее завершено. Нам не нужно ждать и блокировать, пока он не будет готов. Мы можем просто сказать: запустить эту функцию для результата, когда он прибудет . Более того, мы можем сложить такие функции, объединить несколько фьючерсов вместе и т. Д. Например, если у нас есть функция от String до Integer мы можем превратить CompletableFuture<String> в CompletableFuture<Integer не разворачивая ее. Это достигается с thenApply() семейства методов thenApply() :

1
2
3
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

Как указывалось ранее ...Async версии предоставляются для большинства операций над CompletableFuture поэтому я пропущу их в следующих разделах. Просто помните, что первый метод будет применять функцию в том же потоке, в котором завершено будущее, а остальные два будут применять его асинхронно в другом пуле потоков.

Давайте посмотрим, как работает thenApply() :

1
2
3
CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

Или в одном утверждении:

1
2
CompletableFuture<Double> f3 =
    f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

Вы видите последовательность преобразований здесь. От String до Integer а затем до Double . Но что самое важное, эти преобразования не выполняются ни немедленно, ни блокируются. Они просто запоминаются, и когда оригинальный f1 завершается, они выполняются для вас. Если некоторые преобразования отнимают много времени, вы можете предоставить своего собственного Executor для их асинхронного запуска. Обратите внимание, что эта операция эквивалентна монадической map в Scala.

Запуск кода по завершении ( thenAccept / thenRun )

1
2
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

Эти два метода являются типичными «заключительными» этапами в будущем конвейере. Они позволяют вам потреблять будущую стоимость, когда она будет готова. thenAccept() как thenAccept() предоставляет окончательное значение, thenRun выполняет Runnable который даже не имеет доступа к вычисленному значению. Пример:

1
2
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");

...Async варианты доступны также для обоих методов, с неявным и явным исполнителем. Я не могу подчеркнуть это достаточно:
thenAccept() / thenRun() не блокируются (даже без явного executor ). Обращайтесь с ними как с прослушивателем / обработчиком событий, который вы присоединяете к будущему и который будет выполняться в будущем. Сообщение "Continuing" появится немедленно, даже если future еще не близко к завершению.

Обработка ошибок одного CompletableFuture

Пока мы говорили только о результате вычислений. Но как насчет исключений? Можем ли мы справиться с ними асинхронно? Конечно!

1
2
CompletableFuture<String> safe =
    future.exceptionally(ex -> "We have a problem: " + ex.getMessage());

exceptionally() принимает функцию, которая будет вызываться, когда исходное будущее генерирует исключение. Затем у нас есть возможность восстановиться, преобразовав это исключение в некоторое значение, совместимое с типом Future . Дальнейшие преобразования safe больше не приведут к исключению, а вместо этого String вернется из предоставленной функции.

Более гибкий подход — handle() который принимает функцию, получающую либо правильный результат, либо исключение:

1
2
3
4
5
6
7
8
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
    if (ok != null) {
        return Integer.parseInt(ok);
    } else {
        log.warn("Problem", ex);
        return -1;
    }
});

handle() вызывается всегда, а аргумент result или исключение не равен null . Это универсальная стратегия.

Объединяя два CompletableFuture вместе

Асинхронная обработка одного CompletableFuture хороша, но она действительно показывает свою мощь, когда несколько таких фьючерсов объединяются различными способами.

Объединение (связывание) двух фьючерсов ( thenCompose() )

Иногда вы хотите запустить какую-то функцию в будущем (когда она будет готова). Но эта функция также возвращает будущее. CompletableFuture должен быть достаточно умным, чтобы понимать, что результат нашей функции теперь должен использоваться как будущее верхнего уровня, в отличие от CompletableFuture<CompletableFuture<T>> . Метод thenCompose() , таким образом, эквивалентен flatMap в Scala:

1
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);

...Async Доступны также ...Async варианты. В приведенном ниже примере внимательно посмотрите на типы и разницу между thenApply() ( map ) и thenCompose() ( flatMap ) при применении функции flatMap calculateRelevance() возвращающей CompletableFuture<Double> :

01
02
03
04
05
06
07
08
09
10
11
CompletableFuture<Document> docFuture = //...
  
CompletableFuture<CompletableFuture<Double>> f =
    docFuture.thenApply(this::calculateRelevance);
  
CompletableFuture<Double> relevanceFuture =
    docFuture.thenCompose(this::calculateRelevance);
  
//...
  
private CompletableFuture<Double> calculateRelevance(Document doc)  //...

thenCompose() — это важный метод, который позволяет создавать надежные асинхронные конвейеры, не блокируя и не ожидая промежуточных шагов.

Преобразование значений двух фьючерсов ( thenCombine() )

thenCompose() как thenCompose() используется для thenCompose() одного будущего, зависящего от другого, thenCombine когда оба они завершены, thenCombine объединяет два независимых будущего:

1
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

...Async Доступны также ...Async варианты. Представьте, что у вас есть два CompletableFuture , один, который загружает Customer а другой загружает ближайший Shop . Они полностью независимы друг от друга, но когда оба они завершены, вы хотите использовать их значения для расчета Route . Вот раздетый пример:

1
2
3
4
5
6
7
8
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture =
    customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
  
//...
  
private Route findRoute(Customer customer, Shop shop) //...

Обратите внимание, что в Java 8 вы можете заменить (cust, shop) -> findRoute(cust, shop) на простую this::findRoute метод this::findRoute :

1
customerFuture.thenCombine(shopFuture, this::findRoute);

Итак, вы поняли. У нас есть customerFuture и shopFuture . Затем routeFuture оборачивает их и «ждет» завершения обоих. Когда они оба готовы, запускается наша предоставленная функция, которая объединяет результаты ( findRoute() ). Таким образом, routeFuture завершится, когда два базовых фьючерса будут разрешены, и findRoute() завершен.

Ожидание завершения обоих CompletableFuture

Если вместо создания нового CompletableFuture объединяющего оба результата, мы просто хотим получить уведомление об их завершении, мы можем использовать семейство методов thenAcceptBoth() / runAfterBoth() ( ...Async также доступны ...Async варианты). Они работают аналогично thenAccept() и thenRun() но ждут два будущего вместо одного:

1
2
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

Представьте себе, что в приведенном выше примере вместо создания нового CompletableFuture<Route> вы просто хотите немедленно отправить событие или обновить графический интерфейс. Это может быть легко достигнуто с помощью thenAcceptBoth() :

1
2
3
4
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
    final Route route = findRoute(cust, shop);
    //refresh GUI with route
});

Надеюсь, я ошибаюсь, но, возможно, некоторые из вас задают себе вопрос: почему я не могу просто заблокировать эти два будущего? Как здесь:

1
2
3
Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

Ну, конечно, вы можете. Но весь смысл CompletableFuture заключается в том, чтобы позволить асинхронную, управляемую событиями модель программирования вместо блокировки и нетерпеливого ожидания результата. Таким образом, функционально два приведенных выше фрагмента кода эквивалентны, но последний излишне занимает один поток выполнения.

Ожидание первого завершения CompletableFuture

Еще одной интересной частью API CompletableFuture является возможность ожидания первого (в отличие от всех ) завершенного будущего. Это может пригодиться, когда у вас есть две задачи, дающие результат одного типа, и вы заботитесь только о времени отклика, а не о том, какая задача была получена первой. Методы API ( ...Async Доступны также ...Async варианты):

1
2
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block)
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)

В качестве примера скажем, что у вас есть две системы, с которыми вы интегрируетесь. Один имеет меньшее среднее время отклика, но высокое стандартное отклонение. Другой в целом медленнее, но более предсказуем. Чтобы взять лучшее из обоих миров (производительность и предсказуемость), вы вызываете обе системы одновременно и ждете завершения первой. Обычно это будет первый, но если он стал медленным, второй завершается в приемлемое время:

1
2
3
4
5
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
    System.out.println("Result: " + s);
});

s представляет String ответ либо из fetchFast() либо из fetchPredictably() . Мы не знаем и не заботимся.

Преобразование сначала завершено

applyToEither() является старшим братом acceptEither() . В то время как последний просто вызывает какой-то фрагмент кода, когда завершается более быстрое из двух applyToEither() , applyToEither() вернет новое будущее. Это будущее завершится, когда завершится первый из двух базовых фьючерсов. API немного похож (возможны также ...Async варианты):

1
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)

Дополнительная функция fn вызывается в результате первого завершенного будущего. Я не совсем уверен, какова цель такого специализированного метода, в конце концов можно просто использовать: fast.applyToEither(predictable).thenApply(fn) . Поскольку мы застряли с этим API, но на самом деле нам не нужно дополнительное приложение-функция, я просто буду использовать заполнитель Function.identity() :

1
2
3
4
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
    fast.applyToEither(predictable, Function.<String>identity());

firstDone будущее может быть передано. Обратите внимание, что с точки зрения клиента скрыт тот факт, что на самом деле за firstDone стоят два фьючерса. Клиент просто ожидает завершения в будущем, а applyToEither() позаботится о том, чтобы уведомить клиента, когда любой из двух завершится первым.

Объединение нескольких CompletableFuture вместе

Итак, теперь мы знаем, как ждать завершения двух thenCombine() (используя thenCombine() ) и завершения первого ( applyToEither() ). Но может ли оно масштабироваться до произвольного числа фьючерсов? Конечно, используя static вспомогательные методы:

1
2
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs)
static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)

allOf() принимает массив фьючерсов и возвращает будущее, которое завершается, когда все базовые фьючерсы завершены (барьер ожидает всех). anyOf() с другой стороны, будет ожидать только самого быстрого базового фьючерса. Пожалуйста, посмотрите на общий тип возвращаемых фьючерсов. Не совсем то, что вы ожидаете? Мы позаботимся об этом в следующей статье.

Резюме

Мы исследовали почти весь API CompletableFuture . Я уверен, что это было довольно обременительно, поэтому в следующей статье мы вскоре разработаем еще одну реализацию простой программы для веб-сканирования, используя преимущества функций CompletableFuture и лямбда-выражений Java 8. Мы также рассмотрим недостатки и недостатки CompletableFuture .

Ссылка: Java 8: исчерпывающее руководство по CompletableFuture от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .