Пулы потоков полезны, когда вам нужно одновременно ограничить количество потоков, работающих в вашем приложении. С запуском нового потока связано снижение производительности, и каждому потоку также выделяется часть памяти для его стека и т. Д.
Вместо запуска нового потока для каждой задачи, выполняемой одновременно, задача может быть передана в пул потоков. Как только в пуле есть свободные потоки, задача назначается одному из них и выполняется. Внутренне задачи вставляются в очередь блокировки, из которой выходят из очереди потоки в пуле. Когда новая задача вставляется в очередь, один из незанятых потоков успешно удалит ее из очереди и выполнит. Остальные незанятые потоки в пуле будут заблокированы в ожидании завершения задач.
Пулы потоков часто используются в многопоточных серверах. Каждое соединение, поступающее на сервер через сеть, упаковывается как задача и передается в пул потоков. Потоки в пуле потоков будут обрабатывать запросы на соединения одновременно. Позднее мы расскажем подробнее о реализации многопоточных серверов на Java.
Java 5 поставляется со встроенными пулами потоков в java.util.concurrent
пакете, поэтому вам не нужно реализовывать свой собственный пул потоков. Тем не менее, в любом случае полезно знать немного о реализации пула потоков.
Вот простая реализация пула потоков:
public class ThreadPool { private BlockingQueue taskQueue = null; private List<PoolThread> threads = new ArrayList<PoolThread>(); private boolean isStopped = false; public ThreadPool(int noOfThreads, int maxNoOfTasks){ taskQueue = new BlockingQueue(maxNoOfTasks); for(int i=0; i<noOfThreads; i++){ threads.add(new PoolThread(taskQueue)); } for(PoolThread thread : threads){ thread.start(); } } public void synchronized execute(Runnable task){ if(this.isStopped) throw new IllegalStateException("ThreadPool is stopped"); this.taskQueue.enqueue(task); } public synchronized void stop(){ this.isStopped = true; for(PoolThread thread : threads){ thread.stop(); } }}
public class PoolThread extends Thread { private BlockingQueue taskQueue = null; private boolean isStopped = false; public PoolThread(BlockingQueue queue){ taskQueue = queue; } public void run(){ while(!isStopped()){ try{ Runnable runnable = (Runnable) taskQueue.dequeue(); runnable.run(); } catch(Exception e){ //log or otherwise report exception, //but keep pool thread alive. } } } public synchronized void stop(){ isStopped = true; this.interrupt(); //break pool thread out of dequeue() call. } public synchronized void isStopped(){ return isStopped; }}
Реализация пула потоков состоит из двух частей. ThreadPool
Класс , который является общим интерфейсом для пула потоков, и PoolThread
класс , который реализует резьбу , которые выполняют задачи.
Для выполнения задачи ThreadPool.execute(Runnable r)
вызывается метод с Runnable
реализацией в качестве параметра. Внутренне Runnable
находится в очереди на блокировку , ожидая удаления из очереди.
Runnable
Будет удалено из очереди на холостой ход PoolThread
и казнены. Вы можете увидеть это в PoolThread.run()
методе. После выполнения PoolThread
зацикливается и пытается снова снять задачу, пока не остановится.
Для остановки вызывается ThreadPool
метод ThreadPool.stop()
. Вызванная остановка отмечена внутри isStopped
члена. Затем каждый поток в пуле останавливается вызовом PoolThread.stop()
. Обратите внимание, как execute()
метод будет вызывать IllegalStateException
if execute()
, вызванный после того, stop()
как был вызван.
Потоки остановятся после завершения любой задачи, которую они выполняют в настоящее время. Обратите внимание на this.interrupt()
звонок PoolThread.stop()
. Это гарантирует, что поток, заблокированный в wait()
вызове внутри taskQueue.dequeue()
вызова, прерывает wait()
вызов и оставляет dequeue()
вызов метода InterruptedException
брошенным. Это исключение отлавливается в PoolThread.run()
методе, сообщается, а затем isStopped
проверяется переменная. Так isStopped
как теперь это правда, PoolThread.run()
выход будет завершен , и поток умирает.