Статьи

Использование ThreadPoolExecutor для распараллеливания независимых однопоточных задач

Инфраструктура выполнения задач , представленная в Java SE 5.0, является гигантским шагом вперед для упрощения проектирования и разработки многопоточных приложений. Инфраструктура предоставляет средства для управления концепцией задачи , управления жизненными циклами потоков и политикой их выполнения.

В этом сообщении мы опишем мощь, гибкость и простоту этой структуры, демонстрируя простой вариант использования.

Основы

Платформа executor представляет интерфейс для управления выполнением задач: Executor. Executor — это интерфейс, который вы используете для отправки задач, представленный как Runnable экземпляры. Этот интерфейс также изолирует отправку задачи от выполнения задачи : исполнители с разными политиками выполнения публикуют один и тот же интерфейс отправки: если вы измените свою политику выполнения, это изменение не повлияет на логику отправки.

Если вы хотите отправить экземпляр Runnable для выполнения, это так же просто, как:

1
2
Executor exec = …;
exec.execute(runnable);

Пулы потоков

Как было указано в предыдущем разделе, то, как исполнитель будет выполнять ваш исполняемый файл, не указано в контракте Executor: это зависит от конкретного типа исполнителя, который вы используете. Платформа предоставляет несколько различных типов исполнителей, каждый из которых имеет определенную политику выполнения, адаптированную для разных вариантов использования.

Наиболее распространенным типом исполнителей, с которыми вы будете иметь дело, являются исполнители пула потоков , которые являются экземплярами класса ThreadPoolExecutor (и его подклассов). Исполнители пула потоков управляют пулом потоков, то есть пулом рабочих потоков, которые будут выполнять задачи, и рабочей очередью .

Вы наверняка видели концепцию пула в других технологиях. Основным преимуществом использования пула является сокращение накладных расходов на создание ресурсов , повторное использование структур (в данном случае потоков), которые были освобождены после использования. Еще одним неявным преимуществом использования пула является возможность определения размера использования ресурсов : вы можете настроить размеры пула потоков для достижения желаемой нагрузки, не подвергая опасности системные ресурсы.

Инфраструктура предоставляет фабричный класс для пулов потоков, называемых Executors . Используя эту фабрику, вы сможете создавать пулы потоков с различными характеристиками. Часто базовая реализация часто одинакова ( ThreadPoolExecutor ), но класс фабрики помогает быстро настроить пул потоков без использования его более сложного конструктора. Заводские методы:

  • newFixedThreadPool : этот метод возвращает пул потоков, максимальный размер которого фиксирован . Это создаст новые потоки по мере необходимости до максимального настроенного размера. Когда число потоков достигает максимума, пул потоков будет поддерживать постоянный размер.
  • newCachedThreadPool : этот метод возвращает неограниченный пул потоков, то есть пул потоков без максимального размера. Однако этот тип пула потоков разрушит неиспользуемый поток, когда нагрузка уменьшится.
  • newSingleThreadedExecutor : этот метод возвращает исполнителя, который гарантирует, что задачи будут выполняться в одном потоке.
  • newScheduledThreadPool : этот метод возвращает пул потоков фиксированного размера, который поддерживает отложенное и синхронизированное выполнение задач.

Это только начало. Исполнители также предоставляют другие возможности, которые выходят за рамки данного руководства, и я настоятельно рекомендую вам изучить следующие вопросы:

  • Методы управления жизненным циклом, объявленные интерфейсом ExecutorService (такие как shutdown () и awaitTermination ()).
  • Службы завершения для опроса состояния задачи и получения ее возвращаемого значения, если применимо.

Интерфейс ExecutorService особенно важен, поскольку он предоставляет способ отключения пула потоков, что вы почти наверняка захотите сделать чисто . К счастью, интерфейс ExecutorService довольно прост и не требует пояснений, и я рекомендую вам тщательно изучить его JavaDoc.

По сути, вы отправляете сообщение shutdown () в ExecutorService , после чего оно не будет принимать новые отправленные задачи, но продолжит обработку уже помещенных в очередь заданий. Вы можете объединить статус завершения службы исполнителя с помощью isTermination () или дождаться завершения, используя метод awaitTermination (…). Однако метод awaitTermination не будет ждать вечно: вам нужно будет передать максимальное время ожидания в качестве параметра.

Предупреждение : источником ошибок и путаницы является понимание того, почему процесс JVM никогда не завершается. Если вы не завершите работу своих служб-исполнителей, тем самым разрушив базовые потоки, JVM никогда не выйдет: JVM завершится, когда завершится последний поток, не являющийся демоном.

Конфигурирование ThreadPoolExecutor

Если вы решите создать ThreadPoolExecutor вручную вместо использования фабричного класса Executors , вам нужно будет создать и настроить его, используя один из его конструкторов. Самый обширный конструктор этого класса:

1
2
3
4
5
6
7
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);

Как видите, вы можете настроить:

  • Размер основного пула (размер, который будет пытаться придерживаться пул потоков).
  • Максимальный размер пула.
  • Время поддержания активности, то есть время, после которого свободная нить может быть сорвана.
  • Рабочая очередь для хранения задач, ожидающих выполнения.
  • Политика, применяемая при отклонении представления задачи.

Ограничение количества задач в очереди

Ограничение числа выполняемых одновременно задач, определение размера пула потоков представляет собой огромное преимущество для вашего приложения и его среды выполнения с точки зрения предсказуемости и стабильности: создание неограниченного потока в конечном итоге приведет к исчерпанию ресурсов времени выполнения и, как следствие, к вашему приложению. серьезные проблемы с производительностью, которые могут привести даже к нестабильности приложения.

Это решение только одной части проблемы: вы ограничиваете количество выполняемых задач, но не ограничиваете количество заданий, которые можно отправить и поставить в очередь для последующего выполнения. Приложение будет испытывать нехватку ресурсов позже, но оно в конечном итоге будет испытывать это, если скорость отправки постоянно превышает скорость выполнения.

Решение этой проблемы:

  • Предоставление очереди блокировки исполнителю для удержания ожидающих задач. В случае заполнения очереди отправленное задание будет «отклонено».
  • RejectedExecutionHandler вызывается, когда отправка задачи отклоняется, и поэтому отклоненный глагол был указан в предыдущем элементе. Вы можете реализовать собственную политику отказа или использовать одну из встроенных политик, предоставляемых платформой.

В политиках отклонения по умолчанию исполнитель создает исключение RejectedExecutionException . Однако другие встроенные политики позволяют вам:

  • Откажитесь от работы молча.
  • Откажитесь от самой старой работы и попробуйте повторно отправить последнюю.
  • Выполните отклоненную задачу в потоке вызывающего.

Когда и почему можно использовать такую ​​конфигурацию пула потоков? Давайте посмотрим на пример.

Пример: распараллеливание независимых однопоточных задач

Недавно мне позвонили, чтобы решить проблему со старой работой, которую давным-давно выполнял мой клиент. По сути, задание состоит из компонента, который ожидает событий файловой системы в наборе иерархий каталогов. Всякий раз, когда происходит событие, файл должен быть обработан. Обработка файлов выполняется проприетарным однопоточным процессом. По правде говоря, по своей природе, даже если бы я мог, я не смог бы, если бы я мог распараллелить это. Частота прибытия событий очень высока в течение части дня, и нет необходимости обрабатывать файлы в режиме реального времени, они просто обрабатываются до следующего дня.

Текущая реализация представляла собой сочетание технологий, включая сценарий оболочки UNIX, который отвечал за сканирование огромных иерархий каталогов, чтобы определить, где были применены изменения. Когда эта реализация была внедрена, число ядер в среде выполнения было равно двум. Кроме того, частота событий была довольно низкой: в настоящее время они составляют порядка миллионов , что в общей сложности составляет от 1 до 2 терабайт необработанных данных для обработки.

Серверы, на которых клиент выполняет эти процессы, в настоящее время представляют собой двенадцать основных компьютеров: огромная возможность распараллелить эти старые однопоточные задачи. У нас есть в основном все ингредиенты для рецепта, нам просто нужно решить, как его построить и настроить. Перед тем, как написать какой-либо код, потребовались некоторые мысли, чтобы понять природу нагрузки, и вот те ограничения, которые я обнаружил:

  • Периодически проверяется огромное количество файлов: каждый каталог содержит от одного до двух миллионов файлов.
  • Алгоритм сканирования очень быстрый и может быть распараллелен.
  • Обработка файла займет не менее 1 секунды, с пиками даже 2 или 3 секунды.
  • При обработке файла не существует другого узкого места, кроме процессора.
  • Загрузка ЦП должна быть настраиваемой, чтобы использовать другой профиль нагрузки в зависимости от времени суток.

Поэтому мне понадобится пул потоков, размер которого определяется профилем загрузки, активным в момент запуска процесса. Я склонен к созданию исполнителя пула потоков фиксированного размера, настроенного в соответствии с политикой загрузки. Поскольку поток обработки связан только с процессором, его использование ядра составляет 100% и не требует никаких других ресурсов, политику загрузки очень легко рассчитать: достаточно взять количество ядер, доступных в среде обработки, и уменьшить его с помощью нагрузки. фактор, который активен в этот момент (и убедитесь, что в момент пика используется хотя бы одно ядро):

1
2
3
int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

Затем мне нужно создать ThreadPoolExecutor, используя очередь блокировки, чтобы ограничить количество отправленных задач. Почему? Хорошо: алгоритмы сканирования каталогов очень быстрые и генерируют огромное количество файлов для очень быстрой обработки. Насколько огромный? Трудно предсказать, и его изменчивость довольно высока. Я не собираюсь позволять внутренней очереди моего исполнителя заполняться без разбора объектами, представляющими мои задачи (которые включают довольно большой файловый дескриптор). Я предпочитаю, чтобы исполнитель отклонял файлы, когда очередь заполняется.

Также я буду использовать ThreadPoolExecutor.CallerRunsPolicy в качестве политики отклонения. Почему? Ну, потому что, когда очередь заполнена и когда потоки в пулах заняты обработкой файла, у меня будет поток, который отправляет задачу, выполняющую его. Таким образом, сканирование прекращает обработку файла и возобновляет сканирование, как только завершает выполнение текущей задачи.

Вот код, который создает исполнителя:

1
2
3
4
5
6
7
8
ExecutorService executorService =
  new ThreadPoolExecutor(
    maxThreads, // core thread pool size
    maxThreads, // maximum thread pool size
    1, // time to wait before resizing pool
    TimeUnit.MINUTES,
    new ArrayBlockingQueue<Runnable>(maxThreads, true),
    new ThreadPoolExecutor.CallerRunsPolicy());

Скелет кода следующий (значительно упрощенный):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
  File currentDir = dirsToProcess.pop();
 
 
  // listing children
  File[] children = currentDir.listFiles();
 
 
  // processing children
  for (final File currentFile : children) {
  // if it's a directory, defer processing
  if (currentFile.isDirectory()) {
    dirsToProcess.add(currentFile);
    continue;
  }
 
 
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      try {
        // if it's a file, process it
        new ConvertTask(currentFile).perform();
      } catch (Exception ex) {
        // error management logic
      }
    }
  });
}
 
 
// ...
         
// wait for all of the executor threads to finish
executorService.shutdown();
         
try {
  if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    // pool didn't terminate after the first try
    executorService.shutdownNow();
  }
 
 
  if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    // pool didn't terminate after the second try
  }
} catch (InterruptedException ex) {
  executorService.shutdownNow();
  Thread.currentThread().interrupt();
}

Вывод

Как видите, API параллелизма Java очень прост в использовании, очень гибкий и чрезвычайно мощный. Несколько лет назад я бы приложил гораздо больше усилий, чтобы написать такую ​​простую программу. Таким образом, я мог бы быстро решить проблему масштабируемости, вызванную устаревшим однопоточным компонентом, за считанные часы.

Ссылка: Использование ThreadPoolExecutor для распараллеливания независимых однопоточных задач от нашего партнера по JCG Энрико Кризостомо из The Grey Blog .

Связанные фрагменты:

Статьи по Теме :