Статьи

Мониторинг и измерение реактивного приложения с Dropwizard Metrics

В предыдущей статье мы создали простой код индексации, который забивает ElasticSearch тысячами одновременных запросов. Единственный способ контролировать производительность нашей системы — это протоколирование старой школы:

1
2
3
.window(Duration.ofSeconds(1))
.flatMap(Flux::count)
.subscribe(winSize -> log.debug("Got {} responses in last second", winSize));

Это хорошо, но в производственной системе мы бы предпочли централизованное решение для мониторинга и составления диаграмм для сбора различных метрик. Это становится особенно важным, когда у вас есть сотни различных приложений в тысячах экземпляров. Наличие единой графической панели, объединяющей всю важную информацию, становится критически важным. Нам нужны два компонента, чтобы собрать некоторые метрики:

  • метрики публикации
  • собирая и визуализируя их

Публикация метрик с использованием Dropwizard Metrics

В Spring Boot 2 метрики Dropwizard были заменены микрометром . В этой статье используется первое, в следующем будет показано последнее решение на практике. Чтобы воспользоваться преимуществами Метрик Dropwizard, мы должны внедрить MetricRegistry или конкретные метрики в наши бизнес-классы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
  
@Component
@RequiredArgsConstructor
class Indexer {
  
    private final PersonGenerator personGenerator;
    private final RestHighLevelClient client;
    private final Timer indexTimer;
    private final Counter indexConcurrent;
    private final Counter successes;
    private final Counter failures;
  
    public Indexer(PersonGenerator personGenerator, RestHighLevelClient client, MetricRegistry metricRegistry) {
        this.personGenerator = personGenerator;
        this.client = client;
        this.indexTimer = metricRegistry.timer(name("es", "index"));
        this.indexConcurrent = metricRegistry.counter(name("es", "concurrent"));
        this.successes = metricRegistry.counter(name("es", "successes"));
        this.failures = metricRegistry.counter(name("es", "failures"));
    }
  
    private Flux<IndexResponse> index(int count, int concurrency) {
        //....
    }
  
}

Столько шаблонов, чтобы добавить некоторые метрики!

  • indexTimer измеряет распределение времени (среднее значение, медиана и различные процентили) запросов на индексирование
  • indexConcurrent измеряет, сколько запросов в данный момент indexConcurrent (запросов отправлено, ответа пока нет); метрика идет вверх и вниз со временем
  • success и failures подсчитывает общее количество успешных и неудачных запросов на индексирование соответственно

Мы избавимся от шаблона через секунду, но сначала давайте посмотрим, как он работает в нашем бизнес-коде:

1
2
3
4
5
6
7
private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnSuccess(response -> successes.inc())
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .doOnError(e -> failures.inc())
            .onErrorResume(e -> Mono.empty());
}

Приведенный выше вспомогательный метод увеличивает число успехов и неудач при каждом завершении запроса. Кроме того, он регистрирует и поглощает ошибки, так что одна ошибка или тайм-аут не прерывает весь процесс импорта.

1
2
3
4
5
private <T> Mono<T> countConcurrent(Mono<T> input) {
    return input
            .doOnSubscribe(s -> indexConcurrent.inc())
            .doOnTerminate(indexConcurrent::dec);
}

Другой приведенный выше метод увеличивает показатель indexConcurrent при indexConcurrent нового запроса и уменьшает его при появлении результата или ошибки. Этот показатель продолжает расти и уменьшаться, показывая количество запросов в полете.

1
2
3
4
5
6
7
private <T> Mono<T> measure(Mono<T> input) {
    return Mono
            .fromCallable(indexTimer::time)
            .flatMap(time ->
                    input.doOnSuccess(x -> time.stop())
            );
}

Последний вспомогательный метод является наиболее сложным. Он измеряет общее время индексации, то есть время между отправляемым запросом и полученным ответом. На самом деле, он довольно общий, он просто вычисляет общее время между подпиской на произвольный Mono<T> и завершением. Почему это выглядит так странно? Ну, основной API Timer очень прост

1
indexTimer.time(() -> someSlowCode())

Он просто принимает лямбда-выражение и измеряет, сколько времени понадобилось, чтобы вызвать его. В качестве альтернативы вы можете создать небольшой объект Timer.Context который запоминает, когда он был создан. Когда вы вызываете Context.stop() он сообщает об этом измерении:

1
2
3
final Timer.Context time = indexTimer.time();
someSlowCode();
time.stop();

С асинхронными потоками это намного сложнее. Начало задачи (обозначается подпиской) и ее завершение обычно происходит через границы потоков в разных местах кода. Что мы можем сделать — это создать (лениво) новый объект Context (см .: fromCallable(indexTimer::time) ), и когда завернутый поток завершится, завершить Context (см .: input.doOnSuccess(x -> time.stop() ). Вот как вы составляете все эти методы:

1
2
3
4
5
personGenerator
            .infinite()
            .take(count)
            .flatMap(doc ->
                countConcurrent(measure(indexDocSwallowErrors(doc))), concurrency);

Это так, но загрязнение бизнес-кода столькими низкоуровневыми деталями сбора метрик кажется странным. Давайте обернем эти метрики специализированным компонентом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RequiredArgsConstructor
class EsMetrics {
  
    private final Timer indexTimer;
    private final Counter indexConcurrent;
    private final Counter successes;
    private final Counter failures;
  
    void success() {
        successes.inc();
    }
  
    void failure() {
        failures.inc();
    }
  
    void concurrentStart() {
        indexConcurrent.inc();
    }
  
    void concurrentStop() {
        indexConcurrent.dec();
    }
  
    Timer.Context startTimer() {
        return indexTimer.time();
    }
  
}

Теперь мы можем использовать немного более высокоуровневую абстракцию:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
class Indexer {
 
    private final EsMetrics esMetrics;
  
    private <T> Mono<T> countConcurrent(Mono<T> input) {
        return input
                .doOnSubscribe(s -> esMetrics.concurrentStart())
                .doOnTerminate(esMetrics::concurrentStop);
    }
  
    //...
  
    private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
        return indexDoc(doc)
                .doOnSuccess(response -> esMetrics.success())
                .doOnError(e -> log.error("Unable to index {}", doc, e))
                .doOnError(e -> esMetrics.failure())
                .onErrorResume(e -> Mono.empty());
    }
}

В следующей статье мы научимся составлять все эти методы еще лучше. И избегайте шаблонов.

Публикация и визуализация метрик

Собирать метрики самостоятельно недостаточно. Мы должны периодически публиковать агрегированные показатели, чтобы другие системы могли их использовать, обрабатывать и визуализировать. Одним из таких инструментов является графит и графана . Но прежде чем мы углубимся в их настройку, давайте сначала опубликуем метрики на консоли. Я считаю это особенно полезным при устранении неполадок в метриках или в процессе разработки.

1
2
3
4
5
6
7
8
9
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
  
@Bean
Slf4jReporter slf4jReporter(MetricRegistry metricRegistry) {
    final Slf4jReporter slf4jReporter = Slf4jReporter.forRegistry(metricRegistry.build();
    slf4jReporter.start(1, TimeUnit.SECONDS);
    return slf4jReporter;
}

Этот простой фрагмент кода берет существующую MetricRegistry и регистрирует Slf4jReporter . Каждую секунду вы будете видеть все показатели, напечатанные в ваших журналах (Logback и т. Д.):

1
2
3
4
5
6
type=COUNTER, name=es.concurrent, count=1
type=COUNTER, name=es.failures, count=0
type=COUNTER, name=es.successes, count=1653
type=TIMER, name=es.index, count=1653, min=1.104664, max=345.139385, mean=2.2166538118720576,
    stddev=11.208345077801448, median=1.455504, p75=1.660252, p95=2.7456, p98=5.625456, p99=9.69689, p999=85.062713,
    mean_rate=408.56403102372764, m1=0.0, m5=0.0, m15=0.0, rate_unit=events/second, duration_unit=milliseconds

Но это просто или устранение неполадок, чтобы опубликовать наши показатели во внешнем экземпляре Graphite, нам нужен GraphiteReporter :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
  
@Bean
GraphiteReporter graphiteReporter(MetricRegistry metricRegistry) {
    final Graphite graphite = new Graphite(new InetSocketAddress("localhost", 2003));
    final GraphiteReporter reporter = GraphiteReporter.forRegistry(metricRegistry)
            .prefixedWith("elastic-flux")
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build(graphite);
    reporter.start(1, TimeUnit.SECONDS);
    return reporter;
}

Здесь я сообщаю localhost:2003 где мой образ Docker с Graphite + Grafana оказался. Раз в секунду все метрики отправляются на этот адрес. Позже мы можем визуализировать все эти показатели на Графане:


На верхней диаграмме показано распределение времени индексации (от 50-го до 99,9-го процентиля). Используя эту диаграмму, вы можете быстро узнать, какова типичная производительность (P50), а также (почти) производительность в худшем случае (P99.9). Логарифмическая шкала необычна, но в этом случае позволяет нам видеть как низкий, так и высокий процентили. Нижняя диаграмма еще интереснее. Он объединяет три метрики:

  • скорость (количество запросов в секунду) успешных операций с индексами
  • частота неудачных операций (красная полоса, сложенная поверх зеленой)
  • текущий уровень параллелизма (правая ось): номер запроса в полете

Эта диаграмма показывает пропускную способность системы (RPS), сбои и параллелизм. Слишком много сбоев или необычно высокий уровень параллелизма (многие операции ожидают ответа) могут быть признаком некоторых проблем в вашей системе. Определение панели доступно в репозитории GitHub.

В следующей статье мы узнаем, как перейти с метрик Dropwizard на микрометр. Очень приятный опыт!

Опубликовано на Java Code Geeks с разрешения Томаша Нуркевича, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Мониторинг и измерение реактивного приложения с помощью Dropwizard Metrics

Мнения, высказанные участниками Java Code Geeks, являются их собственными.