Статьи

Java 8: CompletableFuture в действии

После тщательного изучения API CompletableFuture в Java 8 мы готовы написать упрощенный веб-сканер. Мы решили подобную проблему уже с помощью ExecutorCompletionService , Guava ListenableFuture и Scala / Akka . Я выбираю ту же проблему, чтобы было легко сравнивать подходы и методы реализации.

Сначала мы определим простой блокирующий метод для загрузки содержимого одного URL:

01
02
03
04
05
06
07
08
09
10
private String downloadSite(final String site) {
    try {
        log.debug("Downloading {}", site);
        final String res = IOUtils.toString(new URL("http://" + site), UTF_8);
        log.debug("Done {}", site);
        return res;
    } catch (IOException e) {
        throw Throwables.propagate(e);
    }
}

Ничего фантастического. Этот метод будет позже вызываться для разных сайтов в пуле потоков. Другой метод анализирует String в XML- Document (позвольте мне пропустить реализацию, никто не хочет смотреть на нее):

1
private Document parse(String xml)  //...

Наконец, ядро ​​нашего алгоритма — функция вычисления релевантности каждого веб-сайта, принимающая Document качестве входных данных. Как и выше, мы не заботимся о реализации, важна только подпись:

1
private CompletableFuture<Double> calculateRelevance(Document doc) //...

Давайте сложим все кусочки вместе. Имея список веб-сайтов, наш сканер начинает загружать содержимое каждого веб-сайта асинхронно и одновременно. Затем каждая загруженная HTML-строка будет проанализирована в XML- Document а затем будет вычислена релевантность . В качестве последнего шага мы берем все вычисленные метрики релевантности и находим самый большой. Это звучит довольно просто в тот момент, когда вы понимаете, что загрузка контента и актуальность вычислений асинхронна (возвращает CompletableFuture ), и мы определенно не хотим блокировать или заняты ожиданием. Вот первая часть:

01
02
03
04
05
06
07
08
09
10
11
ExecutorService executor = Executors.newFixedThreadPool(4);
  
List<String> topSites = Arrays.asList(
        "www.google.com", "www.youtube.com", "www.yahoo.com", "www.msn.com"
);
  
List<CompletableFuture<Double>> relevanceFutures = topSites.stream().
        map(site -> CompletableFuture.supplyAsync(() -> downloadSite(site), executor)).
        map(contentFuture -> contentFuture.thenApply(this::parse)).
        map(docFuture -> docFuture.thenCompose(this::calculateRelevance)).
        collect(Collectors.<CompletableFuture<Double>>toList());

Здесь на самом деле много чего происходит. Определение пула потоков и сайтов для обхода очевидно. Но есть это цепное выражение для вычисления relevanceFutures . Последовательность map() и collect() в конце довольно наглядна. Начиная со списка веб-сайтов, мы преобразуем каждый сайт ( String ) в CompletableFuture<String> , отправляя асинхронную задачу ( downloadSite() ) в пул потоков.

Итак, у нас есть список CompletableFuture<String> . Мы продолжаем преобразовывать его, на этот раз применяя метод parse() к каждому из них. Помните, что thenApply() будет вызывать предоставленную лямбду, когда базовое будущее завершается, и немедленно возвращает CompletableFuture<Document> . Третий и последний шаг преобразования создает каждый CompletableFuture<Document> во входном списке с помощью calculateRelevance() . Обратите внимание, что thenCompose() calculateRelevance() возвращает значение CompletableFuture<Double> вместо Double , поэтому мы используем thenCompose() вместо thenApply() . После этого множества этапов мы наконец collect() список CompletableFuture<Double> .

Теперь мы хотели бы выполнить некоторые вычисления для всех результатов. У нас есть список фьючерсов, и мы хотели бы знать, когда все они (последние) завершены. Конечно, мы можем зарегистрировать обратный вызов завершения в каждом будущем и использовать CountDownLatch для блокировки до тех пор, пока не будут вызваны все обратные вызовы. Я слишком ленив для этого, давайте использовать существующий CompletableFuture.allOf() . К сожалению, у него есть два незначительных недостатка: вместо varagg используется vararg и не возвращается будущее агрегированных результатов, а Void вместо этого. Под агрегированными результатами я подразумеваю: если мы предоставляем List<CompletableFuture<Double>> такой метод должен возвращать CompletableFuture<List<Double>> , а не CompletableFuture<Void> ! К счастью, это легко исправить с помощью небольшого кода:

1
2
3
4
5
6
7
8
9
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}

Внимательно sequence() аргументом sequence() и возвращаемым типом. Реализация на удивление проста, уловка состоит в том, чтобы использовать существующую allOf() но когда allDoneFuture завершает работу (что означает, что все базовые фьючерсы сделаны), просто переберите все фьючерсы и join() (блокируя ожидание) для каждого. Однако этот звонок гарантированно не блокируется, потому что к настоящему времени все фьючерсы завершены! Оснащенный таким утилитарным методом, мы можем наконец выполнить нашу задачу:

1
2
3
4
5
6
CompletableFuture<List<Double>> allDone = sequence(relevanceFutures);
CompletableFuture<OptionalDouble> maxRelevance = allDone.thenApply(relevances ->
        relevances.stream().
                mapToDouble(Double::valueOf).
                max()
);

Это легко — когда allDone завершает работу, применяем нашу функцию, которая считает максимальную релевантность во всем наборе. maxRelevance — это еще будущее. К тому времени, когда ваша JVM достигнет этой линии, вероятно, ни один из веб-сайтов еще не загружен. Но мы инкапсулировали бизнес-логику поверх фьючерсов, составляя их в виде событий. Код остается читаемым (версия без лямбды и с обычными Future s будет как минимум вдвое длиннее), но избегает блокировки основного потока. Конечно, allDone может быть промежуточным этапом, мы можем и дальше его трансформировать, пока не получив результата.

Упущения

CompletableFuture в Java 8 — огромный шаг вперед. От крошечной, тонкой абстракции над асинхронной задачей до полноценной, функциональной, многофункциональной утилиты. Однако после нескольких дней игры я обнаружил несколько незначительных недостатков:

  • CompletableFuture.allOf() возвращает CompletableFuture<Void> обсуждался ранее. Я думаю, что было бы справедливо сказать, что, если я пропущу коллекцию фьючерсов и захочу подождать их всех, я также хотел бы извлечь результаты, когда они легко появятся. Еще хуже с CompletableFuture.anyOf() . Если я жду, пока не завершится какое- либо из фьючерсов, я не могу представить, чтобы проходили фьючерсы разных типов, например, CompletableFuture<Car> и CompletableFuture<Restaurant> . Если мне все равно, какой из них завершается первым, как я должен обрабатывать возвращаемый тип? Обычно вы передаете коллекцию однородных фьючерсов (например, CompletableFuture<Car> ), а затем anyOf() может просто вернуть будущее этого типа (вместо CompletableFuture<Void> снова).
  • Смешивание настраиваемых и прослушиваемых абстракций. В Guava есть ListenableFuture и SettableFuture расширяющие его. ListenableFuture позволяет регистрировать обратные вызовы, а SettableFuture добавляет возможность установить значение будущего (разрешить его) из произвольного потока и контекста. CompletableFuture эквивалентна SettableFuture но не существует ограниченной версии, эквивалентной ListenableFuture . Почему это проблема? Если API возвращает CompletableFuture а затем два потока ждут его завершения (в этом нет ничего плохого), один из этих потоков может разрешить это будущее и пробудить другой поток, в то время как это должна делать только реализация API. Но когда API пытается разрешить будущее позже, вызов complete() игнорируется. Это может привести к действительно неприятным ошибкам, которых можно избежать в Гуаве, разделив эти две обязанности.
  • CompletableFuture игнорируется в JDK. ExecutorService не была модифицирована для возврата CompletableFuture . Буквально CompletableFuture нигде не упоминается в JDK. Это действительно полезный класс, обратно совместимый с Future , но не продвигаемый в стандартной библиотеке.
  • Раздутый API (?) Всего пятьдесят методов, большинство в трех вариантах. Разделение settable и listenable (см. Выше) поможет. Также некоторые методы, такие как runAfterBoth() или runAfterEither() IMHO, на самом деле не принадлежат ни одному CompletableFuture . Есть ли разница между fast.runAfterBoth(predictable, ...) и predictable.runAfterBoth(fast, ...) ? Нет, но API предпочитает одно или другое. На самом деле я считаю, что runAfterBoth(fast, predictable, ...) гораздо лучше выражает мои намерения.
  • CompletableFuture.getNow(T) должен принимать Supplier<T> вместо необработанной ссылки. В приведенном ниже примере expensiveAlternative() всегда является кодом, независимо от того, закончилось ли будущее или нет:
    1
    future.getNow(expensiveAlternative());

    Однако мы можем легко настроить это поведение (я знаю, что здесь есть небольшое условие гонки, но оригинальный getNow () также работает так):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static <T> T getNow(
                CompletableFuture<T> future,
                Supplier<T> valueIfAbsent) throws ExecutionException, InterruptedException {
        if (future.isDone()) {
            return future.get();
        } else {
            return valueIfAbsent.get();
        }
    }

    С помощью этого служебного метода мы можем избежать вызова expensiveAlternative() когда это не нужно:

    1
    2
    3
    getNow(future, () -> expensiveAlternative());
    //or:
    getNow(future, this::expensiveAlternative);

В целом CompletableFuture — это замечательный новый инструмент в нашем поясе JDK. Незначительные проблемы API и иногда слишком подробный синтаксис из-за ограниченного вывода типов не должны мешать вам использовать его. По крайней мере, это прочная основа для улучшения абстракций и более надежного кода.

Ссылка: Java 8: CompletableFuture в действии от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .