Статьи

Визуализация противодавления и реактивных потоков с помощью akka-streams, statsd, grafana и influenxdb

В настоящее время реактивное программирование привлекает все больше внимания. С помощью реактивного программирования можно легко создавать отказоустойчивые, масштабируемые и отказоустойчивые системы. В этой статье мы покажем несколько примеров реактивного программирования и, более конкретно, как обратное давление работает на практике. Для этого мы будем использовать akka-streams, поскольку мне действительно нравятся модели программирования Scala и Akka, и у них есть отличный DSL для определения потоков данных. Если вы немного новичок в реактивном программировании и Akka, отличное введение можно найти на слайд-шоу, которое я включил здесь:

В этой статье мы представим ряд сценариев, которые покажут, как противодавление и определенные конструкции потока, предоставляемые akka-потоками, влияют на источник событий (источник) и как это влияет на подписчика (приемник) этих событий. Для визуализации этих данных мы будем использовать набор простых черт, которые я создал для некоторых экспериментов по мониторингу актеров akka (источники можно найти здесь:  http://github.com/jos.dirksen/akka-mon ) вместе со statsd, Influx и графана. Для тех из вас, кто знаком с этими технологиями, краткое объяснение:

  • akka-mon ( https://github.com/josdirksen/akka-mon ): просто очень простой проект, я начал еще немного экспериментировать с мониторингом систем Akka. Это позволяет вам добавлять черты к вашим актерам, которые обеспечивают простое представление и ведение счета.
  • influxdb ( http://influxdb.com ): Influxdb представляет собой базу данных , специально созданные для хранения временных рядов. Мы используем effxdb для хранения обобщенных показателей из statsd.
  • grafana ( http://grafana.org/ ): Наконец, нам нужен способ визуализации данных. Сам Influxdb уже содержит простую визуализацию, но ничего, что было бы легко использовать. Поэтому для этого последнего компонента мы используем графану. С помощью grafana мы можем делать выборки из базы данных infxdb и визуализировать их во всех видах графиков.

Таким образом, в основном с установленными и настроенными этими компонентами мы имеем следующий поток:

  1. akka-mon отправляет событие (используя UDP) в statsd всякий раз, когда сообщение отправляется или принимается одним из настроенных участников. В наших сценариях мы отправляем событие, когда издатель отправляет сообщение, и когда один из подписчиков получает сообщение.
  2. Эти сообщения будут собираться statsd, и каждые 30 секунд statsd отправляет обновленную информацию о том, что было получено, на infxdb. Influxdb просто хранит всю эту информацию.
  3. Чтобы визуализировать эту информацию, мы настроим grafana для подключения к интерфейсу API effxdb, чтобы он мог извлекать сохраненную информацию и визуализировать ее на экране.

Как вы можете видеть, есть ряд движущихся частей, которые необходимо установить или настроить, чтобы все это работало правильно. Если вы хотите попробовать это сами, я добавил все файлы конфигурации и исходный код сценариев в github здесь ( https://github.com/josdirksen/akka-back-pressure ).

Начиная

Я не собираюсь объяснять все файлы конфигурации и инструкции по установке различных инструментов, которые используются. Установок по умолчанию для приставок infxdb, statsd и grafana должно быть достаточно, и можно найти соответствующие файлы конфигурации (включая файл sbt для этого проекта). Чтобы начать визуализировать обратное давление, нам сначала нужен источник (термин akka-streams для издателя), который создает сообщения, и нам, конечно, нужен приемник (также называемый подписчиком), который может обрабатывать сообщения. Давайте сначала посмотрим на приемник, поскольку мы создаем его, который немного сложнее, но позволяет нам детально контролировать скорость обмена сообщениями:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

В этом методе мы используем потоковый граф для создания нового источника, который мы можем использовать в других потоках. Этот источник объединяет два существующих источника, чтобы создать новый. Мы используем простой источник, созданный из диапазона, для создания ряда сообщений, и мы используем тиковый источник для регулирования количества сообщений, которые должны отправляться в секунду. Это работает следующим образом:

  1. Источник диапазона немедленно начинает создавать события.
  2. Это событие передается компоненту zip.
  3. Компонент zip теперь ожидает, пока не получит событие на обоих своих входах.
  4. TickSource создает события в указанном нами интервале.
  5. Теперь, когда tickSource создает событие, оба входа для компонента zip получат элемент, и компонент zip объединит их в кортеж и отправит их через соединитель zip.out.
  6. Компонент unzip (просто простая функция карты) получает этот кортеж ([Int, Tick]) и передает только событие, полученное из rangeSource.
  7. У нас есть последний шаг, который называется sendMap. Этот шаг добавлен, чтобы мы могли получать метрики в statsd, которые мы в конечном итоге можем визуализировать в графане.

Поскольку этот график является источником, нам нужно вернуть неподключенный выход. В этом случае мы возвращаем sendMap.outlet, и теперь у нас есть источник, скорость сообщения которого можно легко контролировать.

Первый сценарий: быстрый источник и быстрый подписчик

Теперь давайте посмотрим, что происходит, когда мы начинаем обрабатывать сообщения для нашего первого сценария. В этом сценарии мы будем использовать источник, который создает около 30 сообщений в секунду, и подписчик, который работает быстрее. Так что в этом сценарии ставка подписчика должна быть равна ставке издателя. Давайте сначала посмотрим на код, который определяет поток для этого сценария:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

В этом сценарии мы подключаем источник (который мы показали выше) к приемнику актера. Приемник акторов — это стандартный актер Akka, который в нашем случае запрашивает события / сообщения так быстро, как только они становятся доступными. В этом примере мы используем Actor, где мы можем настроить задержку для симуляции медленных подписчиков. Этот актер выглядит так:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

Для работы с потоками akka этот субъект должен расширить черту ActorSubscriber и определить стратегию запросов (дополнительную информацию см. В документации (ссылка)). Как видите, это очень простой актер, который просто обрабатывает сообщение и ожидает заданную задержку. Обратите внимание, что другие черты — это просто отслеживание черт, которые отправляют метрики в statsd.

Теперь давайте посмотрим, что произойдет, когда мы запустим этот пример (если вы делаете это самостоятельно, убедитесь, что statsd и influenxdb также запущены). С результатами в infxdb мы можем использовать графану для создания графика, который показывает количество сообщений, созданных источником и обработанных подписчиком.

Сценарий-1.png

Как видите, никаких сюрпризов там нет. Скорость подписчика точно равна скорости издателя и колеблется около 30 сообщений в секунду.

Второй сценарий: быстрый источник и замедление подписчика

Итак, в следующем сценарии давайте посмотрим, как обратное давление может повлиять на скорость нашего издателя. Если вы просматривали презентацию в начале этой статьи, вы знаете, что при обратном давлении медленный потребитель может ограничить скорость, с которой издатель отправляет событие, и наоборот. Таким образом, в этом сценарии мы будем моделировать потребителя, который становится медленнее после обработки каждого сообщения:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

Для этого сценария мы используем другого актера, SlowDownActor. Этот актер будет работать медленнее каждый раз, когда он обработал сообщение:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

В полученном сообщении этого актера мы делаем Thread.sleep с возрастающей задержкой. В этом сценарии, поскольку мы используем противодавление, скорость издателя контролируется скоростью, с которой подписчик может обрабатывать сообщения. Это можно увидеть в полученном графике графана:

Сценарий-2.png

На этом графике ставки подписчика и издателя одинаковы и медленно снижаются по мере увеличения задержки подписчика.

Третий сценарий: быстрый издатель, быстрый подписчик, который становится медленнее, с удаленным буфером

В этом сценарии мы вводим определенный блок компоновки, предоставляемый akka-streams, буфером. Если восходящий подписчик замедляется, буфер будет хранить определенное количество сообщений, прежде чем либо сказать издателю о замедлении, либо он начнет отбрасывать сообщения. Мы будем использовать следующий сценарий:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

Мы начинаем производить 5000 сообщений со скоростью 50 в секунду. Эти сообщения буферизуются в буфере и, наконец, используются «slowingSink», который является SlowDownActor (см. Код выше). Мы настроили буфер с размером 100 и OverFlowStrategy dropHead, что означает, что мы будем отбрасывать самое старое сообщение в буфере, когда приходят новые. Когда мы запустим этот сценарий, вы увидите следующее:

Сценарий-3.1.png

Вы можете видеть, что тариф абонента фактически игнорируется. Это потому, что мы установили OverflowStrategy в OverflowStrategy.dropHead. Интересно отметить, что подписчик продолжает обрабатывать сообщения через некоторое время после остановки издателя. Это потому, что в буфере все еще есть сообщения. Если мы установим это в OverflowStrategy.backPressure, как только буфер заполнится, производитель замедлится:

 val buffer = Flow[Int].buffer(2000, OverflowStrategy.BackPressure)

Это приводит к следующему графику графана:

Сценарий-3.2.png

Здесь мы видим, что показатель производителя остается высоким, а абонентский — низким. Это фаза, где буфер заполняется. Как только буфер заполнен, противодавление срабатывает, и скорость производителя падает до скорости абонента. Когда издатель завершит работу, подписчик с радостью продолжит обработку сообщений, пока его буфер не станет пустым.

Четвертый сценарий: быстрый издатель, один быстрый потребитель, один потребитель медленный

Пока что мы видели только 1 издателя и 1 потребителя, в этом сценарии мы рассмотрим влияние двух подписчиков на одного издателя:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

Для этого мы используем конструкцию Broadcast. С помощью трансляции мы дублируем сообщение и отправляем его нескольким подписчикам. В этом сценарии у нас есть один медленный подписчик и один быстрый подписчик. Результат этого можно увидеть на следующем графике:

Сценарий-4.png

Здесь вы можете видеть, что скорость издателя определяется скоростью, с которой медленный подписчик может обрабатывать сообщения. Это происходит потому, что медленный подписчик информирует издателя о замедлении. Это в перспективе также влияет на быстрого абонента.

Пятый сценарий: быстрый издатель, один быстрый потребитель и один медленный потребитель, который становится медленнее, но имеет буфер с падением

В предыдущем сценарии мы видели, что скорость издателя падает до самого медленного подписчика. Альтернативный подход заключается в использовании буфера между медленным подписчиком и издателем. Таким образом, вы можете указать, разрешено ли одному медленному подписчику замедлять работу издателя. Это то, что мы делаем в следующем сценарии:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

В этом сценарии источник сначала отправляет сообщения широковещательному компоненту, широковещательный компонент дублирует сообщение и отправляет его нижестоящим компонентам. Как вы можете видеть, широковещательная рассылка напрямую подключена к быстрому подписчику, а другая точка вещания сначала подключается к буферу и, наконец, к приемнику, который становится медленнее. Это приводит к следующему выводу:

Сценарий-5.1.png

Сравните этот график с предыдущим, который мы видели. Здесь мы видим, что скорость издателя и быстрого подписчика остается на одном уровне. Причина в том, что сообщения отбрасываются буфером, который мы добавили перед медленным подписчиком.

Когда мы изменяем буфер, чтобы прекратить отбрасывать сообщения, и вместо этого используем противодавление, мы получаем следующее:

Сценарий-5.2.png

Здесь мы видим, что пока буфер заполняется, и быстрый подписчик, и издатель могут поддерживать высокую скорость. Однако после заполнения буфера скорость падает до скорости, с которой медленный подписчик может обрабатывать сообщения.

Akka-streams предлагает ряд других графических компонентов, которые вы можете использовать для определения потока сообщений. Один интересный и последний из них — это балансировщик.

Шестой сценарий: быстрый издатель, быстрый подписчик и замедляющий подписчик вместе с балансировщиком

Компонент балансировщика отправляет сообщение подписчику, который доступен. Таким образом, в нашем случае больше сообщений будет отправлено быстрому подписчику, чем медленному. Когда медленный подписчик становится медленнее, быстрый подписчик будет обрабатывать все больше и больше сообщений:

Загрузка гистограммы https: //gist.github.com/8a90c60adf5498a27d10.json …

Это приводит к следующему графику:
Сценарий-6.png

Этот график показывает ожидаемое поведение. Интересно отметить, что в определенный момент издатель также начинает замедляться. Причина в том, что быстрый подписчик получает все больше и больше сообщений и в определенный момент не может угнаться за издателем. В этот момент срабатывает противодавление издателя.

Выводы

Итак, что мы можем сделать из всего этого? Вы можете видеть, что с реактивными потоками и противодавлением действительно просто убедиться, что скорость издателя и подписчика хорошо согласована. Это обеспечивает отличный контроль над потоком, позволяет избежать переполнения памяти, а с помощью дополнительных конструкций потока действительно легко поддерживать более сложные сценарии.

Существует гораздо больше информации о реактивных потоках, акка-потоках и всех возможных способах их использования. Что действительно помогло мне понять это, посмотрев на графики. Я думаю, что они действительно хорошо показывают, как обратное давление (и другие конструкции) влияют на издателя и подписчиков потоков событий.