Статьи

Пример пула потоков Java с использованием Executors и ThreadPoolExecutor

Пул потоков управляет пулом рабочих потоков, он содержит очередь, которая удерживает задачи, ожидающие выполнения. Пул потоков управляет коллекцией потоков Runnable, а рабочие потоки выполняют Runnable из очереди. java.util.concurrent.Executors обеспечивают реализацию интерфейса java.util.concurrent.Executor для создания пула потоков в java.
Давайте напишем простую программу, чтобы объяснить, как она работает.

Сначала нам нужно иметь класс Runnable.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.journaldev.threadpool;
 
public class WorkerThread implements Runnable {
 
    private String command;
 
    public WorkerThread(String s){
        this.command=s;
    }
 
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+' Start. Command = '+command);
        processCommand();
        System.out.println(Thread.currentThread().getName()+' End.');
    }
 
    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public String toString(){
        return this.command;
    }
}

Вот тестовая программа, в которой мы создаем фиксированный пул потоков из среды Executors.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
package com.journaldev.threadpool;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class SimpleThreadPool {
 
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            Runnable worker = new WorkerThread('' + i);
            executor.execute(worker);
          }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println('Finished all threads');
    }
 
}

В приведенной выше программе мы создаем пул потоков фиксированного размера из 5 рабочих потоков. Затем мы отправляем 10 заданий в этот пул, поскольку размер пула равен 5, он начнет работать на 5 заданиях, а другие задания будут в состоянии ожидания, как только одно из заданий будет завершено, другое задание из очереди ожидания будет быть подхваченным рабочим потоком и выполненным.

Вот вывод вышеуказанной программы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
pool-1-thread-2 Start. Command = 1
pool-1-thread-4 Start. Command = 3
pool-1-thread-1 Start. Command = 0
pool-1-thread-3 Start. Command = 2
pool-1-thread-5 Start. Command = 4
pool-1-thread-4 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-3 Start. Command = 8
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = 9
pool-1-thread-1 Start. Command = 7
pool-1-thread-5 Start. Command = 6
pool-1-thread-4 Start. Command = 5
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-1 End.
Finished all threads

Вывод подтверждает, что в пуле есть пять потоков, названных из «pool-1-thread-1? «пул-1-нить-5»? и они несут ответственность за выполнение представленных задач в пул.

Класс Executors обеспечивает простую реализацию ExecutorService с использованием ThreadPoolExecutor, но ThreadPoolExecutor предоставляет гораздо больше возможностей, чем это. Мы можем указать количество потоков, которые будут активны при создании экземпляра ThreadPoolExecutor, и мы можем ограничить размер пула потоков и создать нашу собственную реализацию RejectedExecutionHandler для обработки заданий, которые не помещаются в очередь рабочих.

Вот наша пользовательская реализация интерфейса RejectedExecutionHandler.

01
02
03
04
05
06
07
08
09
10
11
12
13
package com.journaldev.threadpool;
 
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + ' is rejected');
    }
 
}

ThreadPoolExecutor предоставляет несколько методов, с помощью которых мы можем узнать текущее состояние исполнителя, размер пула, количество активных потоков и количество задач. Итак, у меня есть поток монитора, который будет печатать информацию об исполнителе через определенный промежуток времени.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.journaldev.threadpool;
 
import java.util.concurrent.ThreadPoolExecutor;
 
public class MyMonitorThread implements Runnable
{
    private ThreadPoolExecutor executor;
 
    private int seconds;
 
    private boolean run=true;
 
    public MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
        this.executor = executor;
        this.seconds=delay;
    }
 
    public void shutdown(){
        this.run=false;
    }
 
    @Override
    public void run()
    {
        while(run){
                System.out.println(
                    String.format('[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s',
                        this.executor.getPoolSize(),
                        this.executor.getCorePoolSize(),
                        this.executor.getActiveCount(),
                        this.executor.getCompletedTaskCount(),
                        this.executor.getTaskCount(),
                        this.executor.isShutdown(),
                        this.executor.isTerminated()));
                try {
                    Thread.sleep(seconds*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
 
    }
}

Вот пример реализации пула потоков с использованием ThreadPoolExecutor.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.journaldev.threadpool;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class WorkerPool {
 
    public static void main(String args[]) throws InterruptedException{
        //RejectedExecutionHandler implementation
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        //Get the ThreadFactory implementation to use
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //creating the ThreadPoolExecutor
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
        //start the monitoring thread
        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        //submit work to the thread pool
        for(int i=0; i<10; i++){
            executorPool.execute(new WorkerThread('cmd'+i));
        }
 
        Thread.sleep(30000);
        //shut down the pool
        executorPool.shutdown();
        //shut down the monitor thread
        Thread.sleep(5000);
        monitor.shutdown();
 
    }
}

Обратите внимание, что при инициализации ThreadPoolExecutor мы сохраняем начальный размер пула как 2, максимальный размер пула до 4 и размер рабочей очереди равным 2. Поэтому, если есть 4 запущенных задачи и отправлено больше задач, рабочая очередь будет содержать только 2 из них. а остальные из них будут обработаны RejectedExecutionHandlerImpl.

Вот вывод вышеуказанной программы, который подтверждает приведенное выше утверждение.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
pool-1-thread-1 Start. Command = cmd0
pool-1-thread-4 Start. Command = cmd5
cmd6 is rejected
pool-1-thread-3 Start. Command = cmd4
pool-1-thread-2 Start. Command = cmd1
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
[monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-4 End.
pool-1-thread-1 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 Start. Command = cmd3
pool-1-thread-4 Start. Command = cmd2
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
[monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-4 End.
[monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
[monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

Обратите внимание на изменение количества активных, выполненных и общих выполненных задач исполнителя. Мы можем вызвать метод shutdown (), чтобы завершить выполнение всех представленных задач и завершить пул потоков.

Ссылка: Пример пула потоков Java с использованием Executors и ThreadPoolExecutor от нашего партнера по JCG Панкаджа Кумара в блоге Developer Recipes .