В этом посте показано, как CompletableFuture
Java 8 сравнивается с параллельными потоками при выполнении асинхронных вычислений.
Мы будем использовать следующий класс для моделирования долгосрочной задачи:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
class MyTask { private final int duration; public MyTask( int duration) { this .duration = duration; } public int calculate() { System.out.println(Thread.currentThread().getName()); try { Thread.sleep(duration * 1000 ); } catch ( final InterruptedException e) { throw new RuntimeException(e); } return duration; } } |
Давайте создадим десять задач, каждая продолжительностью 1 секунда:
1
2
3
|
List<MyTask> tasks = IntStream.range( 0 , 10 ) .mapToObj(i -> new MyTask( 1 )) .collect(toList()); |
Как эффективно рассчитать список задач?
Подход 1: последовательно
Ваша первая мысль может состоять в том, чтобы вычислить задачи последовательно, следующим образом:
1
2
3
4
5
6
7
8
9
|
public static void runSequentially(List<MyTask> tasks) { long start = System.nanoTime(); List<Integer> result = tasks.stream() .map(MyTask::calculate) .collect(toList()); long duration = (System.nanoTime() - start) / 1_000_000; System.out.printf( "Processed %d tasks in %d millis\n" , tasks.size(), duration); System.out.println(result); } |
Как и следовало ожидать, для запуска требуется 10 секунд, поскольку каждая задача запускается одна за другой в main
потоке.
Подход 2: Использование параллельного потока
Быстрое улучшение заключается в преобразовании вашего кода в параллельный поток, как показано ниже:
1
2
3
4
5
6
7
8
9
|
public static void useParallelStream(List<MyTask> tasks) { long start = System.nanoTime(); List<Integer> result = tasks.parallelStream() .map(MyTask::calculate) .collect(toList()); long duration = (System.nanoTime() - start) / 1_000_000; System.out.printf( "Processed %d tasks in %d millis\n" , tasks.size(), duration); System.out.println(result); } |
Выход
01
02
03
04
05
06
07
08
09
10
11
|
main ForkJoinPool.commonPool-worker- 1 ForkJoinPool.commonPool-worker- 3 ForkJoinPool.commonPool-worker- 2 ForkJoinPool.commonPool-worker- 3 ForkJoinPool.commonPool-worker- 2 main ForkJoinPool.commonPool-worker- 1 ForkJoinPool.commonPool-worker- 1 main Processed 10 tasks in 3043 millis |
На этот раз это заняло 3 секунды, потому что 4 задачи выполнялись параллельно (с использованием трех потоков из ForkJoinPool
плюс main
поток).
Подход 3: Использование CompletableFutures
Давайте посмотрим, будут ли CompletableFuture
работать лучше:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public static void useCompletableFuture(List<MyTask> tasks) { long start = System.nanoTime(); List<CompletableFuture<Integer>> futures = tasks.stream() .map(t -> CompletableFuture.supplyAsync(() -> t.calculate())) .collect(Collectors.toList()); List<Integer> result = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); long duration = (System.nanoTime() - start) / 1_000_000; System.out.printf( "Processed %d tasks in %d millis\n" , tasks.size(), duration); System.out.println(result); } |
В приведенном выше коде мы сначала получаем список CompletableFuture
а затем вызываем метод join
для каждого будущего, чтобы дождаться их завершения по одному. Обратите внимание, что join
— это то же самое, что и get
, с той лишь разницей, что первый не выдает ни одного проверенного исключения, поэтому он удобнее в лямбда-выражении.
Кроме того, вы должны использовать два отдельных потоковых конвейера, а не размещать две операции карты друг за другом, потому что промежуточные потоковые операции являются ленивыми, и в итоге вы бы выполняли свои задачи последовательно! Вот почему вам сначала нужно собрать список CompletableFuture
в список, чтобы они могли начать работу, прежде чем ждать их завершения.
Выход
01
02
03
04
05
06
07
08
09
10
11
|
ForkJoinPool.commonPool-worker- 1 ForkJoinPool.commonPool-worker- 2 ForkJoinPool.commonPool-worker- 3 ForkJoinPool.commonPool-worker- 1 ForkJoinPool.commonPool-worker- 2 ForkJoinPool.commonPool-worker- 3 ForkJoinPool.commonPool-worker- 1 ForkJoinPool.commonPool-worker- 2 ForkJoinPool.commonPool-worker- 3 ForkJoinPool.commonPool-worker- 1 Processed 10 tasks in 4010 millis |
На обработку 10 заданий ушло 4 секунды. Вы заметите, что использовались только 3 потока ForkJoinPool и что, в отличие от параллельного потока, main
поток не использовался.
Подход 4. Использование CompletableFutures с пользовательским исполнителем
Одним из преимуществ CompletableFuture
сравнению с параллельными потоками является то, что они позволяют вам указать другого Executor
которому будут отправляться их задачи. Это означает, что вы можете выбрать более подходящее количество потоков в зависимости от вашего приложения. Так как мой пример не сильно Runtime.getRuntime().getAvailableProcessors()
процессор, я могу увеличить число потоков до значения, превышающего Runtime.getRuntime().getAvailableProcessors()
, как показано ниже:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) { long start = System.nanoTime(); ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10 )); List<CompletableFuture<Integer>> futures = tasks.stream() .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor)) .collect(Collectors.toList()); List<Integer> result = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); long duration = (System.nanoTime() - start) / 1_000_000; System.out.printf( "Processed %d tasks in %d millis\n" , tasks.size(), duration); System.out.println(result); executor.shutdown(); } |
Выход
01
02
03
04
05
06
07
08
09
10
11
|
pool- 1 -thread- 2 pool- 1 -thread- 4 pool- 1 -thread- 3 pool- 1 -thread- 1 pool- 1 -thread- 5 pool- 1 -thread- 6 pool- 1 -thread- 7 pool- 1 -thread- 8 pool- 1 -thread- 9 pool- 1 -thread- 10 Processed 10 tasks in 1009 millis |
После этого улучшения теперь требуется всего 1 секунда для обработки 10 задач.
Как видите, CompletableFuture
обеспечивает больший контроль над размером пула потоков и должен использоваться, если ваши задачи связаны с вводом / выводом. Однако, если вы выполняете операции с интенсивным использованием ЦП, нет смысла иметь больше потоков, чем процессоров, поэтому используйте параллельный поток, поскольку его проще использовать.
Ссылка: | Java 8: CompletableFuture vs Parallel Stream от нашего партнера по JCG Фахда Шарифа в блоге fahd.blog . |