Статьи

Реактивное программирование с помощью Akka и Scala

В ходе нашей лаборатории мы хотели реализовать приложение с помощью Akka и Scala, потому что мы собираемся оценить высокопроизводительные и масштабируемые программные архитектуры на JVM. В этом блоге мы описываем, как настроить приложение Akka и показать несколько простых демонстрационных приложений.

Начальная загрузка приложения Akka / Scala

Базовая настройка приложения проста. Мы используем  sbt как инструмент для сборки. Поэтому нам нужно создать  build.sbt  и добавить необходимые артефакты Akka в качестве зависимостей:

name := "The Akka Lab"

version := "0.1"

scalaVersion := "2.10.4 "

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.3",
  "com.typesafe.akka" %% "akka-testkit" % "2.2.3",
  "org.scalatest" %% "scalatest" % "2.0" % "test",
)

Вы можете легко импортировать проект в IntelliJ или использовать плагины sbt для создания файлов проекта для предпочитаемой вами IDE:

Простая передача сообщений

После импорта проекта мы можем реализовать нашу первую систему ActorSystem. Его структура показана ниже:

Простая структура систем актеров

Мы хотим создать единую систему ActorSystem с именем маршрутизации,  имеющую  Receiver Actor с именем single  рядом с  RoundRobinRouter маршрутизатором  с 10 дочерними  типами  Receiver. Нам просто нужно создать экземпляр ActorSystem и создать как дети  одного  и  маршрутизатор . Сам RoundRobinRouter создает своих детей:

 import scala.concurrent.duration._
    val duration = 3.seconds
    implicit val timeout = Timeout(duration)

    val sys = ActorSystem("Routing")

    val single = sys.actorOf(Props[Receiver](new Receiver(2.seconds.toMillis)), "single")
    val router = sys.actorOf(Props[Receiver].withRouter(RoundRobinRouter(nrOfInstances = 10)), "router")

Receiver Получает сообщения типа  Message(String) и печатает параметр сообщения. После получения сообщения мы переключаем состояние нашего получателя с помощью become механизма Akka . Итак, вот определение нашего  Receiver актера:

class Receiver(timeout: Long) extends Actor with ActorLogging {
    import demo.RoutingStrategies.Receiver._

    def this() = this(1000)

    override def receive = fastReceive

    def fastReceive: Receive = {
      case Message(m)=> {
        log.info(m)

        context.become(slowReceive)
      }
    }

    def slowReceive: Receive = {
      case Message(m) => {
        Thread.sleep(timeout)

        log.info(s"Slow: $m")

        context.become(fastReceive)
      }
    }
  }

Как упоминалось ранее, актер просто печатает сообщение. Получив сообщение, он переключает свое состояние с  fastReceive на  slowReceive и наоборот для имитации более сложной и трудоемкой операции. Теперь, когда наша система завершена, мы можем начать отправлять сообщения  single и  router:

// Sending a message by using actors path
sys.actorSelection("user/single") ! Message("Hello You, by path! [fast]") // fast really?

// Sending a message by using ActorRef
single ! Message("Hello You! [slow]") // slow really?
single ! Message("Hello You! [fast]") // fast really?

// Sending a message by using Router
router ! Message("Hello Anybody! [fast]")   // route message to next Receiver actor
router ! Broadcast(Message("Hello World! [1xslow, 9xfast]")) // route message to all Receiver actors

Прямо здесь мы получили нашу первую проблему. Как вы можете видеть, ожидается, что Akka сохраняет порядок сообщений, и это верно — до тех пор, пока вы не перепутаете отправку сообщений с помощью  ActorRef и  ActorSelection. В этом случае единственной гарантией является то, что все сообщения, отправленные на,  ActorRef будут иметь определенный порядок, и все сообщения, отправленные с помощью, также  ActorSelection имеют определенный порядок. Но между этими двумя механизмами адресации сообщений нет гарантированного порядка. Последнее, что мы хотим попробовать, это закрыть ActorSystem после того, как все сообщения были обработаны. Поскольку мы находимся в многопоточной среде, мы не можем просто завершить работу системы в конце основного метода. Мы могли бы позвонить,  system.shutdown() а затем использовать system.awaitTermination() ждать завершения всех активных в данный момент операций, но мы не знаем, были ли обработаны все сообщения. По этой причине Akka предоставляет  gracefulShutdown механизм: использование его означает, что специальное сообщение  PoisonPill, помещается в очередь в почтовом ящике актера. Все сообщения до  PoisonPill будут обработаны в обычном режиме. Когда  PoisonPill обрабатывается, актер завершает работу и отправляет  Terminated сообщение. После того, как мы собрали все Terminated сообщения, мы можем безопасно завершить  работу системы:

for {
  routerTerminated  <- gracefulStop(router, duration, Broadcast(PoisonPill))
  singleTerminated <- gracefulStop(single, duration)
} {
  sys.shutdown()
}

PingPong: удаленные сообщения

Чтобы попробовать  удаленное взаимодействие  в Akka, мы решили сыграть в настольный теннис Actor. Основной код актера довольно прост (упрощенная версия):

object PingPongActor {
  case class Ping(message : String)
  case class Pong(message : String)
}

class PingPongActor extends Actor with ActorLogging {
  import demo.PingPongActor.{Pong, Ping}
  def receive = {
    case Ping(m) => {
      log.info(s"Ping: $m")
      sender ! Pong(m)
    }

    case Pong(m) => {
      log.info(s"Pong: $m")
      sender ! Ping(m)
    }
  }
}

На основе  примера Akka Remote Hello-World  мы написали «клиентское» и «серверное» приложения и настроили их с помощью Typesafe Config. Одному из актеров нужно только начать игру, а потом оба пинг-понга счастливо. Поскольку протокол сообщений очень прост, приложение хорошо подходит для измерения задержек сообщений Akka. Следовательно, мы прикрепили метку времени к каждому сообщению, используя  System#nanoTime(). Однако, как указано в  Javadoc System # nanoTime () , он подходит только для измерения времени в пределах одной JVM. Таким образом, вместо того, чтобы измерять только задержку от одного субъекта к другому, мы решили измерить задержку туда и обратно, что позволяет нам System#nanoTime() безопасно использовать  . Чтобы измерить их, оба сообщения расширяются свойством timestamp и  receive соответственно изменяются:

def receive = {
    case Ping(m, ts) => {
        log.info(s"Ping: $m")
        //just forward the timestamp
        sender ! Pong(m, ts)
    }

    case Pong(m, ts) => {
        val roundTripTime = System.nanoTime() - ts
        log.info(s"Pong: $m with round trip time $roundTripTime ns.")
        sender ! Ping(m, next, System.nanoTime())
    }
}

Наши выносы для этого примера:

  • Распределение акторов легко возможно, но не сразу очевидно, как распределяются актеры (т.е. мы должны написать клиентское и серверное приложение в нашем случае)
  • Измерение времени в распределенной системе требует некоторых размышлений, но мы получили очень простое решение для измерения задержек в обоих направлениях.

В сторону: Typesafe Config

Мы обнаружили, что  конфигурация Typesafe  заслуживает внимания, поскольку у нее простой синтаксис, легко использовать API Scala / Java и это механизм конфигурации Akka. Конфигурация Typesafe имеет JSON-подобный синтаксис, называемый HOCON, который позволяет использовать разные типы данных, например числа, строки, массивы или вложенные «объекты». Он также имеет встроенную поддержку для замены заполнителя. Вы можете использовать его для переопределения настроек Akka по умолчанию, чтобы настроить приложение Akka без изменения одной строки кода или для обеспечения настраиваемой конфигурации для вашего собственного приложения. Вот структурная выдержка из конфигурации нашего приложения:

  # overriding Akka defaults
  akka {
    ...
  }

  # server-side Akka overrides
  server {
    akka {
      ...
    }
  }

  # client-side Akka overrides
  client {
    ...
  }

В серверном приложении мы загружаем конфигурацию следующим образом:

 // load akka defaults, ignore others
  val akkaConf = ConfigFactory.load("application-remoting.conf").withOnlyPath("akka")
  // load server default
  val serverConf = ConfigFactory.load("application-remoting.conf").getConfig("server")

  // merge server and akka config
  val conf = serverConf.withFallback(akkaConf)

Сначала мы загружаем конфигурацию по умолчанию в,  akkaConf а затем в конфигурацию выделенного сервера  serverConf. Наконец, мы объединяем их в единую конфигурацию под названием  conf. Когда мы читаем свойство из  conf, мы получим одно из блока ‘akka’ в разделе сервера, если оно есть, или из корневого блока ‘akka’, если нет. Таким же образом Akka считывает значения по умолчанию  reference.conf и переопределяет их свойствами,  application.conf если такой файл присутствует в пути к классам приложения. Если вы хотите узнать конфигурацию по умолчанию Akka, вы можете просмотреть  reference.conf или просмотреть документацию Akka .

Торговое приложение

Наконец, мы хотели попробовать более сложный пример, который требует большего моделирования предметной области и более сложного протокола обмена сообщениями. Пример вдохновлен торговым приложением, основанным на  примере производительности торговли Akka,  но мы отклонились во многих аспектах. В своем текущем состоянии торговое приложение имеет некоторые причуды, не обладает множеством функций и в настоящее время даже позволяет испаряться деньгам при определенных обстоятельствах… Это нехорошо, но мы смогли создать прототип некоторых концепций, и это отправная точка для дальнейших экспериментов и улучшений, которые мы обсудим позже более подробно.

Домен

Целью приложения является моделирование участников рынка, которые хотят покупать ценные бумаги. Каждый участник может размещать заказы: покупатели делают ставку, продавцы — спрашивают. Ставки и запросы сопоставляются в книге заказов (по одной на ценную бумагу) и совершается сделка. Алгоритм основан на  OrderBook.scala Акки . Он в основном пытается соотнести максимальные ставки с минимальными запросами как можно дольше. Если заказ не может быть выполнен полностью, он разделяется. Все товары участников отслеживаются на счетах: ценные бумаги хранятся в хранилище, наличные деньги хранятся на складе. Каждая учетная запись списывается, как только заказ размещен, чтобы избежать чрезмерной оплаты. При выполнении заказа товар зачисляется.

Моделирование приложения Akka

Приложение состоит из двух участников, которые связаны между собой пользовательским  маршрутизатором :

  • MarketParticipantУчастник рынка периодически размещает заказы. Он случайным образом решает, делать ли ставку или спрос, а также случайным образом принимает решение о предложенной цене, которая основана на текущей рыночной цене ценной бумаги, включая случайный спред.
  • OrderBook: Для OrderBook каждой сделки в системе существует один  актер для соответствия сделкам. Он принимает заказы и периодически сопоставляет их. После этого он уведомляет участников  MarketParticipantоб успешной торговле.
  • OrderRouter: Мы решили соединить  MarketParticipants и  OrderBookss через пользовательский маршрутизатор. Во время запуска маршрутизатор создает  OrderBook актеров. При поступлении заказа он решает, кто  OrderBook несет ответственность, и направляет заказ.

Системная структура торговой системы Диаграмма ниже показывает поток сообщений о сделке через систему. Участники рынка размещают заявку и предложение, через  OrderRouter которое пересылают сообщения соответствующим  OrderBook для этой ценной бумаги. Это соответствует приказам и отвечает обеим сторонам с  BidResponse и  AskResponse на успех. Они, в свою очередь, могут соответствующим образом скорректировать баланс своего счета. Поток сообщений через приложение

Реализация

Симуляция существует в двух вариантах: одноузловая реализация, которая загружается  TradingSimulationApp и распределенная реализация, которая реализует  RemoteClientApp симуляцию рынка и  RemoteServerApp симуляцию книг заказов. Для настройки различных аспектов приложения, таких как ценные бумаги, номер или участники рынка, мы использовали Typesafe Config. Подключение конкретной реализации достигается с помощью  шаблона Cake .

Открытые вопросы

Мы очень можем попробовать множество функций Akka, таких как становление / отмена, сохранение, настраиваемые маршрутизаторы или удаленное взаимодействие. Тем не менее, домен позволяет расширить пример приложения еще во многих различных аспектах, которые мы опишем чуть ниже.

Домен

Что касается домена, мы видим следующие области улучшения:

  • Потеря денег : приложение удерживает деньги в случае, если заказ разделен или даже испаряется, если цена покупки отличается от цены предложения. Это не было большой проблемой для нашего кратковременного моделирования, но это, безусловно, демонстрационный пример для реального приложения. Эту проблему можно решить разными способами. Например, мы можем отменить заказы через определенное количество времени, если они не могут быть выполнены, или просто зарезервировать деньги вместо того, чтобы реально взимать депозит.
  • Подтверждения : Подтверждение получения заказов позволит участникам рынка легче отслеживать состояние.

технологический

  • Репликация и отказоустойчивость . В настоящее время, если  OrderBook субъект терпит неудачу, все открытые сделки и рыночная оценка теряются. Использование выделенного  узла супервизора  и механизма репликации для каждого  OrderBook сделает приложение намного более надежным.
  • Мониторинг . Демонстрация может включать механизм мониторинга для визуализации различных бизнес-метрик, таких как текущие рыночные цены, количество открытых заказов или совокупный доход, а также технические метрики, такие как доставленные сообщения, задержка сообщений и пропускная способность системы.
  • Производительность : система вообще не настроена на производительность. Основываясь на мониторинге и различных сценариях, мы могли бы найти узкие места и настроить систему, основываясь на  широких возможностях конфигурации Akka .

Последние мысли

Хотя наши три лабораторных дня были очень продуктивными, мы едва поцарапали поверхность того, что возможно с Akka. Как вы уже догадались, прочитав этот пост в блоге, мы боролись с некоторыми аспектами, но это хороший знак: в конце концов, мы только узнаем что-то новое, борясь в первую очередь. Нам было очень весело в этой лаборатории, и мы с нетерпением ждем следующей, чтобы исследовать другие аспекты Akka.

Источники демонстрации можно найти в  репозитории ComSyto GitHub .