Статьи

Java8 Многопоточность ForkJoinPool: Работа с исключениями

Одной из основных причин появления лямбд Java8 была способность максимально легко использовать многоядерные процессоры (см. « Освоение лямбд: программирование Java в многоядерном мире» ). Просто изменив свой код с collection.stream()... на collection.parallelStream()... вы получаете мгновенную многопоточность, которая обеспечивает всю мощность процессора на вашей машине. (Давайте проигнорируем раздоры на этом этапе.)

Если вы распечатываете имена потоков, используемых parallelStream, вы заметите, что это те же потоки, которые используются в инфраструктуре ForkJoin, и выглядят примерно так:

1
2
[ForkJoinPool.commonPool-worker-1]
[ForkJoinPool.commonPool-worker-2]

Посмотрите блог Бенджамина Винтерберга для хорошо проработанного примера этого.

Теперь в Java 8 вы можете использовать этот commonPool напрямую с новым методом в ForkJoinPool commonPool () . Это возвращает экземпляр ForkJoinPool (который является ExecutorService) с commonPool потоков — те же, которые используются в parallelStream. Это означает, что любая работа, которую вы выполняете непосредственно с commonPool, будет очень хорошо играть с работой, выполняемой в parallelStream, особенно с планированием потоков и перехватом работ между потоками.

Давайте рассмотрим пример того, как вы используете ForkJoin, особенно в работе с хитрым предметом исключений.

Сначала получите экземпляр ForkJoin.commonPool() вызвав ForkJoin.commonPool() . Вы можете отправить ему задачи, используя метод submit() . Поскольку мы используем Java8, мы можем передавать лямбда-выражения, которые действительно аккуратны. Как и во всех реализациях ExecutorService вы можете передавать экземпляры Runnable или Callable в submit() . Когда вы передаете лямбду в метод submit, он автоматически превращает ее в Runnable или Callable сигнатуру метода.

Это приводит к интересной проблеме, которая подчеркивает, как работают лямбды. Предположим, что у вас есть метод возвращаемого типа void (например, Runnable), но выдает проверенное исключение (например, Callable). Смотрите метод throwException()
в листинге кода ниже для такого примера. Если вы напишите этот код, он не скомпилируется.

1
2
3
Future task1 = commonPool.submit(() -> {
            throwException("task 1");
        });

Причина этого в том, что компилятор предполагает, что из-за типа возврата void вы пытаетесь создать Runnable. Конечно, Runnable не может выдать исключение. Чтобы обойти эту проблему, вы должны заставить компилятор понять, что вы создаете Callable, которому разрешено генерировать исключение, используя этот трюк кода.

1
2
3
4
Future task1 = commonPool.submit(() -> {
            throwException("task 1");
            return null;
        });

Это немного грязно, но делает работу. Возможно, компилятор мог бы решить это сам.

Еще две вещи, которые нужно выделить в полном списке кода ниже. Во-первых, тот факт, что вы можете видеть, сколько потоков будет доступно в пуле, с помощью commonPool.getParallelism() . Это можно настроить с помощью параметра '-Djava.util.concurrent.ForkJoinPool.common.parallelism' . Во-вторых, обратите внимание, как вы можете разворачивать исключение ExecutionException, чтобы ваш код мог просто представить IOException своим вызывающим, а не довольно специфичным ExecutionException. Также обратите внимание, что этот код не выполняется при первом исключении. Если вы хотите собрать все исключения, вам нужно соответствующим образом структурировать код, возможно, возвращая список исключений. Или, может быть, более аккуратно генерировать пользовательское исключение, содержащее список основных исключений.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ForkJoinTest {
    public void run() throws IOException{
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
 
        Future task1 = commonPool.submit(() -> {
            throwException("task 1");
            return null;
        });
        Future task2 = commonPool.submit(() -> {
            throwException("task 2");
            return null;
        });
 
        System.out.println("Do something while tasks being " +
                "executed on " + commonPool.getParallelism()
                + " threads");
 
        try {
            //wait on the result from task2
            task2.get();
            //wait on the result from task1
            task1.get();
        } catch (InterruptedException e) {
            throw new AssertionError(e);
        } catch (ExecutionException e) {
            Throwable innerException = e.getCause();
            if (innerException instanceof RuntimeException) {
                innerException = innerException.getCause();
                if(innerException instanceof IOException){
                    throw (IOException) innerException;
                }
            }
            throw new AssertionError(e);
        }
    }
 
    public void throwException(String message) throws IOException,
            InterruptedException {
        Thread.sleep(100);
        System.out.println(Thread.currentThread()
 
            + " throwing IOException");
        throw new IOException("Throw exception for " + message);
    }
 
    public static void main(String[] args) throws IOException{
        new ForkJoinTest().run();
    }
}