Я уже много писал о InterruptedException и прерывании потоков . Короче говоря, если вы вызываете Future.cancel()
не по заданию, Future
прекратит работу в ожидании get()
, но также попытается прервать основной поток. Это довольно важная функция, которая позволяет лучше использовать пул потоков. Я также написал, что всегда предпочитаю CompletableFuture
стандартному Future
. Оказывается, более мощный младший брат Future
не так элегантно обрабатывает метод cancel()
. Рассмотрим следующую задачу, которую мы будем использовать позже во время тестов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
class InterruptibleTask implements Runnable { private final CountDownLatch started = new CountDownLatch( 1 ) private final CountDownLatch interrupted = new CountDownLatch( 1 ) @Override void run() { started.countDown() try { Thread.sleep(10_000) } catch (InterruptedException ignored) { interrupted.countDown() } } void blockUntilStarted() { started.await() } void blockUntilInterrupted() { assert interrupted.await( 1 , TimeUnit.SECONDS) } } |
Потоки клиента могут проверить InterruptibleTask
чтобы увидеть, запущен он или был прерван. Сначала давайте посмотрим, как InterruptibleTask
реагирует на cancel()
извне:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
def "Future is cancelled without exception" () { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() and: future.cancel( true ) when: future.get() then: thrown(CancellationException) } def "CompletableFuture is cancelled via CancellationException" () { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() and: future.cancel( true ) when: future.get() then: thrown(CancellationException) } |
Все идет нормально. Очевидно, что и Future
и CompletableFuture
работают практически одинаково — при получении результата после его отмены возникает CancellationException
. Но как насчет потока в myThreadPool
? Я думал, что это будет прервано и таким образом переработано бассейном, насколько я был неправ!
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
def "should cancel Future" () { given: def task = new InterruptibleTask() def future = myThreadPool.submit(task) task.blockUntilStarted() when: future.cancel( true ) then: task.blockUntilInterrupted() } @Ignore ( "Fails with CompletableFuture" ) def "should cancel CompletableFuture" () { given: def task = new InterruptibleTask() def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool) task.blockUntilStarted() when: future.cancel( true ) then: task.blockUntilInterrupted() } |
Первый тест отправляет обычный Runnable
в ExecutorService
и ждет, пока он не запустится. Позже мы отменяем Future
и ждем, пока не появится InterruptedException
. blockUntilInterrupted()
вернется, когда основной поток будет прерван. Второй тест, однако, не проходит. CompletableFuture.cancel()
никогда не прервет основной поток, поэтому, несмотря на то, что Future
выглядит так, как будто он был отменен, резервный поток все еще работает, и из sleep()
не InterruptedException
исключение InterruptedException
. Баг или фича? Это задокументировано , так что, к сожалению, особенность:
Параметры:
mayInterruptIfRunning
— это значение не имеет эффекта в этой реализации, поскольку прерывания не используются для управления обработкой.
RTFM, вы говорите, но почему CompletableFuture
работает таким образом? Сначала давайте рассмотрим, как «старые» реализации Future
отличаются от CompletableFuture
. FutureTask
, возвращаемый из ExecutorService.submit()
имеет следующую реализацию cancel()
(я удалил Unsafe
с похожим не потокобезопасным Java-кодом, поэтому рассматривал его только как псевдокод):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
public boolean cancel( boolean mayInterruptIfRunning) { if (state != NEW) return false ; state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { // final state state = INTERRUPTED; } } } finally { finishCompletion(); } return true ; } |
FutureTask
имеет переменную state
которая следует за этой диаграммой состояний:
В случае cancel()
мы можем либо перейти в состояние CANCELLED
либо перейти в INTERRUPTED
через INTERRUPTING
. Основная часть — это то, где мы берем поток runner
(если существует, то есть, если задача выполняется в данный момент), и мы пытаемся прервать его. Эта ветвь заботится об энергичном и принудительном прерывании уже запущенного потока. В конце мы должны уведомить все потоки, заблокированные в Future.get()
в finishCompletion()
(не имеет значения здесь). Так что довольно очевидно, сколько лет Future
отменяет уже запущенные задачи. Что насчет CompletableFuture
? Псевдокод cancel()
:
1
2
3
4
5
6
7
8
9
|
public boolean cancel( boolean mayInterruptIfRunning) { boolean cancelled = false ; if (result == null ) { result = new AltResult( new CancellationException()); cancelled = true ; } postComplete(); return cancelled || isCancelled(); } |
Весьма разочаровывает, мы едва установили result
в CancellationException
, игнорируя флаг mayInterruptIfRunning
. postComplete()
выполняет роль, аналогичную функции finishCompletion()
— уведомляет обо всех ожидающих обратных finishCompletion()
зарегистрированных в этом будущем. Его реализация довольно неприятна (с использованием неблокирующего стека Treiber ), но она определенно не прерывает какой-либо основной поток.
Причины и последствия
Ограниченная cancel()
в случае CompletableFuture
— это не ошибка, а дизайнерское решение. CompletableFuture
по своей природе не привязан ни к одному потоку, в то время как Future
почти всегда представляет фоновую задачу. Совершенно нормально создавать CompletableFuture
с нуля ( new CompletableFuture<>()
), где просто нет нижележащего потока для отмены. Тем не менее, я не могу избавиться от ощущения, что у большинства CompletableFuture
есть связанные задачи и фоновый поток. В этом случае сбой cancel()
является потенциальной проблемой. Я больше не советую слепо заменять Future
на CompletableFuture
как это может изменить поведение приложений, использующих метод cancel()
. Это означает, что CompletableFuture
преднамеренно нарушает принцип подстановки Лискова — и это серьезный смысл для рассмотрения.
Ссылка: | CompletableFuture не может быть прервана нашим партнером по JCG Томашем Нуркевичем из блога Java и соседей . |