Пул потоков управляет пулом рабочих потоков, он содержит очередь, которая удерживает задачи, ожидающие выполнения. Пул потоков управляет коллекцией потоков Runnable, а рабочие потоки выполняют Runnable из очереди. java.util.concurrent.Executors обеспечивают реализацию интерфейса java.util.concurrent.Executor для создания пула потоков в java.
Давайте напишем простую программу, чтобы объяснить, как она работает.
Сначала нам нужно иметь класс Runnable.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package com.journaldev.threadpool;public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName()+' Start. Command = '+command); processCommand(); System.out.println(Thread.currentThread().getName()+' End.'); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; }} |
Вот тестовая программа, в которой мы создаем фиксированный пул потоков из среды Executors.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
package com.journaldev.threadpool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SimpleThreadPool { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { Runnable worker = new WorkerThread('' + i); executor.execute(worker); } executor.shutdown(); while (!executor.isTerminated()) { } System.out.println('Finished all threads'); }} |
В приведенной выше программе мы создаем пул потоков фиксированного размера из 5 рабочих потоков. Затем мы отправляем 10 заданий в этот пул, поскольку размер пула равен 5, он начнет работать на 5 заданиях, а другие задания будут в состоянии ожидания, как только одно из заданий будет завершено, другое задание из очереди ожидания будет быть подхваченным рабочим потоком и выполненным.
Вот вывод вышеуказанной программы.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
pool-1-thread-2 Start. Command = 1pool-1-thread-4 Start. Command = 3pool-1-thread-1 Start. Command = 0pool-1-thread-3 Start. Command = 2pool-1-thread-5 Start. Command = 4pool-1-thread-4 End.pool-1-thread-5 End.pool-1-thread-1 End.pool-1-thread-3 End.pool-1-thread-3 Start. Command = 8pool-1-thread-2 End.pool-1-thread-2 Start. Command = 9pool-1-thread-1 Start. Command = 7pool-1-thread-5 Start. Command = 6pool-1-thread-4 Start. Command = 5pool-1-thread-2 End.pool-1-thread-4 End.pool-1-thread-3 End.pool-1-thread-5 End.pool-1-thread-1 End.Finished all threads |
Вывод подтверждает, что в пуле есть пять потоков, названных из «pool-1-thread-1? «пул-1-нить-5»? и они несут ответственность за выполнение представленных задач в пул.
Класс Executors обеспечивает простую реализацию ExecutorService с использованием ThreadPoolExecutor, но ThreadPoolExecutor предоставляет гораздо больше возможностей, чем это. Мы можем указать количество потоков, которые будут активны при создании экземпляра ThreadPoolExecutor, и мы можем ограничить размер пула потоков и создать нашу собственную реализацию RejectedExecutionHandler для обработки заданий, которые не помещаются в очередь рабочих.
Вот наша пользовательская реализация интерфейса RejectedExecutionHandler.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
package com.journaldev.threadpool;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + ' is rejected'); }} |
ThreadPoolExecutor предоставляет несколько методов, с помощью которых мы можем узнать текущее состояние исполнителя, размер пула, количество активных потоков и количество задач. Итак, у меня есть поток монитора, который будет печатать информацию об исполнителе через определенный промежуток времени.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.journaldev.threadpool;import java.util.concurrent.ThreadPoolExecutor;public class MyMonitorThread implements Runnable{ private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { while(run){ System.out.println( String.format('[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s', this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } }} |
Вот пример реализации пула потоков с использованием ThreadPoolExecutor.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.journaldev.threadpool;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread('cmd'+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); }} |
Обратите внимание, что при инициализации ThreadPoolExecutor мы сохраняем начальный размер пула как 2, максимальный размер пула до 4 и размер рабочей очереди равным 2. Поэтому, если есть 4 запущенных задачи и отправлено больше задач, рабочая очередь будет содержать только 2 из них. а остальные из них будут обработаны RejectedExecutionHandlerImpl.
Вот вывод вышеуказанной программы, который подтверждает приведенное выше утверждение.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
pool-1-thread-1 Start. Command = cmd0pool-1-thread-4 Start. Command = cmd5cmd6 is rejectedpool-1-thread-3 Start. Command = cmd4pool-1-thread-2 Start. Command = cmd1cmd7 is rejectedcmd8 is rejectedcmd9 is rejected[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: falsepool-1-thread-4 End.pool-1-thread-1 End.pool-1-thread-2 End.pool-1-thread-3 End.pool-1-thread-1 Start. Command = cmd3pool-1-thread-4 Start. Command = cmd2[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: falsepool-1-thread-1 End.pool-1-thread-4 End.[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true |
Обратите внимание на изменение количества активных, выполненных и общих выполненных задач исполнителя. Мы можем вызвать метод shutdown (), чтобы завершить выполнение всех представленных задач и завершить пул потоков.
Ссылка: Пример пула потоков Java с использованием Executors и ThreadPoolExecutor от нашего партнера по JCG Панкаджа Кумара в блоге Developer Recipes .