Ты не забудешь свои корни
Executor — это корневой интерфейс с единственным методом execute. Все, что реализует интерфейс Runnable, может быть передано в качестве параметра. Однако Silly Executor не поддерживает Callable.
Хорошие новости: интерфейс ExecutorService, который расширяет Executor, добавляет поддержку Callable. Его класс реализации — ThreadPoolExecutor.
Я собираюсь притвориться, что интерфейс ScheduledExecutorService и его класс реализации ScheduledThreadPoolExecutor не существует, поскольку они просто добавляют возможности планирования поверх ExecutorService и ThreadPoolExecutor. Но помните этот класс, когда мощного, но скучного java.util.Timer
недостаточно, а полноценного внешнего планировщика слишком много.
Если вы новичок в параллелизме или забыли разницу между Callable и Runnable, вы можете прочитать немного, прежде чем читать дальше. Манекен гид здесь
Факты ExecutorService.submit:
Три варианта отправки:
Будущее представить (вызываемое задание)
Future submit (Выполненная задача)
Future submit (Выполненная задача, T результат)
- Метод
submit
ExecutorService перегружен и принимает либоCallable
либоRunnable
. - Поскольку метод
run
для Runnable возвращает void, неудивительно, чтоFuture.get
всегда возвращает null, когда задача завершена.1Future<?> submit(Runnable task)
- Другой перегруженный метод
submit
который принимаетRunnable
и универсальный, возвращает результат, который вы передали в качестве второго параметра в качестве результата.1<T> Future<T> submit(Runnable task, T result)
Фактически, открыв код ( FutureTask
), вы заметите, что вложенный класс Executors
класса RunnableAdapter
просто хранит результат и возвращает тот же результат после завершения метода run.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this .task = task; this .result = result; } public T [More ...] call() { task.run(); return result; } } |
В обоих случаях, если вы хотите (вы должны!) Завершить программу вместо потока вашего исполнителя, блокирующего программу и входящего в занятый цикл , вы должны вызвать метод shutdown, как в
1
|
executorService.shutdown() |
факты закрытия
Вы можете представить себе shutdown
как полузакрытую дверь торгового центра. Новые клиенты не будут впущены, но существующие клиенты могут покинуть торговый центр, как только они будут сделаны.
Чтобы повторить,
-
shutdown
— это вежливый метод. Это фактически не закрывает задачи в пуле немедленно. Это просто говорит о том, что никакие новые задачи не будут приняты. - Если вы не выполняете свои задачи с помощью
invokeAll
, вам нужно будет дождаться завершения всех выполняемых задач. Это достигается путем вызова методаawaitTermination
. (invokeAll и отправьте примеры внизу поста) - Как только все текущие задачи выполнены, служба исполнителя завершает работу.
Если вам нужен невежливый, навязчивый метод, который не заботится о том, выполняются ли текущие потоки с их задачами, тогда shutdownNow — ваш парень Тем не менее, нет никакой гарантии, что метод остановит службу на точке, но это самое близкое к вам немедленное отключение.
На awaitTermination вы можете указать период ожидания, до которого основной поток должен ждать потоков пула для выполнения своих задач.
1
2
3
4
5
6
7
8
9
|
ExecutorService executorService=Executors.newFixedThreadPool( 10 ); … future = executorService.submit(getInstanceOfCallable(count,sum)); … executorService.shutdown(); if (executorService.awaitTermination( 10 , TimeUnit.SECONDS)){ System.out.println( 'All threads done with their jobs' ); } |
Исполнители — заводской парень
Классы выше, все классные. Но, скажем, вы хотели создать однопотокового исполнителя, вы бы написали что-то вроде
1
|
new ThreadPoolExecutor( 1 , 1 ,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
Сравните это с
1
|
Executors.newSingleThreadExecutor() |
Итак, поехали. Executors
— это класс с фабричными методами для создания различных форм службы executor с некоторыми часто используемыми значениями по умолчанию. Обратите внимание, что, кроме удивительных фабричных методов, он не приносит никаких новых возможностей.
Рекомендуется быстро взглянуть на реализацию фабричных методов и проверить, соответствует ли она вашим потребностям.
Вызов все и отправка
All
часть метода invokeAll
в ExecutorService
не удивляет. Это просто говорит о том, что вам нужно передать в коллекцию Callable
s. Опять же, как и ожидалось, метод не вернется, пока все потоки не выполнят свои задачи. Таким образом, для случаев, когда вы заинтересованы в результате только после завершения всех заданий, invokeAll
— ваш парень.
С другой стороны, метод submit
возвращается сразу после того, как вызываемый объект передается в службу executor. Если вы ничего не делаете в своем методе call
вашего Callable
, ваши рабочие потоки в идеале должны работать, когда возвращается метод submit
.
Следующие примеры могут быть полезны для вашей справки. Программа просто пытается найти сумму всех натуральных чисел до 100 (грубая сила, конечно)
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package me.rerun.incubator; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; public class ExecutorInvokeAll { public void runApp() throws InterruptedException, ExecutionException{ //variable to store the sum AtomicInteger sum= new AtomicInteger(); //Use our friendly neighbourhood factory method of the Executors. ExecutorService executorService=Executors.newFixedThreadPool( 10 ); List<Callable<AtomicInteger>> callableList= new ArrayList<Callable<AtomicInteger>>(); for ( int count = 0 ; count <= 100 ;count++) { callableList.add(getInstanceOfCallable(count,sum)); } //returns only after all tasks are complete List<Future<AtomicInteger>> resultFuture = executorService.invokeAll(callableList); //Prints 5050 all through for (Future<AtomicInteger> future : resultFuture) { //Didn't deliberately put a timeout here for the get method. Remember, the invoke All does not return until the task is done. System.out.println( "Status of future : " + future.isDone() + ". Result of future : " +future.get().get()); } executorService.shutdown(); // You might as well call a resultFuture.get(0).get().get() and that would give you the same //result since all your worker threads hold reference to the same atomicinteger sum. System.out.println( "Final Sum : " +sum); } //Adds count to the sum and returns the reference of the sum as the result private Callable<AtomicInteger> getInstanceOfCallable( final int count, final AtomicInteger sum) { Callable<AtomicInteger> clientPlanCall= new Callable<AtomicInteger>(){ public AtomicInteger call() { sum.addAndGet(count); System.out.println( "Intermediate sum :" +sum); return sum; } }; return clientPlanCall; } public static void main(String[] args) throws ExecutionException { try { new ExecutorInvokeAll().runApp(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
package me.rerun.incubator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; public class ExecutorSubmit { public void runApp() throws InterruptedException, ExecutionException{ //holder for the total sum AtomicInteger sum= new AtomicInteger(); //Use the factory method of Executors ExecutorService executorService=Executors.newFixedThreadPool( 10 ); Future<AtomicInteger> future = null ; for ( int count = 0 ; count <= 100 ; count++) { future = executorService.submit(getInstanceOfCallable(count,sum)); //prints intermediate sum try { System.out.println( "Status of future : " + future.isDone() + ". Result of future : " +future.get( 1000 , TimeUnit.MILLISECONDS).get()); } catch (TimeoutException e) { System.out.println( "<IGNORE> Timeout exception for count : " +count); //e.printStackTrace(); } //System.out.println("Result of future : "+future.get().get() +".Status of future : " + future.isDone()); } executorService.shutdown(); if (executorService.awaitTermination( 10 , TimeUnit.SECONDS)){ System.out.println( "All threads done with their jobs" ); } //exec System.out.println( "Final Sum : " +sum); } //Adds count to the sum and returns the reference of the sum as the result private Callable<AtomicInteger> getInstanceOfCallable( final int count, final AtomicInteger sum) { Callable<AtomicInteger> clientPlanCall= new Callable<AtomicInteger>(){ public AtomicInteger call() { sum.addAndGet(count); //System.out.println("Intermediate sum :"+sum); return sum; } }; return clientPlanCall; } public static void main(String[] args) throws ExecutionException { try { new ExecutorSubmit().runApp(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
Дальнейшее чтение :
Потрясающий блог Алекса Миллера
Статья Фогеллы о сравнении с оригинальным API
Хорошее введение в параллелизм в целом
Настоятельно рекомендуется книга по параллелизму Java
Приятного кодирования и не забудьте поделиться!
Ссылка: Ленивые разработчики Введение в Java Параллельные исполнители от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me .