Статьи

Учебник по параллелизму Java: Пулы потоков

Пулы потоков полезны, когда вам нужно одновременно ограничить количество потоков, работающих в вашем приложении. С запуском нового потока связано снижение производительности, и каждому потоку также выделяется часть памяти для его стека и т. Д.

Вместо запуска нового потока для каждой задачи, выполняемой одновременно, задача может быть передана в пул потоков. Как только в пуле есть свободные потоки, задача назначается одному из них и выполняется. Внутренне задачи вставляются в очередь блокировки, из которой выходят из очереди потоки в пуле. Когда новая задача вставляется в очередь, один из незанятых потоков успешно удалит ее из очереди и выполнит. Остальные незанятые потоки в пуле будут заблокированы в ожидании завершения задач.

Пулы потоков часто используются в многопоточных серверах. Каждое соединение, поступающее на сервер через сеть, упаковывается как задача и передается в пул потоков. Потоки в пуле потоков будут обрабатывать запросы на соединения одновременно. Позднее мы расскажем подробнее о реализации многопоточных серверов на Java.

Java 5 поставляется со встроенными пулами потоков в java.util.concurrentпакете, поэтому вам не нужно реализовывать свой собственный пул потоков. Тем не менее, в любом случае полезно знать немного о реализации пула потоков.

Вот простая реализация пула потоков:

public class ThreadPool {  private BlockingQueue taskQueue = null;  private List<PoolThread> threads = new ArrayList<PoolThread>();  private boolean isStopped = false;  public ThreadPool(int noOfThreads, int maxNoOfTasks){    taskQueue = new BlockingQueue(maxNoOfTasks);    for(int i=0; i<noOfThreads; i++){      threads.add(new PoolThread(taskQueue));    }    for(PoolThread thread : threads){      thread.start();    }  }  public void synchronized execute(Runnable task){    if(this.isStopped) throw      new IllegalStateException("ThreadPool is stopped");    this.taskQueue.enqueue(task);  }  public synchronized void stop(){    this.isStopped = true;    for(PoolThread thread : threads){      thread.stop();    }  }}
public class PoolThread extends Thread {  private BlockingQueue taskQueue = null;  private boolean       isStopped = false;  public PoolThread(BlockingQueue queue){    taskQueue = queue;  }  public void run(){    while(!isStopped()){      try{        Runnable runnable = (Runnable) taskQueue.dequeue();        runnable.run();      } catch(Exception e){        //log or otherwise report exception,        //but keep pool thread alive.      }    }  }  public synchronized void stop(){    isStopped = true;    this.interrupt(); //break pool thread out of dequeue() call.  }  public synchronized void isStopped(){    return isStopped;  }}

Реализация пула потоков состоит из двух частей. ThreadPoolКласс , который является общим интерфейсом для пула потоков, и PoolThreadкласс , который реализует резьбу , которые выполняют задачи.

Для выполнения задачи ThreadPool.execute(Runnable r)вызывается метод с Runnableреализацией в качестве параметра. Внутренне Runnableнаходится в очереди на блокировку , ожидая удаления из очереди.

RunnableБудет удалено из очереди на холостой ход PoolThreadи казнены. Вы можете увидеть это в PoolThread.run()методе. После выполнения PoolThreadзацикливается и пытается снова снять задачу, пока не остановится.

Для остановки вызывается ThreadPoolметод ThreadPool.stop(). Вызванная остановка отмечена внутри isStoppedчлена. Затем каждый поток в пуле останавливается вызовом PoolThread.stop(). Обратите внимание, как execute()метод будет вызывать IllegalStateExceptionif execute(), вызванный после того, stop()как был вызван.

Потоки остановятся после завершения любой задачи, которую они выполняют в настоящее время. Обратите внимание на this.interrupt()звонок PoolThread.stop(). Это гарантирует, что поток, заблокированный в wait()вызове внутри taskQueue.dequeue()вызова, прерывает wait()вызов и оставляет dequeue()вызов метода InterruptedExceptionброшенным. Это исключение отлавливается в PoolThread.run()методе, сообщается, а затем isStoppedпроверяется переменная. Так isStoppedкак теперь это правда, PoolThread.run()выход будет завершен , и поток умирает.