CompletableFuture
— все еще относительно свежая концепция, несмотря на то, что она была введена почти два года назад (!) В марте 2014 года в Java 8. Но, возможно, хорошо, что этот класс не так хорошо известен, так как им можно легко злоупотреблять, особенно в отношении потоков и пулы потоков, которые участвуют в пути. Эта статья призвана описать, как потоки используются с CompletableFuture
.
Запуск задач
Это фундаментальная часть API. Существует удобный supplyAsync()
который похож на ExecutorService.submit()
, но возвращает CompletableFuture
:
1
2
3
4
5
6
7
8
9
|
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { log.info( "Downloading" ); return IOUtils.toString(is, StandardCharsets.UTF_8); } catch (IOException e) { throw new RuntimeException(e); } }); |
Проблема в том, что supplyAsync()
по умолчанию использует ForkJoinPool.commonPool()
, пул потоков, совместно используемый всеми CompletableFuture
, всеми параллельными потоками и всеми приложениями, развернутыми на одной и той же JVM (если вам не удастся по-прежнему использовать сервер приложений со многими развернутыми артефактами) , Этот жестко запрограммированный, неконфигурируемый пул потоков находится вне нашего контроля, его трудно контролировать и масштабировать. Поэтому вы всегда должны указывать своего собственного Executor
, как здесь (и взглянуть на мои несколько советов, как его создать ):
1
2
3
4
5
6
|
ExecutorService pool = Executors.newFixedThreadPool( 10 ); final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //... }, pool); |
Но это только начало …
Обратные вызовы и преобразования
Предположим, что вы хотите преобразовать данное CompletableFuture
, например, извлечь длину String
:
1
2
|
CompletableFuture<Integer> intFuture = future.thenApply(s -> s.length()); |
Кто именно вызывает код s.length()
? Честно говоря, мои дорогие разработчики, нам наплевать [1] . Пока лямбда-выражение внутри всех операторов, таких как thenApply
, дешево, нам все равно, кто его называет. Но что, если это выражение занимает немного процессорного времени для завершения или выполняет блокирующий сетевой вызов?
Прежде всего, что происходит по умолчанию? Подумайте об этом: у нас есть фоновая задача типа String
и мы хотим применить некоторое конкретное преобразование асинхронно, когда это значение завершится. Самый простой способ реализовать это — обернуть исходную задачу (вернуть String
) и перехватить ее, когда она завершится. Когда внутренняя задача заканчивается, наш обратный вызов включается, применяет преобразование и возвращает измененное значение. Это как аспект, который находится между нашим кодом и исходным результатом вычислений. При этом должно быть совершенно очевидно, что преобразование s.length()
будет выполняться в том же потоке, что и исходная задача, а? Не совсем!
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { sleepSeconds( 2 ); return "ABC" ; }, pool); future.thenApply(s -> { log.info( "First transformation" ); return s.length(); }); future.get(); pool.shutdownNow(); pool.awaitTermination( 1 , TimeUnit.MINUTES); future.thenApply(s -> { log.info( "Second transformation" ); return s.length(); }); |
Первое преобразование в thenApply()
регистрируется, пока задача еще выполняется. Таким образом, он будет выполнен сразу после завершения задачи в том же потоке, что и задача. Однако, прежде чем регистрировать второе преобразование, мы ждем, пока задача фактически завершится. Хуже того, мы полностью закрываем пул потоков, чтобы гарантировать, что никакой другой код там не может быть выполнен. Так в каком потоке будет запущено второе преобразование? Мы знаем, что это должно произойти немедленно, поскольку в future
мы регистрируем обратный вызов уже завершенным Оказывается, по умолчанию используется клиентский поток (!)! Вывод следующий:
1
|
pool-1-thread-1 | First transformation main | Second transformation |
Второе преобразование, когда оно зарегистрировано, понимает, что CompletableFuture
уже завершено, поэтому оно выполняет преобразование немедленно. Вокруг нет другого потока, поэтому thenApply()
вызывается в контексте текущего main
потока. Основная причина, почему это поведение склонно к ошибкам, проявляется, когда фактическое преобразование является дорогостоящим. Представьте себе лямбда-выражение внутри thenApply()
выполняющее тяжелые вычисления или блокирующее сетевой вызов. Внезапно наш асинхронный CompletableFuture
блокирует вызывающий поток!
Управление пулом потоков обратного вызова
Есть два метода, чтобы контролировать, какой поток выполняет наши обратные вызовы и преобразования. Обратите внимание, что эти решения необходимы только в том случае, если ваши преобразования являются дорогостоящими. В противном случае разница незначительна. Итак, прежде всего мы можем выбрать *Async
версии операторов, например:
1
2
3
4
|
future.thenApplyAsync(s -> { log.info( "Second transformation" ); return s.length(); }); |
На этот раз второе преобразование было автоматически ForkJoinPool.commonPool()
нашему другу, ForkJoinPool.commonPool()
:
1
2
|
pool- 1 -thread- 1 | First transformation ForkJoinPool.commonPool-worker- 1 | Second transformation |
Но нам не нравится commonPool
поэтому мы поставляем свои собственные:
1
2
3
4
|
future.thenApplyAsync(s -> { log.info( "Second transformation" ); return s.length(); }, pool2); |
Обратите внимание, что был использован другой пул потоков ( pool-1
против pool-2
):
1
2
|
pool- 1 -thread- 1 | First transformation pool- 2 -thread- 1 | Second transformation |
Рассматривать обратный вызов как еще один шаг вычислений
Но я считаю, что если у вас возникают проблемы с длительными обратными вызовами и преобразованиями (помните, что эта статья относится почти ко всем другим методам в CompletableFuture
), вам следует просто использовать другой явный CompletableFuture
, как здесь:
01
02
03
04
05
06
07
08
09
10
11
|
//Imagine this is slow and costly CompletableFuture<Integer> strLen(String s) { return CompletableFuture.supplyAsync( () -> s.length(), pool2); } //... CompletableFuture<Integer> intFuture = future.thenCompose(s -> strLen(s)); |
Этот подход более явный. Зная, что наше преобразование имеет значительную стоимость, мы не рискуем запустить его в каком-либо произвольном или неконтролируемом потоке. Вместо этого мы явно моделируем его как асинхронную операцию от String
до CompletableFuture<Integer>
. Однако мы должны заменить thenApply()
на thenCompose()
, в противном случае мы получим CompletableFuture<CompletableFuture<Integer>>
.
Но что, если у нашего преобразования нет версии, которая хорошо работает с вложенным CompletableFuture
, например applyToEither()
которая ожидает завершения первого Future
и применяет преобразование?
1
2
|
CompletableFuture<CompletableFuture<Integer>> poor = future1.applyToEither(future2, s -> strLen(s)); |
Существует удобный прием для «разворачивания» такой непонятной структуры данных, называемой flatten
, которая легко реализуется с помощью flatMap(identity)
(или flatMap(x -> x)
). В нашем случае flatMap()
называется thenCompose
( duh! ):
1
2
|
CompletableFuture<Integer> good = poor.thenCompose(x -> x); |
Я оставляю это на ваше усмотрение, как и почему это работает. Я надеюсь, что эта статья прояснила, как потоки участвуют в CompletableFuture
.
Ссылка: | Какой поток выполняет задачи и обратные вызовы CompletableFuture? от нашего партнера JCG Томаша Нуркевича в блоге о Java и соседстве . |