Статьи

CompletableFuture не может быть прервано

Я уже много писал о InterruptedException и прерывании потоков . Короче говоря, если вы вызываете Future.cancel() не по заданию, Future прекратит работу в ожидании get() , но также попытается прервать основной поток. Это довольно важная функция, которая позволяет лучше использовать пул потоков. Я также написал, что всегда предпочитаю CompletableFuture стандартному Future . Оказывается, более мощный младший брат Future не так элегантно обрабатывает метод cancel() . Рассмотрим следующую задачу, которую мы будем использовать позже во время тестов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class InterruptibleTask implements Runnable {
  
    private final CountDownLatch started = new CountDownLatch(1)
    private final CountDownLatch interrupted = new CountDownLatch(1)
  
    @Override
    void run() {
        started.countDown()
        try {
            Thread.sleep(10_000)
        } catch (InterruptedException ignored) {
            interrupted.countDown()
        }
    }
  
    void blockUntilStarted() {
        started.await()
    }
  
    void blockUntilInterrupted() {
        assert interrupted.await(1, TimeUnit.SECONDS)
    }
  
}

Потоки клиента могут проверить InterruptibleTask чтобы увидеть, запущен он или был прерван. Сначала давайте посмотрим, как InterruptibleTask реагирует на cancel() извне:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def "Future is cancelled without exception"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}
  
def "CompletableFuture is cancelled via CancellationException"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    and:
        future.cancel(true)
    when:
        future.get()
    then:
        thrown(CancellationException)
}

Все идет нормально. Очевидно, что и Future и CompletableFuture работают практически одинаково — при получении результата после его отмены возникает CancellationException . Но как насчет потока в myThreadPool ? Я думал, что это будет прервано и таким образом переработано бассейном, насколько я был неправ!

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
def "should cancel Future"() {
    given:
        def task = new InterruptibleTask()
        def future = myThreadPool.submit(task)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}
  
@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {
    given:
        def task = new InterruptibleTask()
        def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
        task.blockUntilStarted()
    when:
        future.cancel(true)
    then:
        task.blockUntilInterrupted()
}

Первый тест отправляет обычный Runnable в ExecutorService и ждет, пока он не запустится. Позже мы отменяем Future и ждем, пока не появится InterruptedException . blockUntilInterrupted() вернется, когда основной поток будет прерван. Второй тест, однако, не проходит. CompletableFuture.cancel() никогда не прервет основной поток, поэтому, несмотря на то, что Future выглядит так, как будто он был отменен, резервный поток все еще работает, и из sleep() не InterruptedException исключение InterruptedException . Баг или фича? Это задокументировано , так что, к сожалению, особенность:

Параметры: mayInterruptIfRunning — это значение не имеет эффекта в этой реализации, поскольку прерывания не используются для управления обработкой.

RTFM, вы говорите, но почему CompletableFuture работает таким образом? Сначала давайте рассмотрим, как «старые» реализации Future отличаются от CompletableFuture . FutureTask , возвращаемый из ExecutorService.submit() имеет следующую реализацию cancel() (я удалил Unsafe с похожим не потокобезопасным Java-кодом, поэтому рассматривал его только как псевдокод):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                state = INTERRUPTED;
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

FutureTask имеет переменную state которая следует за этой диаграммой состояний:

будущее состояние диаграмма

В случае cancel() мы можем либо перейти в состояние CANCELLED либо перейти в INTERRUPTED через INTERRUPTING . Основная часть — это то, где мы берем поток runner (если существует, то есть, если задача выполняется в данный момент), и мы пытаемся прервать его. Эта ветвь заботится об энергичном и принудительном прерывании уже запущенного потока. В конце мы должны уведомить все потоки, заблокированные в Future.get() в finishCompletion() (не имеет значения здесь). Так что довольно очевидно, сколько лет Future отменяет уже запущенные задачи. Что насчет CompletableFuture ? Псевдокод cancel() :

1
2
3
4
5
6
7
8
9
public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = false;
    if (result == null) {
        result = new AltResult(new CancellationException());
        cancelled = true;
    }
    postComplete();
    return cancelled || isCancelled();
}

Весьма разочаровывает, мы едва установили result в CancellationException , игнорируя флаг mayInterruptIfRunning . postComplete() выполняет роль, аналогичную функции finishCompletion() — уведомляет обо всех ожидающих обратных finishCompletion() зарегистрированных в этом будущем. Его реализация довольно неприятна (с использованием неблокирующего стека Treiber ), но она определенно не прерывает какой-либо основной поток.

Причины и последствия

Ограниченная cancel() в случае CompletableFuture — это не ошибка, а дизайнерское решение. CompletableFuture по своей природе не привязан ни к одному потоку, в то время как Future почти всегда представляет фоновую задачу. Совершенно нормально создавать CompletableFuture с нуля ( new CompletableFuture<>() ), где просто нет нижележащего потока для отмены. Тем не менее, я не могу избавиться от ощущения, что у большинства CompletableFuture есть связанные задачи и фоновый поток. В этом случае сбой cancel() является потенциальной проблемой. Я больше не советую слепо заменять Future на CompletableFuture как это может изменить поведение приложений, использующих метод cancel() . Это означает, что CompletableFuture преднамеренно нарушает принцип подстановки Лискова — и это серьезный смысл для рассмотрения.

Ссылка: CompletableFuture не может быть прервана нашим партнером по JCG Томашем Нуркевичем из блога Java и соседей .