Однажды я переписывал плохо реализованный многопоточный код, который в какой-то момент блокировал Future.get()
:
1
2
3
4
5
6
7
8
9
|
public void serve() throws InterruptedException, ExecutionException, TimeoutException { final Future<Response> responseFuture = asyncCode(); final Response response = responseFuture.get( 1 , SECONDS); send(response); } private void send(Response response) { //... } |
На самом деле это было приложение Akka, написанное на Java с пулом потоков в 1000 потоков (так!) — все они заблокированы при вызове get()
. В противном случае система не сможет успевать за количеством одновременных запросов. После рефакторинга мы избавились от всех этих потоков и представили только один, что значительно сократило использование памяти. Давайте немного упростим и покажем примеры в Java 8. Первым шагом является введение CompletableFuture
вместо простого Future
(см. Совет 9 ). Это просто, если:
- Вы управляете тем, как задачи передаются в
ExecutorService
: просто используйтеCompletableFuture.supplyAsync(..., executorService)
вместоexecutorService.submit(...)
- Вы имеете дело с API на основе обратного вызова: используйте обещания
В противном случае (если у вас уже есть блокирующий API или Future<T>
), будет заблокирован какой-то поток. Вот почему сейчас так много асинхронных API-интерфейсов. Допустим, мы каким-то образом переписали наш код для получения CompletableFuture
:
1
2
3
4
5
|
public void serve() throws InterruptedException, ExecutionException, TimeoutException { final CompletableFuture<Response> responseFuture = asyncCode(); final Response response = responseFuture.get( 1 , SECONDS); send(response); } |
Очевидно, что это ничего не исправляет, мы должны воспользоваться новым реактивным стилем программирования:
1
2
3
4
|
public void serve() { final CompletableFuture<Response> responseFuture = asyncCode(); responseFuture.thenAccept( this ::send); } |
Это функционально эквивалентно, но теперь serve()
должен выполняться в кратчайшие сроки (без блокировки или ожидания). Просто помните, что this::send
будет выполняться в том же потоке, который завершил responseFuture
. Если вы не хотите где-то перегружать какой-либо произвольный пул потоков, или send()
стоит дорого, рассмотрите для этого отдельный пул потоков: thenAcceptAsync(this::send, sendPool)
. Отлично, но мы потеряли два важных свойства: распространение ошибок и время ожидания. Распространение ошибок сложно, потому что мы изменили API. Когда метод serve()
завершается, асинхронные операции, вероятно, еще не завершены. Если вы заботитесь об исключениях, рассмотрите возможность возврата responseFuture
или какой-нибудь альтернативный механизм. Как минимум, зарегистрируйте исключение, потому что в противном случае оно будет проглочено:
1
2
3
4
5
|
final CompletableFuture<Response> responseFuture = asyncCode(); responseFuture.exceptionally(throwable -> { log.error( "Unrecoverable error" , throwable); return null ; }); |
Будьте осторожны с приведенным выше кодом: exceptionally()
пытается восстановить после сбоя, возвращая альтернативный результат. Это работает здесь, но если вы thenAccept()
exceptionally()
с помощью thenAccept()
то thenAccept()
send()
будет вызвана даже в случае сбоя, но с null
аргументом (или с тем, что мы возвращаем из exceptionally()
:
1
2
3
4
5
6
|
responseFuture .exceptionally(throwable -> { log.error( "Unrecoverable error" , throwable); return null ; }) .thenAccept( this ::send); //probably not what you think |
Проблема с потерянным 1-секундным таймаутом неуловима. Наш оригинальный код ожидал (блокировал) не более 1 секунды, пока Future
закончится. В противном случае TimeoutException
было выброшено. Мы потеряли эту функциональность, еще хуже юнит-тесты на тайм-ауты неудобны и часто пропускаются. Чтобы портировать тайм-ауты без ущерба для духа, управляемого событиями, нам нужен один дополнительный строительный блок: будущее, которое всегда терпит неудачу по истечении определенного времени:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public static <T> CompletableFuture<T> failAfter(Duration duration) { final CompletableFuture<T> promise = new CompletableFuture<>(); scheduler.schedule(() -> { final TimeoutException ex = new TimeoutException( "Timeout after " + duration); return promise.completeExceptionally(ex); }, duration.toMillis(), MILLISECONDS); return promise; } private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 , new ThreadFactoryBuilder() .setDaemon( true ) .setNameFormat( "failAfter-%d" ) .build()); |
Это просто: мы создаем обещание (будущее без базовой задачи или пула потоков) и завершаем его с TimeoutException
после заданного java.time.Duration
. Если вы get()
такое будущее где-нибудь, TimeoutException
будет выброшено после блокировки, по крайней мере, в течение этого времени. На самом деле, это будет ExecutionException
оборачивающее TimeoutException
, никак не обойти это. Обратите внимание, что я использую фиксированный пул потоков scheduler
только с одним потоком. Это не только в образовательных целях: « 1 потока должно быть достаточно для всех » [1] в этом сценарии. failAfter()
по себе failAfter()
довольно бесполезен, но объедините его с нашим responseFuture
и у нас есть решение!
1
2
3
4
5
6
7
8
|
final CompletableFuture<Response> responseFuture = asyncCode(); final CompletableFuture<Response> oneSecondTimeout = failAfter(Duration.ofSeconds( 1 )); responseFuture .acceptEither(oneSecondTimeout, this ::send) .exceptionally(throwable -> { log.error( "Problem" , throwable); return null ; }); |
Здесь много чего происходит. После получения responseFuture
с нашей фоновой задачей мы также создаем «синтетическое» будущее oneSecondTimeout
, которое никогда не завершится успешно, но всегда завершается ошибкой через 1 секунду. Теперь мы объединяем их, вызывая acceptEither
. Этот оператор выполнит блок кода с первым завершенным будущим, responseFuture
или oneSecondTimeout
и просто проигнорирует результат более медленного. Если asyncCode()
завершится в течение 1 секунды, будет вызван this::send
, а исключение из oneSecondTimeout
будет проигнорировано. Однако! Если asyncCode()
действительно медленный, oneSecondTimeout
первым. Но так как это происходит с исключением, то вместо this::send
вызывается exceptionally
обработчик ошибок. Вы можете принять как должное, что будет вызываться либо send()
либо в exceptionally
, но не оба. Конечно, если бы у нас было два «обычных» фьючерса, завершающихся нормально, send()
вызывался бы с ответом первого, отбрасывая последнее.
Это было не самое чистое решение. Чистее можно обернуть оригинальное будущее и убедиться, что оно закончится в течение заданного времени. Такой оператор доступен в com.twitter.util.Future
(Scala; com.twitter.util.Future
within()
), однако отсутствует в scala.concurrent.Future
(предположительно, вдохновлен первым). Давайте оставим Scala и реализуем аналогичный оператор для CompletableFuture
. Он принимает одно будущее в качестве входных данных и возвращает будущее, которое завершается после завершения базового. Однако, если это занимает слишком много времени, чтобы завершить будущее, генерируется исключение:
1
2
3
4
|
public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) { final CompletableFuture<T> timeout = failAfter(duration); return future.applyToEither(timeout, Function.identity()); } |
Это приводит к окончательному, чистому и гибкому решению:
1
2
3
4
5
6
7
8
|
final CompletableFuture<Response> responseFuture = within( asyncCode(), Duration.ofSeconds( 1 )); responseFuture .thenAccept( this ::send) .exceptionally(throwable -> { log.error( "Unrecoverable error" , throwable); return null ; }); |
Надеюсь, вам понравилась эта статья, поскольку вы можете видеть, что реактивное программирование на Java больше не будущее (без каламбура).
Ссылка: | Асинхронные тайм-ауты с CompletableFuture от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей . |