Пул потоков управляет пулом рабочих потоков, он содержит очередь, которая удерживает задачи, ожидающие выполнения. Пул потоков управляет коллекцией потоков Runnable, а рабочие потоки выполняют Runnable из очереди. java.util.concurrent.Executors обеспечивают реализацию интерфейса java.util.concurrent.Executor для создания пула потоков в java.
Давайте напишем простую программу, чтобы объяснить, как она работает.
Сначала нам нужно иметь класс Runnable.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package com.journaldev.threadpool; public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this .command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName()+ ' Start. Command = ' +command); processCommand(); System.out.println(Thread.currentThread().getName()+ ' End.' ); } private void processCommand() { try { Thread.sleep( 5000 ); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this .command; } } |
Вот тестовая программа, в которой мы создаем фиксированный пул потоков из среды Executors.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
package com.journaldev.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SimpleThreadPool { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool( 5 ); for ( int i = 0 ; i < 10 ; i++) { Runnable worker = new WorkerThread( '' + i); executor.execute(worker); } executor.shutdown(); while (!executor.isTerminated()) { } System.out.println( 'Finished all threads' ); } } |
В приведенной выше программе мы создаем пул потоков фиксированного размера из 5 рабочих потоков. Затем мы отправляем 10 заданий в этот пул, поскольку размер пула равен 5, он начнет работать на 5 заданиях, а другие задания будут в состоянии ожидания, как только одно из заданий будет завершено, другое задание из очереди ожидания будет быть подхваченным рабочим потоком и выполненным.
Вот вывод вышеуказанной программы.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
pool-1-thread-2 Start. Command = 1 pool-1-thread-4 Start. Command = 3 pool-1-thread-1 Start. Command = 0 pool-1-thread-3 Start. Command = 2 pool-1-thread-5 Start. Command = 4 pool-1-thread-4 End. pool-1-thread-5 End. pool-1-thread-1 End. pool-1-thread-3 End. pool-1-thread-3 Start. Command = 8 pool-1-thread-2 End. pool-1-thread-2 Start. Command = 9 pool-1-thread-1 Start. Command = 7 pool-1-thread-5 Start. Command = 6 pool-1-thread-4 Start. Command = 5 pool-1-thread-2 End. pool-1-thread-4 End. pool-1-thread-3 End. pool-1-thread-5 End. pool-1-thread-1 End. Finished all threads |
Вывод подтверждает, что в пуле есть пять потоков, названных из «pool-1-thread-1? «пул-1-нить-5»? и они несут ответственность за выполнение представленных задач в пул.
Класс Executors обеспечивает простую реализацию ExecutorService с использованием ThreadPoolExecutor, но ThreadPoolExecutor предоставляет гораздо больше возможностей, чем это. Мы можем указать количество потоков, которые будут активны при создании экземпляра ThreadPoolExecutor, и мы можем ограничить размер пула потоков и создать нашу собственную реализацию RejectedExecutionHandler для обработки заданий, которые не помещаются в очередь рабочих.
Вот наша пользовательская реализация интерфейса RejectedExecutionHandler.
01
02
03
04
05
06
07
08
09
10
11
12
13
|
package com.journaldev.threadpool; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + ' is rejected' ); } } |
ThreadPoolExecutor предоставляет несколько методов, с помощью которых мы можем узнать текущее состояние исполнителя, размер пула, количество активных потоков и количество задач. Итак, у меня есть поток монитора, который будет печатать информацию об исполнителе через определенный промежуток времени.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.journaldev.threadpool; import java.util.concurrent.ThreadPoolExecutor; public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run= true ; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this .executor = executor; this .seconds=delay; } public void shutdown(){ this .run= false ; } @Override public void run() { while (run){ System.out.println( String.format( '[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s' , this .executor.getPoolSize(), this .executor.getCorePoolSize(), this .executor.getActiveCount(), this .executor.getCompletedTaskCount(), this .executor.getTaskCount(), this .executor.isShutdown(), this .executor.isTerminated())); try { Thread.sleep(seconds* 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } |
Вот пример реализации пула потоков с использованием ThreadPoolExecutor.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.journaldev.threadpool; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor( 2 , 4 , 10 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 2 ), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3 ); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for ( int i= 0 ; i< 10 ; i++){ executorPool.execute( new WorkerThread( 'cmd' +i)); } Thread.sleep( 30000 ); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep( 5000 ); monitor.shutdown(); } } |
Обратите внимание, что при инициализации ThreadPoolExecutor мы сохраняем начальный размер пула как 2, максимальный размер пула до 4 и размер рабочей очереди равным 2. Поэтому, если есть 4 запущенных задачи и отправлено больше задач, рабочая очередь будет содержать только 2 из них. а остальные из них будут обработаны RejectedExecutionHandlerImpl.
Вот вывод вышеуказанной программы, который подтверждает приведенное выше утверждение.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
pool-1-thread-1 Start. Command = cmd0 pool-1-thread-4 Start. Command = cmd5 cmd6 is rejected pool-1-thread-3 Start. Command = cmd4 pool-1-thread-2 Start. Command = cmd1 cmd7 is rejected cmd8 is rejected cmd9 is rejected [monitor] [0 /2 ] Active: 4, Completed: 0, Task: 6, isShutdown: false , isTerminated: false [monitor] [4 /2 ] Active: 4, Completed: 0, Task: 6, isShutdown: false , isTerminated: false pool-1-thread-4 End. pool-1-thread-1 End. pool-1-thread-2 End. pool-1-thread-3 End. pool-1-thread-1 Start. Command = cmd3 pool-1-thread-4 Start. Command = cmd2 [monitor] [4 /2 ] Active: 2, Completed: 4, Task: 6, isShutdown: false , isTerminated: false [monitor] [4 /2 ] Active: 2, Completed: 4, Task: 6, isShutdown: false , isTerminated: false pool-1-thread-1 End. pool-1-thread-4 End. [monitor] [4 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [2 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [2 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [2 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [2 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [2 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: false , isTerminated: false [monitor] [0 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: true , isTerminated: true [monitor] [0 /2 ] Active: 0, Completed: 6, Task: 6, isShutdown: true , isTerminated: true |
Обратите внимание на изменение количества активных, выполненных и общих выполненных задач исполнителя. Мы можем вызвать метод shutdown (), чтобы завершить выполнение всех представленных задач и завершить пул потоков.
Ссылка: Пример пула потоков Java с использованием Executors и ThreadPoolExecutor от нашего партнера по JCG Панкаджа Кумара в блоге Developer Recipes .