Статьи

Реализация Java 8 CompletionStage (Часть II)

В первой части мы обсудили причины реализации Java 8 CompletionStage и почему мы не можем использовать чисто функциональные обратные вызовы для реализации. Сегодня я хотел бы наконец погрузиться в код.

Как это устроено?

Интерфейс CompletionStage имеет более 30 методов, но мы можем просто проиллюстрировать внутреннюю работу только по нескольким из них.

@Override
public boolean complete(T result) {
    return callbackRegistry.success(result);
}
@Override
public boolean completeExceptionally(Throwable ex) {
    return callbackRegistry.failure(ex);
}
@Override
public <U> CompletionStage<U> thenApplyAsync(
        Function<? super T, ? extends U> fn,
        Executor executor)
{
    SimpleCompletionStage<U> nextStage
            = new SimpleCompletionStage<>(defaultExecutor);
    callbackRegistry.addCallbacks(
            // when the result is ready, transform it and pass 
            // it to the next completion stage
            result -> {
                try {
                    nextStage.complete(fn.apply(result));
                } catch (Throwable e) {
                    // transformation fails, next stage has to 
                    // complete exceptionally
                    nextStage.completeExceptionally(wrapException(e));
                }
            },
            // exception from previous stage is passed to the next stage
            e -> nextStage.completeExceptionally(wrapException(e)),
            executor
    );
    return nextStage;
}

CalbackRegistry отслеживает обратные вызовы, если кто-то вызывает метод «complete», значение распространяется на все ранее зарегистрированные обратные вызовы. Если новый обратный вызов добавляется после вызова метода «success», значение сразу передается обратному вызову. CalbackRegistry является единственным классом с полным состоянием во всем механизме, остальное — без состояния. Этот класс был сильно вдохновлен подобным классом весной.

Давайте перейдем к методу thenApplyAsync. Цитировать JavaDoc « Возвращает новый CompletionStage, который, когда этот этап завершается нормально, выполняется с результатом этого этапа в качестве аргумента предоставленной функции. «Мы должны вернуть новый этап завершения, поэтому давайте создадим его.

Затем мы регистрируем два обратных вызова, один для нормального и один для исключительного завершения. Когда кто-то звонит «завершить», используется первый обратный вызов. Он принимает результат, применяет функцию и передает преобразованное значение на следующий этап. Обратите внимание, что код в обратных вызовах не выполняется напрямую, он будет выполняться только после вызова метода «complete» на этом этапе.

Если функция преобразования выдает исключение или если вызывается метод completeExceptionally, следующий этап также завершается в исключительном порядке.

Давайте упростим это

Несмотря на то, что предыдущий код четко описывает намерение, оно может быть еще более упрощено. После некоторого рефакторинга я дошел до этого

@Override
public <U> CompletionStage<U> thenApplyAsync(
        Function<? super T, ? extends U> fn,
        Executor executor
) {
    SimpleCompletionStage<U> nextStage = newSimpleCompletionStage();
    addCallbacks(
            result -> nextStage.acceptResult(() -> fn.apply(result)),
            nextStage::handleFailure,
            executor
    );
    return nextStage;
}

private void acceptResult(Supplier<? extends T> supplier) {
    try {
        complete(supplier.get());
    } catch (Throwable e) {
        handleFailure(e);
    }
}

Это тот же код, что и раньше, обработка исключений просто оборачивается в повторно используемые методы. Метод acceptResult принимает поставщика, который предоставляет значение для отправки на следующий этап. Если Поставщик выдает исключение, следующий этап завершается исключительно. Имея такую ​​поддержку, реализация других методов довольно проста.

Возьмите метод «handleAsync», например. JavaDoc говорит: « Возвращает новый CompletionStage, который, когда этот этап завершается нормально или исключительно, выполняется с результатом и исключением этого этапа в качестве аргументов предоставленной функции. Данная функция вызывается с результатом (или нулевым, если нет) и исключением (или нулевым, если нет) этого этапа, когда завершается в качестве аргументов. »

@Override
public <U> CompletionStage<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn,
        Executor executor) {
    SimpleCompletionStage<U> nextStage = newSimpleCompletionStage();
    addCallbacks(
            result -> nextStage.acceptResult(() -> fn.apply(result, null)),
            // exceptions are treated as success
            e -> nextStage.acceptResult(() -> fn.apply(null, e)),
            executor
    );
    return nextStage;
}

Код делает именно то, что говорит в JavaDoc. Если этот этап завершается нормально, мы применяем функцию к значению и передаем результат на следующий этап. В случае исключения мы вызываем одну и ту же функцию с разными аргументами и передаем результат также на следующий этап. Просто, не правда ли?

Преобразование функций легко

Хотя интерфейс CompletionStage выглядит пугающе, в конце я обнаружил, что большинство методов могут быть реализованы путем повторного использования существующих методов после применения простого преобразования. Например, «thenRunAsync» совпадает с «thenApplyAsync», но вместо функции принимает исполняемый файл. Легко преобразовать runnable в функцию и повторно использовать «thenApplyAsync».

public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
    return thenApplyAsync(convertRunnableToFunction(action), executor);
}

private Function<T, Void> convertRunnableToFunction(Runnable action) {
    return result -> {
        action.run();
        return null;
    };
}

Runnable — это функция, которая игнорирует свой параметр и ничего не возвращает. Вы можете видеть, что естественно выразить идею, используя лямбды.

Применить к любому

Не все методы настолько просты, самым хитрым было «applyToEither», которое принимает два этапа завершения и выбирает только один результат, игнорируя другой. Звучит сложно, но даже этот метод может быть легко реализован.

@Override
public <U> CompletionStage<U> applyToEitherAsync(
        CompletionStage<? extends T> other,
        Function<? super T, U> fn,
        Executor executor) {
    SimpleCompletionStage<T> nextStage = newSimpleCompletionStage();
    // only the first result is accepted by completion stage,
    // the other one is ignored
    this.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure);
    other.thenAccept(nextStage::complete).exceptionally(nextStage::handleFailure);
    return nextStage.thenApplyAsync(fn, executor);
}

Мы повторно используем существующие методы «thenAccept» и «исключительно» для вызова «complete» или «handleFailure» на следующем этапе. Реализация этапа завершения nextStage гарантирует, что он принимает только первый результат и игнорирует дальнейшие вызовы методов complete *. Это означает, что применяется только первый результат. Чтобы использовать функцию, представленную в параметре, мы вызываем «thenApplyAsync» (последняя строка) на следующем этапе, и все готово. Обратите внимание, что мы создаем дополнительные CompletionStage, поэтому код не так эффективен, как мог бы быть, но я предпочитаю ясность, а не незначительное повышение производительности.

Тесты обязательны

There are so many edge cases and combinations that the work would have been impossible without good tests. I have created a common test suite that can be executed on top of CompletableFuture and my implementation of CompletionStage. It’s incredibly useful, it allows to keep both implementations in sync and to troubleshoot the tests. Moreover, I was able to find a strange behavior (possible bug) in CompletableFuture.
The code has near 100% test coverage but unfortunatelly the interface is so complex that it’s not possible to be sure about the correctness.

It’s useful to write about the code

It’s interesting how writing about the problem switches the mind to a different mode. While writing this post, I have uncovered several bugs and figured-out several simplifications. I had fun writing the code but without writing this post the code would have been much more complicated and buggy.
So feel free to check the code and more importantly, try similar exercise for yourself, it’s really fun.