Одной из основных причин появления лямбд Java8 была способность максимально легко использовать многоядерные процессоры (см. « Освоение лямбд: программирование Java в многоядерном мире» ). Просто изменив свой код с collection.stream()...
на collection.parallelStream()...
вы получаете мгновенную многопоточность, которая обеспечивает всю мощность процессора на вашей машине. (Давайте проигнорируем раздоры на этом этапе.)
Если вы распечатываете имена потоков, используемых parallelStream, вы заметите, что это те же потоки, которые используются в инфраструктуре ForkJoin, и выглядят примерно так:
1
2
|
[ForkJoinPool.commonPool-worker-1] [ForkJoinPool.commonPool-worker-2] |
Посмотрите блог Бенджамина Винтерберга для хорошо проработанного примера этого.
Теперь в Java 8 вы можете использовать этот commonPool напрямую с новым методом в ForkJoinPool commonPool () . Это возвращает экземпляр ForkJoinPool (который является ExecutorService) с commonPool потоков — те же, которые используются в parallelStream. Это означает, что любая работа, которую вы выполняете непосредственно с commonPool, будет очень хорошо играть с работой, выполняемой в parallelStream, особенно с планированием потоков и перехватом работ между потоками.
Давайте рассмотрим пример того, как вы используете ForkJoin, особенно в работе с хитрым предметом исключений.
Сначала получите экземпляр ForkJoin.commonPool()
вызвав ForkJoin.commonPool()
. Вы можете отправить ему задачи, используя метод submit()
. Поскольку мы используем Java8, мы можем передавать лямбда-выражения, которые действительно аккуратны. Как и во всех реализациях ExecutorService
вы можете передавать экземпляры Runnable
или Callable
в submit()
. Когда вы передаете лямбду в метод submit, он автоматически превращает ее в Runnable
или Callable
сигнатуру метода.
Это приводит к интересной проблеме, которая подчеркивает, как работают лямбды. Предположим, что у вас есть метод возвращаемого типа void (например, Runnable), но выдает проверенное исключение (например, Callable). Смотрите метод throwException()
в листинге кода ниже для такого примера. Если вы напишите этот код, он не скомпилируется.
1
2
3
|
Future task1 = commonPool.submit(() -> { throwException( "task 1" ); }); |
Причина этого в том, что компилятор предполагает, что из-за типа возврата void вы пытаетесь создать Runnable. Конечно, Runnable не может выдать исключение. Чтобы обойти эту проблему, вы должны заставить компилятор понять, что вы создаете Callable, которому разрешено генерировать исключение, используя этот трюк кода.
1
2
3
4
|
Future task1 = commonPool.submit(() -> { throwException( "task 1" ); return null ; }); |
Это немного грязно, но делает работу. Возможно, компилятор мог бы решить это сам.
Еще две вещи, которые нужно выделить в полном списке кода ниже. Во-первых, тот факт, что вы можете видеть, сколько потоков будет доступно в пуле, с помощью commonPool.getParallelism()
. Это можно настроить с помощью параметра '-Djava.util.concurrent.ForkJoinPool.common.parallelism'
. Во-вторых, обратите внимание, как вы можете разворачивать исключение ExecutionException, чтобы ваш код мог просто представить IOException своим вызывающим, а не довольно специфичным ExecutionException. Также обратите внимание, что этот код не выполняется при первом исключении. Если вы хотите собрать все исключения, вам нужно соответствующим образом структурировать код, возможно, возвращая список исключений. Или, может быть, более аккуратно генерировать пользовательское исключение, содержащее список основных исключений.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
public class ForkJoinTest { public void run() throws IOException{ ForkJoinPool commonPool = ForkJoinPool.commonPool(); Future task1 = commonPool.submit(() -> { throwException( "task 1" ); return null ; }); Future task2 = commonPool.submit(() -> { throwException( "task 2" ); return null ; }); System.out.println( "Do something while tasks being " + "executed on " + commonPool.getParallelism() + " threads" ); try { //wait on the result from task2 task2.get(); //wait on the result from task1 task1.get(); } catch (InterruptedException e) { throw new AssertionError(e); } catch (ExecutionException e) { Throwable innerException = e.getCause(); if (innerException instanceof RuntimeException) { innerException = innerException.getCause(); if (innerException instanceof IOException){ throw (IOException) innerException; } } throw new AssertionError(e); } } public void throwException(String message) throws IOException, InterruptedException { Thread.sleep( 100 ); System.out.println(Thread.currentThread() + " throwing IOException" ); throw new IOException( "Throw exception for " + message); } public static void main(String[] args) throws IOException{ new ForkJoinTest().run(); } } |
Ссылка: | Java8 Многопоточность ForkJoinPool: Работа с исключениями от нашего партнера по JCG Дэниела Шая в блоге Rational Java . |