Статьи

Потоковые истории: о надежных пулах потоков

Еще один блог из моей серии потоков. На этот раз речь идет о пулах потоков, в частности о надежных настройках пула потоков. В пуле потоков Java реализован класс ThreadPoolExecutor, представленный в Java 5. Javadoc этого класса очень хорошо организован. Поэтому я избавляю меня от усилий, чтобы дать общее представление здесь. По сути, ThreadPoolExecutor создает и управляет потоками, обрабатывающими выполняемые задачи, которые были отправлены в рабочую очередь произвольным клиентом. Это механизм для асинхронного выполнения работы, который является важной возможностью во времена многоядерных машин и облачных вычислений.

Чтобы быть полезным в широком диапазоне контекстов, ThreadPoolExecutor предоставляет некоторые настраиваемые параметры. Это хорошо, но это также оставляет нам, разработчикам, решение выбрать правильные настройки для наших конкретных случаев. Вот самый большой конструктор для ThreadPoolExecutor.

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

Типы пула потоков

Некоторые из параметров, показанных выше, очень разумны с точки зрения потребления ресурсов и стабильности системы. Основываясь на различных настройках параметров конструктора, можно выделить несколько основных категорий пулов потоков. Вот некоторые настройки пула потоков по умолчанию, предлагаемые классом Executors.

01
02
03
04
05
06
07
08
09
10
11
public static ExecutorService newCachedThreadPool() {
   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                 60L, TimeUnit.SECONDS,
                                 new SynchronousQueue<Runnable>());
}
 
public static ExecutorService newFixedThreadPool(int nThreads) {
   return new ThreadPoolExecutor(nThreads, nThreads,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}

В «кэшированном пуле потоков» количество потоков не ограничено. Это вызвано maxPoolSize Integer.MAX_VALUE в сочетании с SynchronousQueue. Если вы отправите задачи в пакете в этот пул потоков, он, скорее всего, создаст поток для каждой отдельной задачи. В этом сценарии созданные потоки завершаются, когда они простаивают в течение 60 секунд. Во втором примере показан «фиксированный пул потоков», где для MaximumPoolSize задано определенное фиксированное значение. Количество потоков в пулах никогда не будет превышать это значение. Если задачи выполняются в пакете и все потоки заняты, то они будут поставлены в очередь в рабочей очереди (здесь LinkedBlockingQueue). Потоки в этом фиксированном пуле потоков никогда не умирают. Недостаток неограниченных пулов очевиден: обе настройки могут вызвать проблемы с памятью JVM (вы получаете OutOfMemoryErrors — если вам повезет).

Давайте посмотрим на некоторые настройки пула потоков:

01
02
03
04
05
06
07
08
09
10
11
ThreadPoolExecutor pool =
       new ThreadPoolExecutor(0, 50,
                              60, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
 
ThreadPoolExecutor pool =
       new ThreadPoolExecutor(50, 50,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>(100000));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Первый фрагмент создает пул потоков, с числом потоков которого ограничено значение 50. Если задачи поступают в пакете и все потоки заняты, то вызов метода ThreadPoolExecutor.execute () теперь будет отклонен путем выдачи RejectedExecutionException. Часто это не то, что я обычно хочу, поэтому я изменяю политику насыщения, устанавливая обработчик отклоненного выполнения в CallerRunsPolicy. Эта политика отодвигает работу обратно к абоненту. То есть клиентский поток, выдавший задачу для асинхронного выполнения, теперь будет запускать задачу синхронно. Вы можете разработать собственную политику насыщения, внедрив собственный RejectedExecutionHandler. Второй фрагмент создает фиксированный пул потоков с 50 потоками и рабочей очередью, которая ограничена значением 100000 задач. Если рабочая очередь заполнена, политика насыщения возвращает работу клиенту. Кэшированный пул создает потоки по требованию и прерывает потоки, если они простаивают в течение 60 секунд. Фиксированный пул поддерживает потоки живыми.

Границы пула потоков

Как показано выше, существует два основных подхода к определению пулов потоков: ограниченные и неограниченные пулы потоков. Неограниченные пулы потоков, такие как стандартные классы класса Executors, работают нормально, если вы не отправляете слишком много задач в пакете. Если это произойдет, неограниченные пулы потоков могут повредить стабильности вашей системы. Либо слишком много потоков создается в кэшированном пуле потоков, либо слишком много задач ставятся в очередь в фиксированном пуле потоков. Письмо сложнее достичь, но все же возможно. Для производственного использования может быть лучше установить границы для некоторых значимых значений, как в последних двух настройках пула потоков. Поскольку может быть сложно определить эти «значимые границы», я разработал небольшую программу, которая работает для меня.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
 * A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired
 * work queue memory consumption as input and retuns thread count and work queue capacity.
 *
 * @author Niklas Schlimm
 *
 */
public abstract class PoolSizeCalculator {
 
 /**
  * The sample queue size to calculate the size of a single {@link Runnable} element.
  */
 private final int SAMPLE_QUEUE_SIZE = 1000;
 
 /**
  * Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be
  * configurable.
  */
 private final int EPSYLON = 20;
 
 /**
  * Control variable for the CPU time investigation.
  */
 private volatile boolean expired;
 
 /**
  * Time (millis) of the test run in the CPU time calculation.
  */
 private final long testtime = 3000;
 
 /**
  * Calculates the boundaries of a thread pool for a given {@link Runnable}.
  *
  * @param targetUtilization
  *            the desired utilization of the CPUs (0 <= targetUtilization <= 1)
  * @param targetQueueSizeBytes
  *            the desired maximum work queue size of the thread pool (bytes)
  */
 protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
  calculateOptimalCapacity(targetQueueSizeBytes);
  Runnable task = creatTask();
  start(task);
  start(task); // warm up phase
  long cputime = getCurrentThreadCPUTime();
  start(task); // test intervall
  cputime = getCurrentThreadCPUTime() - cputime;
  long waittime = (testtime * 1000000) - cputime;
  calculateOptimalThreadCount(cputime, waittime, targetUtilization);
 }
 
 private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
  long mem = calculateMemoryUsage();
  BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
  System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
  System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem
    + " bytes in a queue");
  System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
  System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
 }
 
 /**
  * Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)
  *
  * @param cpu
  *            cpu time consumed by considered task
  * @param wait
  *            wait time of considered task
  * @param targetUtilization
  *            target utilization of the system
  */
 private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
  BigDecimal waitTime = new BigDecimal(wait);
  BigDecimal computeTime = new BigDecimal(cpu);
  BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());
  BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(
    new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));
  System.out.println("Number of CPU: " + numberOfCPU);
  System.out.println("Target utilization: " + targetUtilization);
  System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
  System.out.println("Compute time (nanos): " + cpu);
  System.out.println("Wait time (nanos): " + wait);
  System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "
    + computeTime + ")");
  System.out.println("* Optimal thread count: " + optimalthreadcount);
 }
 
 /**
  * Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabbutz' ideas
  *
  * @param task
  *            the runnable under investigation
  */
 public void start(Runnable task) {
  long start = 0;
  int runs = 0;
  do {
   if (++runs > 5) {
    throw new IllegalStateException("Test not accurate");
   }
   expired = false;
   start = System.currentTimeMillis();
   Timer timer = new Timer();
   timer.schedule(new TimerTask() {
    public void run() {
     expired = true;
    }
   }, testtime);
   while (!expired) {
    task.run();
   }
   start = System.currentTimeMillis() - start;
   timer.cancel();
  } while (Math.abs(start - testtime) > EPSYLON);
  collectGarbage(3);
 }
 
 private void collectGarbage(int times) {
  for (int i = 0; i < times; i++) {
   System.gc();
   try {
    Thread.sleep(10);
   } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    break;
   }
  }
 }
 
 /**
  * Calculates the memory usage of a single element in a work queue. Based on Heinz Kabbutz' ideas
  *
  * @return memory usage of a single {@link Runnable} element in the thread pools work queue
  */
 public long calculateMemoryUsage() {
  BlockingQueue<Runnable> queue = createWorkQueue();
  for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
   queue.add(creatTask());
  }
  long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  queue = null;
  collectGarbage(15);
  mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  queue = createWorkQueue();
  for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
   queue.add(creatTask());
  }
  collectGarbage(15);
  mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
  return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
 }
 
 /**
  * Create your runnable task here.
  *
  * @return an instance of your runnable task under investigation
  */
 protected abstract Runnable creatTask();
 
 /**
  * Return an instance of the queue used in the thread pool.
  *
  * @return queue instance
  */
 protected abstract BlockingQueue<Runnable> createWorkQueue();
 
 /**
  * Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.
  * http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results
  * for thread count boundaries.
  *
  * @return current cpu time of current thread
  */
 protected abstract long getCurrentThreadCPUTime();
 
}

Программа найдет идеальные границы пула потоков для максимальной емкости вашей рабочей очереди и необходимого количества потоков. Алгоритмы основаны на работах Брайана Гетца и доктора Хайнца Кабуца, ссылки на которые вы можете найти в Javadoc. Расчет емкости, необходимой вашей рабочей очереди в фиксированном пуле потоков, относительно прост. Все, что вам нужно, это желаемый целевой размер рабочей очереди в байтах, деленный на средний размер отправленных задач в байтах. К сожалению, вычисление максимального количества потоков * не * точная наука. Однако, если вы используете формулы в программе, вы избегаете вредных крайностей слишком больших рабочих очередей и слишком большого количества потоков. Расчет идеального размера пула зависит от времени ожидания, чтобы вычислить соотношение времени вашей задачи. Чем больше время ожидания, тем больше потоков требуется для достижения заданного использования. PoolSizeCalculator требует желаемого целевого использования и желаемого максимального потребления памяти рабочей очереди в качестве входных данных. Основываясь на исследовании размеров объектов и времени процессора, он возвращает идеальные настройки для максимального числа потоков и емкости рабочей очереди в пуле потоков.

Давайте рассмотрим пример. В следующем фрагменте показано, как использовать PoolSizeCalculator в сценарии с желаемым использованием 1,0 (= 100%) и максимальным размером рабочей очереди 100 000 байт.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyPoolSizeCalculator extends PoolSizeCalculator {
 
 public static void main(String[] args) throws InterruptedException,
                                               InstantiationException,
                                               IllegalAccessException,
                                               ClassNotFoundException {
  MyThreadSizeCalculator calculator = new MyThreadSizeCalculator();
  calculator.calculateBoundaries(new BigDecimal(1.0),
                                 new BigDecimal(100000));
 }
 
 protected long getCurrentThreadCPUTime() {
  return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
 }
 
 protected Runnable creatTask() {
  return new AsynchronousTask(0, "IO", 1000000);
 }
  
 protected BlockingQueue<Runnable> createWorkQueue() {
  return new LinkedBlockingQueue<>();
 }
 
}

MyPoolSizeCalculator расширяет абстрактный PoolSizeCalculator. Вам нужно реализовать три метода шаблона: getCurrentThreadCPUTime, creatTask, createWorkQueue. Фрагмент применяет стандартные расширения управления Java для измерения времени процессора (строка 13). Если JMX недостаточно точен, то могут быть рассмотрены другие платформы (например, SIGAR API). Пулы потоков работают лучше всего, когда задачи однородны и независимы. Поэтому метод createTask создает экземпляр задачи Runnable одного типа (строка 17). Эта задача будет исследована для расчета отношения времени ожидания к времени процессора. Наконец, мне нужно создать экземпляр рабочей очереди для расчета использования памяти представленной задачей (строка 21). Выходные данные этой программы показывают идеальные параметры для емкости рабочей очереди и максимального размера пула (количества потоков). Это результаты моего интенсивного AsynchronousTask ввода-вывода на двухъядерной машине.

01
02
03
04
05
06
07
08
09
10
11
Target queue memory usage (bytes): 100000 
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue 
Formula: 100000 / 40 
* Recommended queue capacity (bytes): 2500 
Number of CPU: 2 
Target utilization: 1.0 
Elapsed time (nanos): 3000000000 
Compute time (nanos): 906250000 
Wait time (nanos): 2093750000 
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000) 
* Optimal thread count: 6.0 

«Рекомендуемая емкость очереди» и «оптимальное количество потоков» являются важными значениями. Идеальная настройка для моей AsynchronousTask была бы следующей:

1
2
3
4
5
ThreadPoolExecutor pool =
       new ThreadPoolExecutor(6, 6,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>(2500));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Используя эти настройки, ваша рабочая очередь не может быть больше, чем требуемые 100000 байтов. И поскольку желаемое использование равно 1,0 (100%), нет смысла делать пул больше, чем 6 потоков (время ожидания для вычисления соотношения времени равно трем — для каждого интервала времени вычисления следуют три интервала времени ожидания). Результаты программы во многом зависят от типа задач, которые вы обрабатываете. Если задачи являются однородными и интенсивными по вычислениям, программа, вероятно, порекомендует установить размер пула в соответствии с количеством доступных процессоров. Но если у задачи есть время ожидания, как в интенсивных задачах ввода / вывода, программе рекомендуется увеличить число потоков, чтобы достичь 100% использования. Также обратите внимание, что некоторые задачи изменяют время ожидания для вычисления соотношения времени после некоторого времени обработки, например, если размер файла операции ввода-вывода увеличивается. Этот факт предлагает создать самонастраивающийся пул потоков (один из моих последующих блогов). В любом случае вы должны настроить размеры пула потоков, чтобы их можно было изменять во время выполнения.

Хорошо, это все с точки зрения надежных пулов потоков на данный момент. Надеюсь, тебе это немного понравилось. И не вините меня, если формула не на 100% точна с точки зрения максимального размера пула. Как я уже сказал, это не точная наука, а представление о идеальном размере пула.

Ссылка: «Истории потоков: о надежных пулах потоков» от нашего партнера JCG Никласа.