Статьи

Fork / Join Framework

Эта статья является частью нашего курса Академии под названием Основы параллелизма Java .

В этом курсе вы погрузитесь в магию параллелизма. Вы познакомитесь с основами параллелизма и параллельного кода и узнаете о таких понятиях, как атомарность, синхронизация и безопасность потоков. Проверьте это здесь !

1. Введение

В этой статье дается введение в Fork / Join Framework, которая является частью JDK начиная с версии 1.7. Он описывает основные функции фреймворков и предоставляет несколько примеров, чтобы обеспечить некоторый практический опыт.

2. Форк / Регистрация

Базовым классом Fork / Join Framework является java.util.concurrent.ForkJoinPool . Этот класс реализует два интерфейса Executor и ExecutorService и подклассы AbstractExecutorService . Следовательно, ForkJoinPool — это в основном пул потоков, который выполняет особые задачи, а именно ForkJoinTask . Этот класс реализует уже известный интерфейс Future и вместе с тем такие методы, как get() , cancel() и isDone() . Помимо этого этот класс также предоставляет два метода, которые дали всей платформе свое имя: fork() и join() .

В то время как вызов fork() запускает асинхронное выполнение задачи, вызов join() будет ожидать завершения задачи и извлекать ее результат. Следовательно, мы можем разбить данную задачу на несколько небольших задач, разветвить каждую задачу и, наконец, дождаться завершения всех задач. Это облегчает реализацию сложных задач.

В информатике этот подход также известен как подход «разделяй и властвуй». Каждый раз, когда проблема слишком сложна, чтобы решить ее сразу, она делится на несколько более мелких и более простых для решения проблем. Это можно записать в псевдокоде так:

1
2
3
4
5
6
7
8
9
if(problem.getSize() > THRESHOLD) {
    SmallerProblem smallerProblem1 = new SmallerProblem();
    smallerProblem1.fork();
    SmallerProblem smallerProblem2 = new SmallerProblem();
    smallerProblem2.fork();
    return problem.solve(smallerProblem1.join(), smallerProblem2.join());
} else {
    return problem.solve();
}

Сначала мы проверяем, превышает ли текущий размер проблемы заданный порог. Если это так, мы делим проблему на более мелкие задачи, fork() каждую новую задачу и затем ждем результатов, вызывая join() . Так как join() возвращает результаты для каждой подзадачи, мы должны найти лучшее решение небольших проблем и вернуть его как наше лучшее решение. Эти шаги повторяются до тех пор, пока заданный порог не станет слишком низким и проблема не станет настолько маленькой, что мы сможем вычислить ее решение напрямую без дальнейшего разделения.

2.1. RecursiveTask

Чтобы немного лучше понять эту процедуру, мы реализуем алгоритм, который находит наименьшее число в массиве целочисленных значений. Эту проблему вы не решите в своей повседневной работе с помощью ForkJoinPool , но следующая реализация очень четко показывает основные принципы. В методе main() мы устанавливаем целочисленный массив со случайными значениями и создаем новый ForkJoinPool .

Первый параметр, передаваемый его конструктору, является индикатором уровня желаемого параллелизма. Здесь мы запрашиваем у Runtime количество доступных процессорных ядер. Затем мы вызываем метод invoke() и передаем экземпляр FindMin . FindMin расширяет класс RecursiveTask , который сам является подклассом упомянутой ранее ForkJoinTask . Класс ForkJoinTask имеет фактически два подкласса: один предназначен для задач, которые возвращают значение ( RecursiveTask ), а другой — для задач без возвращаемого значения ( RecursiveAction ). Суперкласс заставляет нас реализовать compute() . Здесь мы рассмотрим данный фрагмент целочисленного массива и решим, является ли текущая проблема слишком большой для немедленного решения или нет.

При нахождении наименьшего числа в массиве наименьший размер проблемы, который должен быть решен напрямую, заключается в сравнении двух элементов друг с другом и возвращении наименьшего значения из них. Если в настоящее время у нас более двух элементов, мы делим массив на две части и снова находим наименьшее число в этих двух частях. Это делается путем создания двух новых экземпляров FindMin .

Конструктор снабжается массивом и начальным и конечным индексом. Затем мы запускаем выполнение этих двух задач асинхронно, вызывая fork() . Этот вызов отправляет две задачи в очередь пула потоков. Пул потоков реализует стратегию, называемую похищением работы, то есть, если все другие потоки имеют достаточно для выполнения, текущие потоки крадут свою работу от одной из других задач. Это гарантирует, что задачи выполняются максимально быстро.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class FindMin extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private int[] numbers;
    private int startIndex;
    private int endIndex;
 
    public FindMin(int[] numbers, int startIndex, int endIndex) {
        this.numbers = numbers;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }
 
    @Override
    protected Integer compute() {
        int sliceLength = (endIndex - startIndex) + 1;
        if (sliceLength > 2) {
            FindMin lowerFindMin = new FindMin(numbers, startIndex, startIndex + (sliceLength / 2) - 1);
            lowerFindMin.fork();
            FindMin upperFindMin = new FindMin(numbers, startIndex + (sliceLength / 2), endIndex);
            upperFindMin.fork();
            return Math.min(lowerFindMin.join(), upperFindMin.join());
        } else {
            return Math.min(numbers[startIndex], numbers[endIndex]);
        }
    }
 
    public static void main(String[] args) {
        int[] numbers = new int[100];
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(100);
        }
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        Integer min = pool.invoke(new FindMin(numbers, 0, numbers.length - 1));
        System.out.println(min);
    }
}

2.2. RecursiveAction

Как упоминалось выше, рядом с RecursiveTask у нас также есть класс RecursiveAction . В отличие от RecursiveTask он не должен возвращать значение, поэтому его можно использовать для асинхронных вычислений, которые могут быть непосредственно выполнены для данной структуры данных. Такой пример — вычисление изображения в градациях серого из цветного изображения. Все, что нам нужно сделать, это перебрать каждый пиксель изображения и вычислить значение градаций серого из значения RGB, используя следующую формулу:

1
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue

Числа с плавающей запятой представляют, насколько конкретный цвет способствует нашему восприятию серого человеком. Поскольку наибольшее значение используется для зеленого, мы можем заключить, что изображение в градациях серого вычисляется почти до 3/4 только от зеленой части. Таким образом, базовая реализация будет выглядеть следующим образом, предполагая, что изображение — это наш объект, представляющий фактические данные пикселя, а методы setRGB() и getRGB() используются для получения фактического значения RGB:

1
2
3
4
5
6
for (int row = 0; row < height; row++) {
    for (int column = 0; column < bufferedImage.getWidth(); column++) {
        int grayscale = computeGrayscale(image.getRGB(column, row));
        image.setRGB(column, row, grayscale);
    }
}

Вышеприведенная реализация отлично работает на одном процессоре. Но если у нас есть более одного доступного процессора, мы могли бы хотеть распределить эту работу по доступным ядрам. Поэтому вместо того, чтобы повторять два вложенных цикла for для всех пикселей, мы можем использовать ForkJoinPool и отправить новое задание для каждой строки (или столбца) изображения. Когда одна строка преобразована в оттенки серого, текущий поток может работать с другой строкой.

Этот принцип реализован в следующем примере:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class GrayscaleImageAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private int row;
    private BufferedImage bufferedImage;
 
    public GrayscaleImageAction(int row, BufferedImage bufferedImage) {
        this.row = row;
        this.bufferedImage = bufferedImage;
    }
 
    @Override
    protected void compute() {
        for (int column = 0; column < bufferedImage.getWidth(); column++) {
            int rgb = bufferedImage.getRGB(column, row);
            int r = (rgb >> 16) & 0xFF;
            int g = (rgb >> 8) & 0xFF;
            int b = (rgb & 0xFF);
            int gray = (int) (0.2126 * (float) r + 0.7152 * (float) g + 0.0722 * (float) b);
            gray = (gray << 16) + (gray << 8) + gray;
            bufferedImage.setRGB(column, row, gray);
        }
    }
 
    public static void main(String[] args) throws IOException {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        BufferedImage bufferedImage = ImageIO.read(new File(args[0]));
        for (int row = 0; row < bufferedImage.getHeight(); row++) {
            GrayscaleImageAction action = new GrayscaleImageAction(row, bufferedImage);
            pool.execute(action);
        }
        pool.shutdown();
        ImageIO.write(bufferedImage, "jpg", new File(args[1]));
    }
}

В нашем методе main () мы читаем изображение, используя Java-класс ImageIO . Возвращенный экземпляр BufferedImage имеет все методы, которые нам нужны. Мы можем запросить количество строк и столбцов, а также получить и установить значение RGB для каждого пикселя. Поэтому все, что мы делаем, — это перебираем все строки и отправляем новый GrayscaleImageAction в наш ForkJoinPool . Последний получил подсказку о доступных процессорах в качестве параметра своего конструктора.

ForkJoinPool теперь запускает задачи асинхронно, вызывая их метод compute() . В этом методе мы перебираем каждую строку и обновляем соответствующие значения RGB по значению в градациях серого. После отправки всех задач в пул мы ожидаем завершения работы всего пула в главном потоке, а затем записываем обновленный BufferedImage обратно на диск с помощью метода ImageIO.write() .

Удивительно, но нам нужно всего несколько строк кода больше, чем нам нужно без использования доступных процессоров. Это еще раз показывает, сколько работы мы можем сэкономить, используя доступные ресурсы пакета java.util.concurrent .

ForkJoinPool предлагает три различных способа отправки задачи:

  • execute(ForkJoinTask) : этот метод выполняет заданную задачу асинхронно. У него нет возвращаемого значения.
  • invoke(ForkJoinTask) : этот метод ожидает возвращаемого значения задачи.
  • submit(ForkJoinTask) : этот метод выполняет заданную задачу асинхронно. Возвращает ссылку на саму задачу. Следовательно, ссылка на задачу может использоваться для запроса результата (поскольку он реализует интерфейс Future ).

С этим знанием ясно, почему мы передали вышеописанное действие GrayscaleImageAction используя метод execute (). Если бы мы использовали взамен invoke() , основной поток ждал бы завершения задачи, и мы бы не использовали доступный уровень параллелизма.

Мы находим те же различия, если более внимательно посмотреть на ForkJoinTask-API:

  • ForkJoinTask.fork() : ForkJoinTask выполняется асинхронно. У него нет возвращаемого значения.
  • ForkJoinTask.invoke() : немедленно выполняет ForkJoinTask и возвращает результат после завершения.

2,3. ForkJoinPool и ExecutorService

Теперь, когда мы знаем ExecutorService и ForkJoinPool , вы можете спросить себя, почему мы должны использовать ForkJoinPool а не ExecutorService . Разница между ними не так уж велика. Оба имеют методы execute() и submit() и используют экземпляры какого-либо общего интерфейса, такого как Runnable , Callable , RecursiveAction или RecursiveTask .

Чтобы лучше понять разницу, давайте попробуем реализовать класс FindMin сверху, используя ExecutorService :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class FindMinTask implements Callable<Integer> {
    private int[] numbers;
    private int startIndex;
    private int endIndex;
    private ExecutorService executorService;
 
    public FindMinTask(ExecutorService executorService, int[] numbers, int startIndex, int endIndex) {
        this.executorService = executorService;
        this.numbers = numbers;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }
 
    public Integer call() throws Exception {
        int sliceLength = (endIndex - startIndex) + 1;
        if (sliceLength > 2) {
            FindMinTask lowerFindMin = new FindMinTask(executorService, numbers, startIndex, startIndex + (sliceLength / 2) - 1);
            Future<Integer> futureLowerFindMin = executorService.submit(lowerFindMin);
            FindMinTask upperFindMin = new FindMinTask(executorService, numbers, startIndex + (sliceLength / 2), endIndex);
            Future<Integer> futureUpperFindMin = executorService.submit(upperFindMin);
            return Math.min(futureLowerFindMin.get(), futureUpperFindMin.get());
        } else {
            return Math.min(numbers[startIndex], numbers[endIndex]);
        }
    }
     
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] numbers = new int[100];
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(100);
        }
        ExecutorService executorService = Executors.newFixedThreadPool(64);
        Future<Integer> futureResult = executorService.submit(new FindMinTask(executorService, numbers, 0, numbers.length-1));
        System.out.println(futureResult.get());
        executorService.shutdown();
    }
}

Код выглядит очень похоже, ожидайте, что мы submit() наши задачи в ExecutorService а затем используем возвращенный экземпляр Future для wait() результата. Основное различие между обеими реализациями можно найти в том месте, где создается пул потоков. В приведенном выше примере мы создаем фиксированный пул потоков с 64 (!) Потоками. Почему я выбрал такой большой номер? Причина в том, что вызов get() для каждого возвращаемого Future блокирует текущий поток, пока не будет доступен результат. Если бы мы предоставили столько пулов для пула, сколько у нас было бы доступных процессоров, у программы не хватило бы ресурсов и зависание на неопределенный срок.

ForkJoinPool реализует уже упомянутую стратегию кражи работы, то есть каждый раз, когда работающий поток должен ждать какого-то результата; поток удаляет текущую задачу из рабочей очереди и выполняет другую задачу, готовую к запуску. Таким образом, текущий поток не блокируется и может использоваться для выполнения других задач. Как только результат для первоначально приостановленной задачи был вычислен, задача снова выполняется, и метод join () возвращает результат. Это важное отличие от обычного ExecutorService где вам придется блокировать текущий поток в ожидании результата.