После тщательного изучения API CompletableFuture в Java 8 мы готовы написать упрощенный веб-сканер. Мы решили подобную проблему уже с помощью ExecutorCompletionService , Guava ListenableFuture и Scala / Akka . Я выбираю ту же проблему, чтобы было легко сравнивать подходы и методы реализации.
Сначала мы определим простой блокирующий метод для загрузки содержимого одного URL:
|
01
02
03
04
05
06
07
08
09
10
|
private String downloadSite(final String site) { try { log.debug("Downloading {}", site); log.debug("Done {}", site); return res; } catch (IOException e) { throw Throwables.propagate(e); }} |
Ничего фантастического. Этот метод будет позже вызываться для разных сайтов в пуле потоков. Другой метод анализирует String в XML- Document (позвольте мне пропустить реализацию, никто не хочет смотреть на нее):
|
1
|
private Document parse(String xml) //... |
Наконец, ядро нашего алгоритма — функция вычисления релевантности каждого веб-сайта, принимающая Document качестве входных данных. Как и выше, мы не заботимся о реализации, важна только подпись:
|
1
|
private CompletableFuture<Double> calculateRelevance(Document doc) //... |
Давайте сложим все кусочки вместе. Имея список веб-сайтов, наш сканер начинает загружать содержимое каждого веб-сайта асинхронно и одновременно. Затем каждая загруженная HTML-строка будет проанализирована в XML- Document а затем будет вычислена релевантность . В качестве последнего шага мы берем все вычисленные метрики релевантности и находим самый большой. Это звучит довольно просто в тот момент, когда вы понимаете, что загрузка контента и актуальность вычислений асинхронна (возвращает CompletableFuture ), и мы определенно не хотим блокировать или заняты ожиданием. Вот первая часть:
|
01
02
03
04
05
06
07
08
09
10
11
|
ExecutorService executor = Executors.newFixedThreadPool(4); List<String> topSites = Arrays.asList( "www.google.com", "www.youtube.com", "www.yahoo.com", "www.msn.com"); List<CompletableFuture<Double>> relevanceFutures = topSites.stream(). map(site -> CompletableFuture.supplyAsync(() -> downloadSite(site), executor)). map(contentFuture -> contentFuture.thenApply(this::parse)). map(docFuture -> docFuture.thenCompose(this::calculateRelevance)). collect(Collectors.<CompletableFuture<Double>>toList()); |
Здесь на самом деле много чего происходит. Определение пула потоков и сайтов для обхода очевидно. Но есть это цепное выражение для вычисления relevanceFutures . Последовательность map() и collect() в конце довольно наглядна. Начиная со списка веб-сайтов, мы преобразуем каждый сайт ( String ) в CompletableFuture<String> , отправляя асинхронную задачу ( downloadSite() ) в пул потоков.
Итак, у нас есть список CompletableFuture<String> . Мы продолжаем преобразовывать его, на этот раз применяя метод parse() к каждому из них. Помните, что thenApply() будет вызывать предоставленную лямбду, когда базовое будущее завершается, и немедленно возвращает CompletableFuture<Document> . Третий и последний шаг преобразования создает каждый CompletableFuture<Document> во входном списке с помощью calculateRelevance() . Обратите внимание, что thenCompose() calculateRelevance() возвращает значение CompletableFuture<Double> вместо Double , поэтому мы используем thenCompose() вместо thenApply() . После этого множества этапов мы наконец collect() список CompletableFuture<Double> .
Теперь мы хотели бы выполнить некоторые вычисления для всех результатов. У нас есть список фьючерсов, и мы хотели бы знать, когда все они (последние) завершены. Конечно, мы можем зарегистрировать обратный вызов завершения в каждом будущем и использовать CountDownLatch для блокировки до тех пор, пока не будут вызваны все обратные вызовы. Я слишком ленив для этого, давайте использовать существующий CompletableFuture.allOf() . К сожалению, у него есть два незначительных недостатка: вместо varagg используется vararg и не возвращается будущее агрегированных результатов, а Void вместо этого. Под агрегированными результатами я подразумеваю: если мы предоставляем List<CompletableFuture<Double>> такой метод должен возвращать CompletableFuture<List<Double>> , а не CompletableFuture<Void> ! К счастью, это легко исправить с помощью небольшого кода:
|
1
2
3
4
5
6
7
8
9
|
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream(). map(future -> future.join()). collect(Collectors.<T>toList()) );} |
Внимательно sequence() аргументом sequence() и возвращаемым типом. Реализация на удивление проста, уловка состоит в том, чтобы использовать существующую allOf() но когда allDoneFuture завершает работу (что означает, что все базовые фьючерсы сделаны), просто переберите все фьючерсы и join() (блокируя ожидание) для каждого. Однако этот звонок гарантированно не блокируется, потому что к настоящему времени все фьючерсы завершены! Оснащенный таким утилитарным методом, мы можем наконец выполнить нашу задачу:
|
1
2
3
4
5
6
|
CompletableFuture<List<Double>> allDone = sequence(relevanceFutures);CompletableFuture<OptionalDouble> maxRelevance = allDone.thenApply(relevances -> relevances.stream(). mapToDouble(Double::valueOf). max()); |
Это легко — когда allDone завершает работу, применяем нашу функцию, которая считает максимальную релевантность во всем наборе. maxRelevance — это еще будущее. К тому времени, когда ваша JVM достигнет этой линии, вероятно, ни один из веб-сайтов еще не загружен. Но мы инкапсулировали бизнес-логику поверх фьючерсов, составляя их в виде событий. Код остается читаемым (версия без лямбды и с обычными Future s будет как минимум вдвое длиннее), но избегает блокировки основного потока. Конечно, allDone может быть промежуточным этапом, мы можем и дальше его трансформировать, пока не получив результата.
Упущения
CompletableFuture в Java 8 — огромный шаг вперед. От крошечной, тонкой абстракции над асинхронной задачей до полноценной, функциональной, многофункциональной утилиты. Однако после нескольких дней игры я обнаружил несколько незначительных недостатков:
-
CompletableFuture.allOf()возвращаетCompletableFuture<Void>обсуждался ранее. Я думаю, что было бы справедливо сказать, что, если я пропущу коллекцию фьючерсов и захочу подождать их всех, я также хотел бы извлечь результаты, когда они легко появятся. Еще хуже сCompletableFuture.anyOf(). Если я жду, пока не завершится какое- либо из фьючерсов, я не могу представить, чтобы проходили фьючерсы разных типов, например,CompletableFuture<Car>иCompletableFuture<Restaurant>. Если мне все равно, какой из них завершается первым, как я должен обрабатывать возвращаемый тип? Обычно вы передаете коллекцию однородных фьючерсов (например,CompletableFuture<Car>), а затемanyOf()может просто вернуть будущее этого типа (вместоCompletableFuture<Void>снова). - Смешивание настраиваемых и прослушиваемых абстракций. В Guava есть
ListenableFutureиSettableFutureрасширяющие его.ListenableFutureпозволяет регистрировать обратные вызовы, аSettableFutureдобавляет возможность установить значение будущего (разрешить его) из произвольного потока и контекста.CompletableFutureэквивалентнаSettableFutureно не существует ограниченной версии, эквивалентнойListenableFuture. Почему это проблема? Если API возвращаетCompletableFutureа затем два потока ждут его завершения (в этом нет ничего плохого), один из этих потоков может разрешить это будущее и пробудить другой поток, в то время как это должна делать только реализация API. Но когда API пытается разрешить будущее позже, вызовcomplete()игнорируется. Это может привести к действительно неприятным ошибкам, которых можно избежать в Гуаве, разделив эти две обязанности. -
CompletableFutureигнорируется в JDK.ExecutorServiceне была модифицирована для возвратаCompletableFuture. БуквальноCompletableFutureнигде не упоминается в JDK. Это действительно полезный класс, обратно совместимый сFuture, но не продвигаемый в стандартной библиотеке. - Раздутый API (?) Всего пятьдесят методов, большинство в трех вариантах. Разделение settable и listenable (см. Выше) поможет. Также некоторые методы, такие как
runAfterBoth()илиrunAfterEither()IMHO, на самом деле не принадлежат ни одномуCompletableFuture. Есть ли разница междуfast.runAfterBoth(predictable, ...)иpredictable.runAfterBoth(fast, ...)? Нет, но API предпочитает одно или другое. На самом деле я считаю, чтоrunAfterBoth(fast, predictable, ...)гораздо лучше выражает мои намерения. -
CompletableFuture.getNow(T)должен приниматьSupplier<T>вместо необработанной ссылки. В приведенном ниже примереexpensiveAlternative()всегда является кодом, независимо от того, закончилось ли будущее или нет:1future.getNow(expensiveAlternative());Однако мы можем легко настроить это поведение (я знаю, что здесь есть небольшое условие гонки, но оригинальный getNow () также работает так):
123456789publicstatic<T> T getNow(CompletableFuture<T> future,Supplier<T> valueIfAbsent)throwsExecutionException, InterruptedException {if(future.isDone()) {returnfuture.get();}else{returnvalueIfAbsent.get();}}С помощью этого служебного метода мы можем избежать вызова
expensiveAlternative()когда это не нужно:123getNow(future, () -> expensiveAlternative());//or:getNow(future,this::expensiveAlternative);
В целом CompletableFuture — это замечательный новый инструмент в нашем поясе JDK. Незначительные проблемы API и иногда слишком подробный синтаксис из-за ограниченного вывода типов не должны мешать вам использовать его. По крайней мере, это прочная основа для улучшения абстракций и более надежного кода.