Статьи

Расширенное реактивное программирование с помощью Akka и Scala

После знакомства с Akka в  нашей первой лаборатории Akka , мы —  @ RoadRunner12048  и @dmitterd  — хотели попробовать отслеживать приложение Akka, чтобы лучше понять его динамическое поведение. Кроме того, мы хотели поиграть с поддержкой кластеризации. Мы использовали очень грубое моделирование биржевой торговли и приложение для пинг-понга, которое мы оба реализовали в первой лаборатории в качестве темы для наших экспериментов.

Обновление с Akka 2.2.3 до 2.3.2

Мы начали нашу лабораторию с обновления до последней версии Akka 2.3.2 и получили некоторые ошибки компиляции, касающиеся маршрутизации. Итак, мы просмотрели руководство по  миграции  и нашли следующий комментарий, который, похоже, относится к нам:

API для создания пользовательских маршрутизаторов и ресайзеров изменился без сохранения старого API как устаревшего. Это должен быть API, используемый только несколькими пользователями, и они должны иметь возможность без особых проблем перейти на новый API.

Команда Akka представила два различных типа маршрутизаторов в Akka 2.3:  Pool и  Group. Пулы — это маршрутизаторы, которые сами управляют своими маршрутами (создание и завершение как дочерние акторы), тогда как группы — это маршрутизаторы, которые получают предварительно настроенные маршруты извне.

В нашем торговом приложении мы используем собственный маршрутизатор для управления так называемыми книгами заказов. В этом сценарии  Pool подход лучше подходит, потому что маршрутизатор должен настроить и управлять всеми  OrderBook участниками самостоятельно. Кроме того, наш маршрутизатор впоследствии необходимо будет настраивать во время выполнения, и в результате мы получим выделенного участника, который использует встроенный маршрутизатор, как описано в  документации Akka . Таким образом, наш субъект-маршрутизатор может пересылать сообщения на свои  OrderBookсерверы, но при необходимости обрабатывать сообщения конфигурации маршрутизации самостоятельно:

// Actor with routing functionality
class OrderRoutingActor(securities: Seq[Security]) extends Actor with ActorLogging {

  // routes all Asks and Bids belonging to same Security to a single OrderBook
  val routingLogic = new OrderBookRoutingLogic()

  // the router
  val router = new Router(routingLogic)

  override def preStart(): Unit = {

    // setup child OrderBook actor for given security   
    securities.foreach { security =>

      val orderBookForSecurity = context.actorOf(Props[OrderBook](
          new OrderBook(security) with SimpleTradeMatcher with AverageMarketPriceCalculator), security.name)

      routingLogic.addOrderBook(security, orderBookForSecurity)
    }

    log.info(s"Orderbook router for $securities is started.")
  }

  override def receive: Receive = {

    // handle new router configuration
    case UpdateConfig(config) => // reconfigure routingLogic

    // route the rest
    case msg => router.route(msg, sender())
  }
}

Давайте опишем шаг за шагом, что делает маршрутизатор. Как уже упоминалось, мы создали выделенного субъекта для маршрутизации сообщений с использованием  Router. Внутренне  Router использует OrderBookRoutingLogic который просто держит  Map в  Security к  ActorRef. Каждое сообщение типа  Order, которые являются  Bids или  Asks на  Security, будет направлено на ответственный OrderBook. Все остальные сообщения будут транслироваться всем известным пользователям  ActorRef.

Это почти то же самое, что и до обновления до Akka 2.3.2, но маршрутизатор не был Actor , что имеет большое значение. Обычно сообщение добавляется в Actorпочтовый ящик пользователя синхронно,  и маршрутизация выполняется в том же шаге. Однако, когда  Routerсамо сообщение внедряется в  Actor каждое сообщение, оно сначала добавляется в почтовый ящик субъекта маршрутизации (синхронно). Когда маршрутизатор обрабатывает сообщение, он добавляет его в почтовый ящик получателя. Поскольку задействованы два последовательных почтовых ящика вместо одного, это увеличит задержку сообщения. Хотя в реальном мире торговое приложение может иметь значение наносекунд, мы не будем больше беспокоиться о нашем демонстрационном приложении.

Мониторинг Акка

Мы хотели узнать больше о поведении нашей системы Akka во время выполнения. Поэтому мы добавили мониторинг в наше демонстрационное приложение Ping-Pong. Чтобы ускориться за три наших лабораторных дня, мы подумали, что согласимся на довольно новый проект  Kamon , используя их  образ Docker,  который включает в себя все необходимые компоненты, включая причудливую панель инструментов, и мы готовы идти вперед. К сожалению, это было не так просто.

Сбор данных мониторинга

Во-первых, нам нужно собрать данные в приложении. Kamon уже предоставляет интеграцию для Akka в модуле «kamon-core», который может измерять такие показатели, как длина почтового ящика актера или время обработки сообщения. Kamon интегрирован в приложение Akka с использованием прокси-серверов AspectJ, поэтому нам нужно добавить ткач AspectJ во время выполнения в качестве агента Java, как описано в разделе «  Приступая к работе» документации Kamon . Утилита AspectJ также может применяться при использовании  sbt run, однако документация Kamon устарела, и нам пришлось настроить документированную конфигурацию, чтобы она работала (см. Наш  проект Gitub akka-lab ). Очевидно, что сбор данных — это только часть истории. Далее нам нужны средства, чтобы что-то увидеть.

Панель мониторинга

В настоящее время Kamon предоставляет две возможности экспорта данных мониторинга:  NewRelic  и StatsD ; мы пошли на StatsD. StatsD — это демон Node.js, который получает данные мониторинга через UDP и перенаправляет их в так называемые бэкэнды, которые в конечном итоге отображают данные мониторинга.

Чтобы отправить данные демону StatsD, нам нужно добавить следующий фрагмент в файл конфигурации приложения:

akka {
    # ...
    extensions = ["kamon.statsd.StatsD"]
    # ...
}

и убедитесь , что мы добавим интеграцию статистики в  build.sbt"io.kamon" % "kamon-statsd" % "0.3.0".

Кроме того, нам нужно предоставить файл конфигурации для Kamon. Вот немного упрощенная версия:

kamon {
  statsd {
    # Our StatsD target host and port
    hostname = "127.0.0.1"
    port = 8125

    simple-metric-key-generator {
      # Application prefix for all metrics pushed to StatsD
      application = "pingpong"
    }
  }

  metrics {
    filters = [
      {
        actor {
          includes = [ "*" ],
          excludes = [ "system/*" ]
        }
      },
      {
        trace {
          includes = [ "*" ]
          excludes = []
        }
      }
    ]
  }
}

Приложение Akka теперь полностью настроено. Далее нам нужно установить необходимые пакеты для обработки данных мониторинга.

Команда Kamon предоставляет  образ Docker, в  котором StatsD и подходящий бэкэнд уже предварительно настроены. Идея действительно замечательная, но мы не смогли запустить ее на наших Mac-книгах ни через  boot2docker,  ни через  Vagrant . Мы могли бы загрузить образ Docker в Vagrant, но мы никогда не помещали данные мониторинга в приборную панель. В итоге мы установили StatsD и серверную часть Graphite, следуя инструкциям по установке  Steve Akers  со следующими изменениями:

  1. При установке Каира соблюдайте все предостережения, сделанные Homebrew. Убедитесь, что все работает, набрав  import cairo в оболочке Python. Вы не должны получать никаких ошибок. Если Graphite не находит Каир, он не может визуализировать графику, поэтому этот шаг крайне важен.
  2. Графит это приложение Django. Когда мы открыли стартовую страницу Graphite на нашем сервере, мы просто получили сообщение «Ошибка импорта: модуль с именем по умолчанию». Как жаль! Оказалось, что  мы должны установить Django 1.5  вместо текущей версии 1.6. Django 1.5 можно установить через  sudo pip install django==1.5.

После того, как все установлено, мы можем запустить демон StatsD и Graphite:

~statsd $ node stats.js config.js
/opt/graphite $ sudo python bin/run-graphite-devel-server.py .

После этого мы запускаем сервер Ping-Pong и клиентское приложение. В Graphite мы можем посмотреть различные показатели, которые собирает Kamon, например размер почтового ящика:

Мониторинг Акка Пинг-Понг

Кластеризация торгового приложения

В стороне: Акка Кластер Спец

Чтобы начать кластеризацию Akka, нам нужны две вещи:

  1. Добавьте пакет кластера Akka в качестве зависимости в нашем  build.sbt :libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.3.2"
  2. Обновите файл  application.conf,  как описано в  документе Akka.

Узлы кластера могут либо присоединяться автоматически, подключаясь к настроенным начальным узлам, либо явно предоставляя адрес для программного присоединения. Фрагмент ниже демонстрирует программный подход:

val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552").withFallback(ConfigFactory.load())

val sys = ActorSystem("TradingShard", config)

val seed: Address = Address("akka.tcp", "TradingShard", "127.0.0.1", 2551)

val cluster: Cluster = Cluster(sys).joinSeedNodes(a1 :: Nil)

Предполагая, что у нас уже есть действующая система акторов, названная  TradingSystem на нашей машине через порт  2551, мы создаем вторую систему акторов с тем же именем — что очень важно — на порте  2552, разрешим адрес начального узла и присоединяемся к кластеру, используя это адрес. Адрес может быть получен, например, из базы данных, службы REST или любого другого источника данных.

Теперь, когда мы стали участником кластера, мы хотим общаться с участниками других узлов кластера. Это, конечно, возможно при использовании полного адреса субъекта, включая хост и порт его системы субъекта. Но в большинстве случаев это не то, что мы хотим. Akka также предоставляет «маршрутизирующие» реализации маршрутизаторов, такие как  ClusterRouterGroup или ClusterRouterPool которые можно настроить как обычные маршрутизаторы:

val routeesPaths = "/user/tradingShardManager" :: Nil

val codedShardManagerRouterConf = ClusterRouterGroup(
  BroadcastGroup(routeesPaths),
  ClusterRouterGroupSettings(
    totalInstances = 100,
    routeesPaths = routeesPaths,
    allowLocalRoutees = false,
    useRole = None
  )
)

val codedShardManagerRouter = context.actorOf(codedShardManagerRouterConf.props(), "clusterRouter")

В этом примере, который нам понадобится позже, мы создаем пул, в котором маршруты — это те акторы кластера, которые подключены к пути, "/user/tradingShardManager"за исключением актера, который принадлежит той же системе акторов  ( allowLocalRoutees = false). Этот пул маршрутизаторов получает  BroadcastGroup стратегию, которая означает, что он будет отправлять все сообщения каждому известному маршрутизатору. Akka поставляется со многими другими этими стратегиями, от простого циклического перебора до согласованного хеширования, до стратегий, основанных на таких показателях, как кучи памяти или загрузка процессора. Кроме того, конечно, можно позволить маршрутизатору управлять экземплярами субъектов и создавать их в кластере по мере необходимости.

Хотя Akka уже поставляется с множеством этих замечательных стратегий, даже этого иногда может быть недостаточно. В этих случаях вы можете глубже погрузиться в детали конфигурации кластера и написать свой собственный протокол, основанный на событиях кластера, которые Akka передает, если что-то происходит в кластере. Самые интересные сообщения для нашего торгового приложения:

  • MemberUp который публикуется, когда новый узел становится частью кластера.
  • MemberDown который публикуется, когда участник покидает кластер (например, машина выключается).
  • LeaderChanged который публикуется каждый раз, когда участник становится лидером кластера. Не существует явного процесса выбора, на котором узел становится лидером. Лидер — это как раз тот член, который первым может взять на себя эту роль. Лидер может время от времени меняться, как описано более подробно в  Спецификации кластера Akka.

Чтобы получить эти сообщения, вы можете просто подписать актера на поток событий кластера и обработать нужные вам сообщения:

class EventListeningActor extends Actor {
  override def preStart(): Unit = {
    val cluster = Cluster(context.system)

    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[ClusterDomainEvent])
    cluster.sendCurrentClusterState(self)
  }

  override def receive = {
    case LeaderChanged(newLeader) => 
    case MemberRemoved(member, status) =>
    case MemberUp(member) => 
  }
}

Кластерная торговля

После того, как мы познакомились с поддержкой кластеризации Akka, нам пришлось подумать о протоколе связи между актерами. Хотя довольно просто определить протокол связи для внутрипроцессной системы Actor, определенно становится сложнее определить надежный протокол связи для кластерной системы Actor. Кластеризация вносит большую динамику в поведение приложения Akka во время выполнения: узлы кластера могут приходить и уходить, может меняться лидер, или части кластера могут быть изолированы друг от друга из-за разрыва сети.

Из-за сложности, с которой мы столкнулись, мы несколько раз пересматривали протокол связи. Основная идея, как показано ниже, состоит в том, чтобы разместить несколько книг заказов на каждом узле-члене кластера. Участник рынка может отправлять ордера на любой узел кластера. Если ответственная книга заказов размещена на текущем узле кластера, он обрабатывает заказ. В противном случае он ищет целевой узел в своей внутренней таблице конфигурации и перенаправляет заказ.

TradingAppCluster OrderRouting

Остальная часть протокола необходима для распространения и обновления таблицы конфигурации по мере того, как члены присоединяются и уходят. Протокол реализован  TradingShardManager и переработана версия  OrderRoutingActor. У каждого участника есть собственный менеджер торговых сегментов, который инкапсулирует всю логику протокола. Как упоминалось ранее, один из членов играет роль лидера. Этот участник  TradingShardManager отвечает за ведение глобальной таблицы маршрутизации. Всякий раз, когда новый участник присоединяется к кластеру, лидер  TradingShardManagerзапрашивает локальные книги заказов нового участника. Если приходит ответ, книги заказов нового участника добавляются в глобальную таблицу маршрутизации. После этого обновленная таблица маршрутизации будет транслироваться другим участникам  TradingShardManager, которые затем отвечают за обновление своей локальной OrderRouterмаршруты. Теперь каждый маршрутизатор заказов знает, куда направлять запросы и ставки для каждой книги заказов в кластере.

TradingAppCluster ShardConfig

Резюме

Обновление наших демонстрационных приложений до Akka 2.3.2 заняло немного усилий, но благодаря руководству по миграции это удалось. В целом мы удовлетворены новым различием между пулами маршрутизаторов и группами.

У нас смешанные чувства по поводу текущей поддержки мониторинга приложений Akka. Хотя команда Kamon проделала большую работу и интегрировать мониторинг в приложение Akka легко, инфраструктуре мониторинга требуется несколько серверов и разнообразный набор технологий, таких как Node.js (StatsD) и Python (Graphite). В короткие сроки нашей Лаборатории мы едва поцарапали поверхность в этой области.

Мы смогли получить непосредственный опыт кластеризации. Хотя сам код не так сильно меняется, кластеризация оказывает большое влияние на дизайн протокола связи. Наш главный вывод заключается в том, что протокол связи должен быть очень хорошо продуман, и для того, чтобы сделать это правильно, требуется некоторый опыт.