Пулы потоков полезны, когда вам нужно одновременно ограничить количество потоков, работающих в вашем приложении. С запуском нового потока связано снижение производительности, и каждому потоку также выделяется часть памяти для его стека и т. Д.
Вместо запуска нового потока для каждой задачи, выполняемой одновременно, задача может быть передана в пул потоков. Как только в пуле есть свободные потоки, задача назначается одному из них и выполняется. Внутренне задачи вставляются в очередь блокировки, из которой выходят из очереди потоки в пуле. Когда новая задача вставляется в очередь, один из незанятых потоков успешно удалит ее из очереди и выполнит. Остальные незанятые потоки в пуле будут заблокированы в ожидании завершения задач.
Пулы потоков часто используются в многопоточных серверах. Каждое соединение, поступающее на сервер через сеть, упаковывается как задача и передается в пул потоков. Потоки в пуле потоков будут обрабатывать запросы на соединения одновременно. Позднее мы расскажем подробнее о реализации многопоточных серверов на Java.
Java 5 поставляется со встроенными пулами потоков в java.util.concurrentпакете, поэтому вам не нужно реализовывать свой собственный пул потоков. Тем не менее, в любом случае полезно знать немного о реализации пула потоков.
Вот простая реализация пула потоков:
public class ThreadPool { private BlockingQueue taskQueue = null; private List<PoolThread> threads = new ArrayList<PoolThread>(); private boolean isStopped = false; public ThreadPool(int noOfThreads, int maxNoOfTasks){ taskQueue = new BlockingQueue(maxNoOfTasks); for(int i=0; i<noOfThreads; i++){ threads.add(new PoolThread(taskQueue)); } for(PoolThread thread : threads){ thread.start(); } } public void synchronized execute(Runnable task){ if(this.isStopped) throw new IllegalStateException("ThreadPool is stopped"); this.taskQueue.enqueue(task); } public synchronized void stop(){ this.isStopped = true; for(PoolThread thread : threads){ thread.stop(); } }}
public class PoolThread extends Thread { private BlockingQueue taskQueue = null; private boolean isStopped = false; public PoolThread(BlockingQueue queue){ taskQueue = queue; } public void run(){ while(!isStopped()){ try{ Runnable runnable = (Runnable) taskQueue.dequeue(); runnable.run(); } catch(Exception e){ //log or otherwise report exception, //but keep pool thread alive. } } } public synchronized void stop(){ isStopped = true; this.interrupt(); //break pool thread out of dequeue() call. } public synchronized void isStopped(){ return isStopped; }}
Реализация пула потоков состоит из двух частей. ThreadPoolКласс , который является общим интерфейсом для пула потоков, и PoolThreadкласс , который реализует резьбу , которые выполняют задачи.
Для выполнения задачи ThreadPool.execute(Runnable r)вызывается метод с Runnableреализацией в качестве параметра. Внутренне Runnableнаходится в очереди на блокировку , ожидая удаления из очереди.
RunnableБудет удалено из очереди на холостой ход PoolThreadи казнены. Вы можете увидеть это в PoolThread.run()методе. После выполнения PoolThreadзацикливается и пытается снова снять задачу, пока не остановится.
Для остановки вызывается ThreadPoolметод ThreadPool.stop(). Вызванная остановка отмечена внутри isStoppedчлена. Затем каждый поток в пуле останавливается вызовом PoolThread.stop(). Обратите внимание, как execute()метод будет вызывать IllegalStateExceptionif execute(), вызванный после того, stop()как был вызван.
Потоки остановятся после завершения любой задачи, которую они выполняют в настоящее время. Обратите внимание на this.interrupt()звонок PoolThread.stop(). Это гарантирует, что поток, заблокированный в wait()вызове внутри taskQueue.dequeue()вызова, прерывает wait()вызов и оставляет dequeue()вызов метода InterruptedExceptionброшенным. Это исключение отлавливается в PoolThread.run()методе, сообщается, а затем isStoppedпроверяется переменная. Так isStoppedкак теперь это правда, PoolThread.run()выход будет завершен , и поток умирает.