Статьи

Реализация Java 8 CompletionStage (часть 1)

В Java 8 появилась стандартная поддержка быстрой асинхронной обработки. С помощью CompletableFuture s и lambdas мы можем написать такой код:

CompletableFuture<LaunchResult> missilesLaunched =
  getAuthorizationCode().thenApply(this::launchMissiles);

missilesLaunched
  .thenApply(this::generateDamageReport)
  .thenAccept(this::updateMainScreen)
  .exceptionally(this::playAlertSound);

...

missilesLaunched.thenAccept(LavaLamp::turnOn);

Примером является самодокументирование, я получаю CompletableFuture, а затем я последовательно чередую действия. Весь уродливый асинхронный код скрыт. Неважно, что для создания отчета о повреждении требуются часы, когда он готов, вызывается следующий этап.

Разве это не здорово? Теперь нам просто нужно добавить поддержку CompletableFutures во все библиотеки, а затем мы можем написать высокомасштабируемый асинхронный код, который легко соединяет асинхронные сервлеты с асинхронными драйверами HTTP, асинхронными драйверами JDBC и драйверами асинхронных ракетных пусковых установок. Это будет последний удар по Node.js, ура.

К сожалению, это будет не так просто. Прежде всего, потребуется время для переноса всех библиотек для использования CompletableFutures. Но это не то, что я хочу написать сегодня. Я хочу написать о недостатках в реализации CompletableFuture и о том, как я пытался их преодолеть.

Первая проблема, с которой я столкнулся в CompletableFuture, заключается в том, что она тесно связана с инфраструктурой fork-join, а инфраструктура fork-join предназначена для использования в основном для задач с интенсивным использованием процессора, Но если подумать, обычный вариант использования асинхронной обработки противоположен. Мы хотим использовать его в основном для блокировки задач, мы не хотим блокировать поток во время ожидания сетевой операции или запуска ракеты. Конечно, вам не нужно использовать исполнителей fork-join с CompletableFutures. Вы просто должны быть осторожны, так как это выбор по умолчанию для асинхронных методов.

Другая проблема, которую я имею с CompletableFutures, — это реализация. Просто проверьте код и убедитесь сами. Не поймите меня неправильно, я считаю, что это хорошо протестированный и действительно хорошо оптимизированный код. Но, честно говоря, пройдёт ли он проверку кода в вашей компании? Вы можете прочитать и посмотреть, как это работает?

К счастью, есть CompletionStageинтерфейс, поэтому мы можем использовать альтернативную реализацию. Просто Google для «альтернативной реализации CompletionStage», и вы получите … ничего.

Круто, это дает мне возможность реализовать это . Я помогу человечеству и наконец стану знаменитым. Шутки в сторону, это оказалось действительно интересным и познавательным упражнением, если у вас есть несколько свободных дней, я действительно рекомендую это

Обратные вызовы FTW!

Первая проблема, которую нужно решить, — это как интегрировать CompletionStage с источником данных. Моей первой попыткой было использовать только функции без сохранения состояния, зарегистрированные как обратные вызовы. Я создал следующий интерфейс

public interface Listenable<T> {
  public void addCallbacks(Consumer<? super T> onSuccess, Consumer<Throwable> onFailure);  
}

В основном это позволяет каждому регистрировать обратные вызовы. Это все, что нам нужно для реализации CompletionStage, нам просто нужно уведомить, когда предыдущая фаза прошла успешно или не удалась. Например, чтобы создать CompletionStage из Spring ListenableFuture, вам просто нужно сделать это

ListenableFuture<String> springListenableFuture = createSpringListenableFuture();

CompletionStage<Object> completionStage = factory.createCompletionStage(
     (onSuccessCallback, onFailureCallback) -> {
            springListenableFuture.addCallback(new ListenableFutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    onSuccessCallback.accept(result);
                }

                @Override
                public void onFailure(Throwable t) {
                    onFailureCallback.accept(t);
                }
            });
        });

Код использует фабрику для создания стадии завершения. Лямбда-параметр — это Listenabe, который в основном заставляет Spring ListenableFuture вызывать onSuccessCallback при завершении и onFailureCallback при сбое. Признаюсь, это трудно читать, но у меня была веская причина для выбора этого решения. Это позволило мне зарегистрировать столько обратных вызовов, которые мне понравились, и, таким образом, делегировать всю тяжелую работу Spring ListenableFuture.

Вы видите, что большинство асинхронных библиотек уже имеют весь код, необходимый для управления обратными вызовами, я хотел использовать его повторно. Каждый из вызовов метода CompletionStage просто добавляет новый обратный вызов к источнику. Возьмите этот код для примера

CompletionStage<LaunchResult> missilesLaunched =
  getAuthorizationCode()
  .thenApply(this::launchMissiles);

missilesLaunched
  .thenApply(this::generateDamageReport)
  .thenAccept(this::updateMainScreen)
  .exceptionally(this::playAlertSound);

...
missilesLaunched.thenAccept(LavaLamp::turnOn);

Вызов метода thenApply (this :: launchMissiles) добавит новый обратный вызов, который запустит ракеты. Следующий thenApply будет генерировать еще один обратный вызов и так далее. Обратные вызовы были бы простыми преобразованиями без сохранения состояния, каждое из которых основывалось на предыдущих преобразованиях. Все состояние управлялось бы источником, в моем случае Spring ListenableFuture.

Обратные вызовы WTF?

Но, к сожалению, это невозможно. Проблема в том, что вы не хотите выполнять промежуточные шаги несколько раз. В нашем примере мы хотим создать отчет об ущербе и включить лавовую лампу на основе результатов запуска ракеты. Это приведет к двум обратным вызовам и, следовательно, к двум вызовам метода launchMissiles. Это не то, что мы хотим, очевидно, мы хотим вызвать метод только один раз. Особенно этот.

Обойти это невозможно, мы должны иметь дело с состоянием на каждом из этапов. Взгляните на последнюю строку примера: missilesLaunched.thenAccept (LavaLamp :: turnOn). Если результат запуска ракеты уже известен, missilesLaunched CompletionStage должен запомнить его, чтобы он мог передать его лавовой лампе. Нет другого способа выяснить результат без повторного запуска ракет. Если результат еще не известен, missilesLaunched должен зарегистрировать обратный вызов, который включит лампу лавы, как только результат будет доступен. Мы не можем делегировать его дальше, предыдущие этапы не имеют доступа к результату.

Если вы думаете об этом, проблема вызвана побочными эффектами. Если бы все мои методы были чисто функциональными, у меня не было бы этой проблемы. Я не против вызова методов несколько раз. Но запуск ракеты не является побочным эффектом, и в Java я не могу заставить всех пользователей библиотеки использовать только чистые функции.

Так что концепция прослушиваемого обратного вызова не сработала. В конце я полностью удалил его. Мало того, что это не работало, это было действительно трудно рассуждать. Посмотрите еще раз на пример с Spring ListenableFuture. Это довольно запутанно. Это было еще более запутанным внутренне, я закончил тем, что создал обратные вызовы в обратных вызовах в обратных вызовах.

Более того, версия без обратных вызовов намного проще в использовании для клиентов.

CompletableCompletionStage<Object> completionStage = factory.createCompletionStage();
springListenableFuture.addCallback(new ListenableFutureCallback<String>() {
    @Override
    public void onSuccess(String result) {
        completionStage.complete(result);
    }
    @Override
    public void onFailure(Throwable t) {
        completionStage.completeExceptionally(t);
    }
});

Теперь вам не нужно создавать новый Listenable, вы просто вызываете «complete» метод. Это более естественно и по совпадению CompletableFutures используется таким же образом.

Есть более интересные вещи, которые я изучил, но эта статья уже становится слишком длинной, поэтому я оставлю ее в другой раз. А пока вы можете проверить исходный код здесь .