Статьи

Какой поток выполняет задачи и обратные вызовы CompletableFuture?

CompletableFuture — все еще относительно свежая концепция, несмотря на то, что она была введена почти два года назад (!) В марте 2014 года в Java 8. Но, возможно, хорошо, что этот класс не так хорошо известен, так как им можно легко злоупотреблять, особенно в отношении потоков и пулы потоков, которые участвуют в пути. Эта статья призвана описать, как потоки используются с CompletableFuture.

Выполнение задач

Это фундаментальная часть API. Существует удобный метод supplyAsync (), который похож на ExecutorService.submit (), но возвращает CompletableFuture:

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {
                log.info("Downloading");
                return IOUtils.toString(is, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

Проблема заключается в том, что supplyAsync () по умолчанию использует ForkJoinPool.commonPool (), пул потоков, совместно используемый всеми CompletableFutures, всеми параллельными потоками и всеми приложениями, развернутыми на одной и той же JVM (если вы неудачно все еще используете сервер приложений со многими развернутыми артефактами). Этот жестко запрограммированный, неконфигурируемый пул потоков находится вне нашего контроля, его трудно контролировать и масштабировать. Поэтому вы всегда должны указывать своего собственного исполнителя, как здесь (и взглянуть на мои  несколько советов, как его создать ):

ExecutorService pool = Executors.newFixedThreadPool(10);

final CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            //...
        }, pool);

Но это только начало …

Обратные вызовы и преобразования

Предположим, что вы хотите преобразовать данное CompletableFuture, например, извлечь длину строки:

CompletableFuture<Integer> intFuture =
    future.thenApply(s -> s.length());

Кто именно вызывает код s.length ()? Честно говоря, мои дорогие разработчики, нам наплевать [1] . Пока лямбда-выражение внутри всех операторов, таких как thenApply, дешево, нам все равно, кто его называет. Но что, если это выражение занимает немного процессорного времени для завершения или выполняет блокирующий сетевой вызов?

Прежде всего, что происходит по умолчанию? Подумайте об этом: у нас есть фоновая задача типа String, и мы хотим применить некоторое конкретное преобразование асинхронно, когда это значение завершится. Самый простой способ реализовать это — обернуть исходную задачу (вернуть строку) и перехватить ее, когда она завершится. Когда внутренняя задача заканчивается, наш обратный вызов включается, применяет преобразование и возвращает измененное значение. Это как аспект, который находится между нашим кодом и исходным результатом вычислений. При этом должно быть совершенно очевидно, что преобразование s.length () будет выполняться в том же потоке, что и исходная задача, а? Не совсем!

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> {
            sleepSeconds(2);
            return "ABC";
        }, pool);

future.thenApply(s -> {
    log.info("First transformation");
    return s.length();
});

future.get();
pool.shutdownNow();
pool.awaitTermination(1, TimeUnit.MINUTES);

future.thenApply(s -> {
    log.info("Second transformation");
    return s.length();
});

Первое преобразование в thenApply () регистрируется, пока задача еще выполняется. Таким образом, он будет выполнен сразу после завершения задачи в том же потоке, что и задача. Однако, прежде чем регистрировать второе преобразование, мы ждем, пока задача фактически завершится. Хуже того, мы полностью закрываем пул потоков, чтобы гарантировать, что никакой другой код там не может быть выполнен. Так в каком потоке будет запущено второе преобразование? Мы знаем, что это должно произойти немедленно, поскольку в будущем мы регистрируем обратный вызов уже завершенным. Оказывается, по умолчанию используется клиентский поток (!)! Вывод следующий:

pool-1-thread-1 | First transformation main | Second transformation

Второе преобразование, когда оно зарегистрировано, понимает, что CompletableFuture уже завершено, поэтому оно выполняет преобразование немедленно. Вокруг нет другого потока, поэтому thenApply () вызывается в контексте текущего основного потока. Основная причина, почему это поведение склонно к ошибкам, проявляется, когда фактическое преобразование является дорогостоящим. Представьте себе лямбда-выражение внутри thenApply (), выполняющее тяжелые вычисления или блокирующее сетевой вызов. Внезапно наш асинхронный CompletableFuture блокирует вызывающий поток!

Управление пулом потоков обратного вызова

Есть два метода, чтобы контролировать, какой поток выполняет наши обратные вызовы и преобразования. Обратите внимание, что эти решения необходимы только в том случае, если ваши преобразования являются дорогостоящими. В противном случае разница незначительна. Итак, прежде всего мы можем выбрать * Async версии операторов, например:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
});

На этот раз второе преобразование было автоматически выгружено нашему другу, ForkJoinPool.commonPool ():

pool-1-thread-1                  | First transformation
ForkJoinPool.commonPool-worker-1 | Second transformation

Но нам не нравится commonPool, поэтому мы поставляем свои собственные:

future.thenApplyAsync(s -> {
    log.info("Second transformation");
    return s.length();
}, pool2);

Обратите внимание, что был использован другой пул потоков (пул-1 против пула-2):

pool-1-thread-1 | First transformation
pool-2-thread-1 | Second transformation

Рассматривать обратный вызов как еще один шаг вычислений

Но я считаю, что если у вас возникают проблемы с длительными обратными вызовами и преобразованиями (помните, что эта статья относится почти ко всем другим методам в CompletableFuture), вам следует просто использовать другой явный CompletableFuture, как здесь:

//Imagine this is slow and costly
CompletableFuture<Integer> strLen(String s) {
    return CompletableFuture.supplyAsync(
            () -> s.length(),
            pool2);
}

//...

CompletableFuture<Integer> intFuture = 
        future.thenCompose(s -> strLen(s));

Этот подход более явный. Зная, что наше преобразование имеет значительную стоимость, мы не рискуем запустить его в каком-либо произвольном или неконтролируемом потоке. Вместо этого мы явно моделируем его как асинхронную операцию от String до CompletableFuture <Integer>. Однако мы должны заменить thenApply () на thenCompose (), в противном случае мы получим CompletableFuture <CompletableFuture <Integer >>.

Но что, если у нашего преобразования нет версии, которая хорошо работает с вложенным CompletableFuture, например applyToEither (), которая ожидает завершения первого Future и применяет преобразование?

CompletableFuture<CompletableFuture<Integer>> poor = 
        future1.applyToEither(future2, s -> strLen(s));

Существует удобный прием для «разворачивания» такой непонятной структуры данных, называемой flatten, которая легко реализуется с помощью flatMap (identity) (или flatMap (x -> x)). В нашем случае flatMap () называется thenCompose ( duh! ):

CompletableFuture<Integer> good = 
        poor.thenCompose(x -> x);

Я оставляю это на ваше усмотрение, как и почему это работает. Я надеюсь, что эта статья прояснила, как потоки участвуют в CompletableFuture.