Статьи

Java Советы: создание дружественного к мониторингу ExecutorService

В этой статье мы будем расширять реализацию ExecutorService с возможностями мониторинга. Эта возможность мониторинга поможет нам измерить ряд параметров пула, таких как активные потоки, размер рабочей очереди и т. Д. В реальной производственной среде. Это также позволит нам измерить время выполнения задачи, количество успешных задач и количество неудачных задач.

Библиотека мониторинга

Что касается библиотеки мониторинга, мы будем использовать метрики . Для простоты мы будем использовать ConsoleReporter, который будет сообщать наши метрики в консоль. Для приложений промышленного уровня мы должны использовать расширенный репортер (т. Е. Graphite reporter). Если вы не знакомы с Metrics, я рекомендую вам ознакомиться с руководством по началу работы .

Давайте начнем.

Расширение ThreadPoolExecutor

Мы будем использовать ThreadPoolExecutor в качестве базового класса для нашего нового типа. Давайте назовем это MonitoredThreadPoolExecutor . Этот класс будет принимать MetricRegistry в качестве одного из параметров конструктора —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;
 
  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.metricRegistry = metricRegistry;
  }
 
  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    this.metricRegistry = metricRegistry;
  }
 
  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      RejectedExecutionHandler handler,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    this.metricRegistry = metricRegistry;
  }
 
  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler,
      MetricRegistry metricRegistry
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    this.metricRegistry = metricRegistry;
  }
}

Регистрация датчиков для измерения параметров пула

Датчик — это мгновенное измерение значения. Мы будем использовать его для измерения различных параметров пула, таких как количество активных потоков, размер очереди задач и т. Д.

Прежде чем мы сможем зарегистрировать датчик, мы должны решить, как рассчитать имя метрики для нашего пула потоков. Каждая метрика, будь то датчик, таймер или просто метр, имеет уникальное имя. Это имя используется для идентификации источника метрики. Соглашение здесь состоит в том, чтобы использовать пунктирную строку, которая часто составляется из полностью определенного имени отслеживаемого класса.

Для нашего пула потоков мы будем использовать его полное имя в качестве префикса к именам наших метрик. Дополнительно мы добавим еще один параметр конструктора
poolName, который будет использоваться клиентами для указания специфичных для экземпляра идентификаторов.

После внесения этих изменений класс выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;
  private final String metricsPrefix;
 
  public MonitoredThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      MetricRegistry metricRegistry,
      String poolName
  ) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    this.metricRegistry = metricRegistry;
    this.metricsPrefix = MetricRegistry.name(getClass(), poolName);
  }
 
  // Rest of the constructors
}

Теперь мы готовы зарегистрировать наши датчики. Для этого мы определим приватный метод —

1
2
3
4
5
6
private void registerGauges() {
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "corePoolSize"), (Gauge<Integer>) this::getCorePoolSize);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "activeThreads"), (Gauge<Integer>) this::getActiveCount);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "maxPoolSize"), (Gauge<Integer>) this::getMaximumPoolSize);
  metricRegistry.register(MetricRegistry.name(metricsPrefix, "queueSize"), (Gauge<Integer>) () -> getQueue().size());
}

В нашем примере мы измеряем размер основного пула, количество активных потоков, максимальный размер пула и размер очереди задач. В зависимости от требований к мониторингу мы можем зарегистрировать больше / меньше датчиков для измерения различных свойств.

Этот приватный метод теперь будет вызываться из всех конструкторов —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public MonitoredThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    MetricRegistry metricRegistry,
    String poolName
) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  this.metricRegistry = metricRegistry;
  this.metricsPrefix = MetricRegistry.name(getClass(), poolName);
  registerGauges();
}

Измерение времени выполнения задачи

Чтобы измерить время выполнения задачи, мы переопределим два метода жизненного цикла, которые предоставляет ThreadPoolExecutorbeforeExecute и afterExecute .

Как следует из названия, обратный вызов beforeExecute вызывается перед выполнением задачи потоком, который выполнит задачу. Реализация этого обратного вызова по умолчанию ничего не делает.

Точно так же обратный вызов afterExecute вызывается после выполнения каждой задачи потоком, выполнившим задачу. Реализация этого обратного вызова по умолчанию также ничего не делает. Даже если задача выдает неперехваченную исключительную ситуацию RuntimeException или Error , этот обратный вызов будет вызван.

Мы будем запускать Timer в нашем переопределении beforeExecute , которое затем будет использоваться в нашем переопределении afterExecute для получения общего времени выполнения задачи. Чтобы сохранить ссылку на Timer, мы введем новое поле ThreadLocal в нашем классе.

Реализация обратных вызовов дана ниже —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
  private final MetricRegistry metricRegistry;
  private final String metricsPrefix;
  private ThreadLocal<Timer.Context> taskExecutionTimer = new ThreadLocal<>();
 
  // Constructors
 
  @Override
  protected void beforeExecute(Thread thread, Runnable task) {
    super.beforeExecute(thread, task);
    Timer timer = metricRegistry.timer(MetricRegistry.name(metricsPrefix, "task-execution"));
    taskExecutionTimer.set(timer.time());
  }
 
  @Override
  protected void afterExecute(Runnable task, Throwable throwable) {
    Timer.Context context = taskExecutionTimer.get();
    context.stop();
    super.afterExecute(task, throwable);
  }
}

Запись количества невыполненных задач из-за необработанных исключений

Второй параметр обратного вызова afterExecuteThrowable . Если не ноль , этот Throwable ссылается на неперехваченную исключительную ситуацию RuntimeException или Error, которая вызвала завершение выполнения. Мы можем использовать эту информацию для частичного подсчета общего количества задач, которые были внезапно завершены из-за необработанных исключений.

Чтобы получить общее количество невыполненных задач, нужно рассмотреть еще один случай. Задачи, переданные с использованием метода execute, будут выбрасывать любые необработанные исключения и будут доступны в качестве второго аргумента для обратного вызова afterExecute . Однако задачи, отправленные с использованием метода submit , проглатываются службой-исполнителем. Это ясно объясняется в JavaDoc (выделено мной) —


Примечание. Когда действия включены в задачи (такие как FutureTask) либо явно, либо с помощью таких методов, как submit, эти объекты задач перехватывают и поддерживают вычислительные исключения, поэтому они не вызывают внезапного завершения, а внутренние исключения не передаются этому методу. , Если вы хотите перехватить оба вида сбоев в этом методе, вы можете дополнительно исследовать такие случаи, как в этом примере подкласса, который печатает либо непосредственную причину, либо основное исключение, если задача была прервана. К счастью, тот же самый документ также предлагает решение для этого, которое состоит в том, чтобы исследовать runnable, чтобы видеть, является ли это Будущим , и затем получить основное исключение.

Комбинируя эти подходы, мы можем изменить наш метод afterExecute следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
  Timer.Context context = taskExecutionTimer.get();
  context.stop();
 
  super.afterExecute(runnable, throwable);
  if (throwable == null && runnable instanceof Future && ((Future) runnable).isDone()) {
    try {
      ((Future) runnable).get();
    } catch (CancellationException ce) {
      throwable = ce;
    } catch (ExecutionException ee) {
      throwable = ee.getCause();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }
  if (throwable != null) {
    Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));
    failedTasksCounter.inc();
  }
}

Подсчет общего количества успешных заданий

Предыдущий подход также можно использовать для подсчета общего количества успешных задач: задач, которые были выполнены без каких-либо исключений или ошибок —

01
02
03
04
05
06
07
08
09
10
11
12
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
  // Rest of the method body .....
 
  if (throwable != null) {
    Counter failedTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "failed-tasks"));
    failedTasksCounter.inc();
  } else {
    Counter successfulTasksCounter = metricRegistry.counter(MetricRegistry.name(metricsPrefix, "successful-tasks"));
    successfulTasksCounter.inc();
  }
}

Вывод

В этой статье мы рассмотрели несколько удобных для мониторинга настроек для реализации ExecutorService. Как всегда, любые предложения / улучшения / исправления ошибок будут высоко оценены. Что касается примера исходного кода, он был загружен в
Github .

Опубликовано на Java Code Geeks с разрешения Саима Ахмеда, партнера нашей программы JCG . См. Оригинальную статью здесь: Советы по Java: создание дружественного к мониторингу ExecutorService

Мнения, высказанные участниками Java Code Geeks, являются их собственными.