Статьи

Java 8: CompletableFuture против параллельного потока

В этом посте показано, как CompletableFuture Java 8 сравнивается с параллельными потоками при выполнении асинхронных вычислений.

Мы будем использовать следующий класс для моделирования долгосрочной задачи:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class MyTask {
  private final int duration;
  public MyTask(int duration) {
    this.duration = duration;
  }
  public int calculate() {
    System.out.println(Thread.currentThread().getName());
    try {
      Thread.sleep(duration * 1000);
    } catch (final InterruptedException e) {
      throw new RuntimeException(e);
    }
    return duration;
  }
}

Давайте создадим десять задач, каждая продолжительностью 1 секунда:

1
2
3
List<MyTask> tasks = IntStream.range(0, 10)
                                    .mapToObj(i -> new MyTask(1))
                                    .collect(toList());

Как эффективно рассчитать список задач?

Подход 1: последовательно

Ваша первая мысль может состоять в том, чтобы вычислить задачи последовательно, следующим образом:

1
2
3
4
5
6
7
8
9
public static void runSequentially(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.stream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

Как и следовало ожидать, для запуска требуется 10 секунд, поскольку каждая задача запускается одна за другой в main потоке.

Подход 2: Использование параллельного потока

Быстрое улучшение заключается в преобразовании вашего кода в параллельный поток, как показано ниже:

1
2
3
4
5
6
7
8
9
public static void useParallelStream(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<Integer> result = tasks.parallelStream()
                              .map(MyTask::calculate)
                              .collect(toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

Выход

01
02
03
04
05
06
07
08
09
10
11
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-2
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main
Processed 10 tasks in 3043 millis

На этот раз это заняло 3 секунды, потому что 4 задачи выполнялись параллельно (с использованием трех потоков из ForkJoinPool плюс main поток).

Подход 3: Использование CompletableFutures

Давайте посмотрим, будут ли CompletableFuture работать лучше:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public static void useCompletableFuture(List<MyTask> tasks) {
  long start = System.nanoTime();
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
           .collect(Collectors.toList());
 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
}

В приведенном выше коде мы сначала получаем список CompletableFuture а затем вызываем метод join для каждого будущего, чтобы дождаться их завершения по одному. Обратите внимание, что join — это то же самое, что и get , с той лишь разницей, что первый не выдает ни одного проверенного исключения, поэтому он удобнее в лямбда-выражении.

Кроме того, вы должны использовать два отдельных потоковых конвейера, а не размещать две операции карты друг за другом, потому что промежуточные потоковые операции являются ленивыми, и в итоге вы бы выполняли свои задачи последовательно! Вот почему вам сначала нужно собрать список CompletableFuture в список, чтобы они могли начать работу, прежде чем ждать их завершения.

Выход

01
02
03
04
05
06
07
08
09
10
11
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 4010 millis

На обработку 10 заданий ушло 4 секунды. Вы заметите, что использовались только 3 потока ForkJoinPool и что, в отличие от параллельного потока, main поток не использовался.

Подход 4. Использование CompletableFutures с пользовательским исполнителем

Одним из преимуществ CompletableFuture сравнению с параллельными потоками является то, что они позволяют вам указать другого Executor которому будут отправляться их задачи. Это означает, что вы можете выбрать более подходящее количество потоков в зависимости от вашего приложения. Так как мой пример не сильно Runtime.getRuntime().getAvailableProcessors() процессор, я могу увеличить число потоков до значения, превышающего Runtime.getRuntime().getAvailableProcessors() , как показано ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
  long start = System.nanoTime();
  ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
  List<CompletableFuture<Integer>> futures =
      tasks.stream()
           .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
           .collect(Collectors.toList());
 
  List<Integer> result =
      futures.stream()
             .map(CompletableFuture::join)
             .collect(Collectors.toList());
  long duration = (System.nanoTime() - start) / 1_000_000;
  System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
  System.out.println(result);
  executor.shutdown();
}

Выход

01
02
03
04
05
06
07
08
09
10
11
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1009 millis

После этого улучшения теперь требуется всего 1 секунда для обработки 10 задач.

Как видите, CompletableFuture обеспечивает больший контроль над размером пула потоков и должен использоваться, если ваши задачи связаны с вводом / выводом. Однако, если вы выполняете операции с интенсивным использованием ЦП, нет смысла иметь больше потоков, чем процессоров, поэтому используйте параллельный поток, поскольку его проще использовать.

Ссылка: Java 8: CompletableFuture vs Parallel Stream от нашего партнера по JCG Фахда Шарифа в блоге fahd.blog .