Статьи

Как реализовать пул потоков в Java

Поток — это путь выполнения независимой программы. В Java каждый поток расширяет класс java.lang.Thread или реализует java.lang.Runnable.

Многопоточность относится к выполнению двух или более потоков одновременно в рамках одной задачи. В многопоточности каждая задача может иметь много потоков, и эти потоки могут выполняться одновременно, асинхронно или синхронно. Вы можете найти больше информации о потоке и многопоточности в другом уроке, который я написал о многопоточности здесь .

1. Что такое пул потоков

Пул потоков представляет собой группу рабочих потоков, которые выполняют задачи, каждый поток можно использовать много раз. Если новая задача отправляется, когда все потоки активны, они будут ждать в очереди, пока поток не станет доступным. Реализация пула потоков внутренне использует LinkedBlockingQueue для добавления и удаления задач в очередь.
Обычно нам нужна рабочая очередь, объединенная с фиксированной группой рабочих потоков, которая использует wait() и notify() чтобы сигнализировать ожидающим потокам о поступлении новой работы. В следующем примере показана простая рабочая очередь, представляющая собой очередь объектов Runnable . Это общее соглашение для планировщиков и рабочих очередей, хотя API-интерфейс Thread не требует особой необходимости использовать интерфейс Runnable .
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package tutorials;
 
import java.util.concurrent.LinkedBlockingQueue;
 
public class ThreadPool {
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedBlockingQueue queue;
 
    public ThreadPool(int nThreads) {
        this.nThreads = nThreads;
        queue = new LinkedBlockingQueue();
        threads = new PoolWorker[nThreads];
 
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }
 
    public void execute(Runnable task) {
        synchronized (queue) {
            queue.add(task);
            queue.notify();
        }
    }
 
    private class PoolWorker extends Thread {
        public void run() {
            Runnable task;
 
            while (true) {
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            System.out.println("An error occurred while queue is waiting: " + e.getMessage());
                        }
                    }
                    task = queue.poll();
                }
 
                // If we don't catch RuntimeException,
                // the pool could leak threads
                try {
                    task.run();
                } catch (RuntimeException e) {
                    System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
                }
            }
        }
    }
}

При работе с очередью важно использовать блок синхронизации, чтобы контролировать доступ потоков к очереди.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
package tutorials;
 
public class Task implements Runnable {
 
    private int num;
 
    public Task(int n) {
        num = n;
    }
 
    public void run() {
        System.out.println("Task " + num + " is running.");
    }
}
01
02
03
04
05
06
07
08
09
10
11
12
13
import tutorials.Task;
import tutorials.ThreadPool;
 
public class Main {
 
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(7);
 
        for (int i = 0; i < 5; i++) {
            Task task = new Task(i);
            pool.execute(task);
        }
}

В приведенном выше примере мы использовали notifyAll() вместо notifyAll() . Потому что notify() имеет более желательные характеристики производительности, чем notifyAll() ; в частности, notify() вызывает гораздо меньше переключений контекста, что важно в серверном приложении. Но важно убедиться, что при использовании notify() в других ситуациях существуют тонкие риски, связанные с использованием notify() , и его целесообразно использовать только при определенных конкретных условиях.

На следующем рисунке показана структура пула потоков в приведенном выше примере.

Фигура 1. Дизайн пула потоков

2. Эффективное использование потоковых пулов

Пул потоков — это мощный механизм структурирования многопоточных приложений, но он не без риска. Приложения, созданные с помощью пулов потоков, могут иметь все те же риски параллелизма, что и любые другие многопоточные приложения, такие как взаимоблокировка , перегрузка ресурсов, ошибки синхронизации или параллелизма, утечка потоков и перегрузка запросов .

Вот несколько моментов:

  • Не ставьте в очередь задачи, которые синхронно ждут другие задачи, так как это может привести к тупику.
  • Если задача требует ожидания ресурса, такого как ввод-вывод, задайте максимальное время ожидания, а затем выполните сбой или возобновите выполнение задачи. Это гарантирует, что некоторый прогресс будет достигнут, освободив поток для другой задачи, которая может успешно завершиться.
  • Эффективно настройте размер пула потоков и поймите, что наличие слишком малого или слишком большого количества потоков может вызвать проблемы. Оптимальный размер пула потоков зависит от количества доступных процессоров и характера задач в рабочей очереди.

3. Вывод

Пул потоков полезен для организации серверных приложений, и очень важно правильно его реализовать, чтобы предотвратить любые проблемы, такие как взаимоблокировка и сложность использования для wait() или notify() . Поэтому рекомендуется рассмотреть возможность использования одного из классов Executor из util.concurrent , такого как ThreadPoolExecutor , а не писать пул потоков с нуля. Если требуется создать потоки для обработки краткосрочных задач, вы можете вместо этого использовать пул потоков.

4. Загрузите исходный код

Это было руководство для пула потоков, чтобы скачать исходный код, нажмите здесь .