После тщательного изучения API CompletableFuture
в Java 8 мы готовы написать упрощенный веб-сканер. Мы решили подобную проблему уже с помощью ExecutorCompletionService
, Guava ListenableFuture
и Scala / Akka . Я выбираю ту же проблему, чтобы было легко сравнивать подходы и методы реализации.
Сначала мы определим простой блокирующий метод для загрузки содержимого одного URL:
01
02
03
04
05
06
07
08
09
10
|
private String downloadSite( final String site) { try { log.debug( "Downloading {}" , site); log.debug( "Done {}" , site); return res; } catch (IOException e) { throw Throwables.propagate(e); } } |
Ничего фантастического. Этот метод будет позже вызываться для разных сайтов в пуле потоков. Другой метод анализирует String
в XML- Document
(позвольте мне пропустить реализацию, никто не хочет смотреть на нее):
1
|
private Document parse(String xml) //... |
Наконец, ядро нашего алгоритма — функция вычисления релевантности каждого веб-сайта, принимающая Document
качестве входных данных. Как и выше, мы не заботимся о реализации, важна только подпись:
1
|
private CompletableFuture<Double> calculateRelevance(Document doc) //... |
Давайте сложим все кусочки вместе. Имея список веб-сайтов, наш сканер начинает загружать содержимое каждого веб-сайта асинхронно и одновременно. Затем каждая загруженная HTML-строка будет проанализирована в XML- Document
а затем будет вычислена релевантность . В качестве последнего шага мы берем все вычисленные метрики релевантности и находим самый большой. Это звучит довольно просто в тот момент, когда вы понимаете, что загрузка контента и актуальность вычислений асинхронна (возвращает CompletableFuture
), и мы определенно не хотим блокировать или заняты ожиданием. Вот первая часть:
01
02
03
04
05
06
07
08
09
10
11
|
ExecutorService executor = Executors.newFixedThreadPool( 4 ); List<String> topSites = Arrays.asList( "www.google.com" , "www.youtube.com" , "www.yahoo.com" , "www.msn.com" ); List<CompletableFuture<Double>> relevanceFutures = topSites.stream(). map(site -> CompletableFuture.supplyAsync(() -> downloadSite(site), executor)). map(contentFuture -> contentFuture.thenApply( this ::parse)). map(docFuture -> docFuture.thenCompose( this ::calculateRelevance)). collect(Collectors.<CompletableFuture<Double>>toList()); |
Здесь на самом деле много чего происходит. Определение пула потоков и сайтов для обхода очевидно. Но есть это цепное выражение для вычисления relevanceFutures
. Последовательность map()
и collect()
в конце довольно наглядна. Начиная со списка веб-сайтов, мы преобразуем каждый сайт ( String
) в CompletableFuture<String>
, отправляя асинхронную задачу ( downloadSite()
) в пул потоков.
Итак, у нас есть список CompletableFuture<String>
. Мы продолжаем преобразовывать его, на этот раз применяя метод parse()
к каждому из них. Помните, что thenApply()
будет вызывать предоставленную лямбду, когда базовое будущее завершается, и немедленно возвращает CompletableFuture<Document>
. Третий и последний шаг преобразования создает каждый CompletableFuture<Document>
во входном списке с помощью calculateRelevance()
. Обратите внимание, что thenCompose()
calculateRelevance()
возвращает значение CompletableFuture<Double>
вместо Double
, поэтому мы используем thenCompose()
вместо thenApply()
. После этого множества этапов мы наконец collect()
список CompletableFuture<Double>
.
Теперь мы хотели бы выполнить некоторые вычисления для всех результатов. У нас есть список фьючерсов, и мы хотели бы знать, когда все они (последние) завершены. Конечно, мы можем зарегистрировать обратный вызов завершения в каждом будущем и использовать CountDownLatch
для блокировки до тех пор, пока не будут вызваны все обратные вызовы. Я слишком ленив для этого, давайте использовать существующий CompletableFuture.allOf()
. К сожалению, у него есть два незначительных недостатка: вместо varagg используется vararg и не возвращается будущее агрегированных результатов, а Void
вместо этого. Под агрегированными результатами я подразумеваю: если мы предоставляем List<CompletableFuture<Double>>
такой метод должен возвращать CompletableFuture<List<Double>>
, а не CompletableFuture<Void>
! К счастью, это легко исправить с помощью небольшого кода:
1
2
3
4
5
6
7
8
9
|
private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray( new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream(). map(future -> future.join()). collect(Collectors.<T>toList()) ); } |
Внимательно sequence()
аргументом sequence()
и возвращаемым типом. Реализация на удивление проста, уловка состоит в том, чтобы использовать существующую allOf()
но когда allDoneFuture
завершает работу (что означает, что все базовые фьючерсы сделаны), просто переберите все фьючерсы и join()
(блокируя ожидание) для каждого. Однако этот звонок гарантированно не блокируется, потому что к настоящему времени все фьючерсы завершены! Оснащенный таким утилитарным методом, мы можем наконец выполнить нашу задачу:
1
2
3
4
5
6
|
CompletableFuture<List<Double>> allDone = sequence(relevanceFutures); CompletableFuture<OptionalDouble> maxRelevance = allDone.thenApply(relevances -> relevances.stream(). mapToDouble(Double::valueOf). max() ); |
Это легко — когда allDone
завершает работу, применяем нашу функцию, которая считает максимальную релевантность во всем наборе. maxRelevance
— это еще будущее. К тому времени, когда ваша JVM достигнет этой линии, вероятно, ни один из веб-сайтов еще не загружен. Но мы инкапсулировали бизнес-логику поверх фьючерсов, составляя их в виде событий. Код остается читаемым (версия без лямбды и с обычными Future
s будет как минимум вдвое длиннее), но избегает блокировки основного потока. Конечно, allDone
может быть промежуточным этапом, мы можем и дальше его трансформировать, пока не получив результата.
Упущения
CompletableFuture
в Java 8 — огромный шаг вперед. От крошечной, тонкой абстракции над асинхронной задачей до полноценной, функциональной, многофункциональной утилиты. Однако после нескольких дней игры я обнаружил несколько незначительных недостатков:
-
CompletableFuture.allOf()
возвращаетCompletableFuture<Void>
обсуждался ранее. Я думаю, что было бы справедливо сказать, что, если я пропущу коллекцию фьючерсов и захочу подождать их всех, я также хотел бы извлечь результаты, когда они легко появятся. Еще хуже сCompletableFuture.anyOf()
. Если я жду, пока не завершится какое- либо из фьючерсов, я не могу представить, чтобы проходили фьючерсы разных типов, например,CompletableFuture<Car>
иCompletableFuture<Restaurant>
. Если мне все равно, какой из них завершается первым, как я должен обрабатывать возвращаемый тип? Обычно вы передаете коллекцию однородных фьючерсов (например,CompletableFuture<Car>
), а затемanyOf()
может просто вернуть будущее этого типа (вместоCompletableFuture<Void>
снова). - Смешивание настраиваемых и прослушиваемых абстракций. В Guava есть
ListenableFuture
иSettableFuture
расширяющие его.ListenableFuture
позволяет регистрировать обратные вызовы, аSettableFuture
добавляет возможность установить значение будущего (разрешить его) из произвольного потока и контекста.CompletableFuture
эквивалентнаSettableFuture
но не существует ограниченной версии, эквивалентнойListenableFuture
. Почему это проблема? Если API возвращаетCompletableFuture
а затем два потока ждут его завершения (в этом нет ничего плохого), один из этих потоков может разрешить это будущее и пробудить другой поток, в то время как это должна делать только реализация API. Но когда API пытается разрешить будущее позже, вызовcomplete()
игнорируется. Это может привести к действительно неприятным ошибкам, которых можно избежать в Гуаве, разделив эти две обязанности. -
CompletableFuture
игнорируется в JDK.ExecutorService
не была модифицирована для возвратаCompletableFuture
. БуквальноCompletableFuture
нигде не упоминается в JDK. Это действительно полезный класс, обратно совместимый сFuture
, но не продвигаемый в стандартной библиотеке. - Раздутый API (?) Всего пятьдесят методов, большинство в трех вариантах. Разделение settable и listenable (см. Выше) поможет. Также некоторые методы, такие как
runAfterBoth()
илиrunAfterEither()
IMHO, на самом деле не принадлежат ни одномуCompletableFuture
. Есть ли разница междуfast.runAfterBoth(predictable, ...)
иpredictable.runAfterBoth(fast, ...)
? Нет, но API предпочитает одно или другое. На самом деле я считаю, чтоrunAfterBoth(fast, predictable, ...)
гораздо лучше выражает мои намерения. -
CompletableFuture.getNow(T)
должен приниматьSupplier<T>
вместо необработанной ссылки. В приведенном ниже примереexpensiveAlternative()
всегда является кодом, независимо от того, закончилось ли будущее или нет:1future.getNow(expensiveAlternative());
Однако мы можем легко настроить это поведение (я знаю, что здесь есть небольшое условие гонки, но оригинальный getNow () также работает так):
123456789public
static
<T> T getNow(
CompletableFuture<T> future,
Supplier<T> valueIfAbsent)
throws
ExecutionException, InterruptedException {
if
(future.isDone()) {
return
future.get();
}
else
{
return
valueIfAbsent.get();
}
}
С помощью этого служебного метода мы можем избежать вызова
expensiveAlternative()
когда это не нужно:123getNow(future, () -> expensiveAlternative());
//or:
getNow(future,
this
::expensiveAlternative);
В целом CompletableFuture
— это замечательный новый инструмент в нашем поясе JDK. Незначительные проблемы API и иногда слишком подробный синтаксис из-за ограниченного вывода типов не должны мешать вам использовать его. По крайней мере, это прочная основа для улучшения абстракций и более надежного кода.