Статьи

API Java 8 Streams как дружественный фасад ForkJoinPool

Одна из функций, которые мне больше всего нравятся в Java 8, — это API потоков. Наконец, он устраняет практически все циклы из кода и позволяет вам писать код, который будет намного более выразительным и целенаправленным.

Сегодня я понял, что это можно использовать для чего-то другого: в качестве хорошего интерфейса для ForkJoinPool .

Проблема: Исполнители Бойлерплейт

Допустим, мы хотим запустить несколько задач параллельно. Ничего особенного, скажем, каждый из них просто выводит имя исполняющего потока (так что мы можем видеть, что он работает параллельно). Мы хотим возобновить выполнение после того, как они все сделали.

Если вы хотите запустить несколько задач параллельно, используя ExecutorService , вам, вероятно, нужно сделать что-то вроде следующего:

01
02
03
04
05
06
07
08
09
10
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
    // TODO handle...
}

Теперь это много кода! Но мы можем сделать лучше.

Решение: Stream API

В конце концов я придумал эту утилиту:

1
2
3
void doInParallelNTimes(int times, Runnable op) {
    IntStream.range(0, times).parallel().forEach(i -> op.run());
}

Многоразовые и все. Назовите это как:

1
doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

Готово.

Этот распечатывает следующее. Обратите внимание, что он на самом деле также использует основной поток — так как он все равно остается заложником и не может возобновить работу до завершения выполнения.

1
2
3
4
5
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

Другой пример: параллельные вычисления

Вот еще один пример. Вместо того, чтобы делать то же самое N раз, мы можем использовать потоковый API для параллельной обработки ряда различных задач. Мы можем создать («затравить») поток с любой коллекцией или набором значений, иметь функцию, выполняемую на них параллельно, и, наконец, агрегировать результаты (собрать в коллекцию, уменьшить до одного значения и т. Д.)

Давайте посмотрим, как мы можем вычислить сумму первых 45 чисел Фибоначчи:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class Tester {
    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        IntStream.range(1, 45).parallel().map(Tester::fib).sum();
        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");
 
        stopwatch.reset();
        stopwatch.start();
        IntStream.range(1, 45).map(Tester::fib).sum();
        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
    }
 
    private static int fib(int n) {
        if (n == 1 || n == 2) {
            return 1;
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Распечатывает:

1
2
Parallel took 3078 ms
Sequential took 7327 ms

Он многого достигает в одной строке кода. Сначала он создает поток с описаниями всех задач, которые мы хотим выполнять параллельно. Затем он вызывает функцию для всех из них параллельно. Наконец, он возвращает сумму всех этих результатов.

Это не все, что придумано. Я легко могу представить создание потока с произвольными значениями (включая богатые объекты Java) и выполнение нетривиальной операции над ними. Это не имеет значения, организуя все, что будет выглядеть одинаково.

Когда это сделать?

Я думаю, что это решение довольно хорошо для всех случаев, когда вы знаете загрузку заранее, и вы хотите разветвить выполнение на несколько потоков и продолжить после того, как все они будут выполнены. Я нуждался в этом для некоторого тестового кода, но он, вероятно, хорошо работал бы во многих других сценариях разветвления / соединения или «разделяй и властвуй».

Очевидно, что это не работает, если вы хотите запустить что-то в фоновом режиме и возобновить выполнение или если вы хотите, чтобы фоновый исполнитель работал в течение длительного периода времени.

Ссылка: Java 8 Streams API как дружественный фасад ForkJoinPool от нашего партнера JCG Конрада Гаруса в блоге Белки .