Статьи

Три варианта шаблона запроса-ответа в Акке

Представьте себе простую систему актеров Akka, состоящую из двух сторон: MonitoringActor и NetworkActor . Всякий раз, когда кто-то ( клиент ) отправляет CheckHealth первому, он запрашивает второе, отправляя Ping . NetworkActor обязан ответить Pong как можно скорее (сценарий [A]). Как только MonitoringActor получает такой ответ, он немедленно отвечает клиенту сообщением о состоянии Up . Однако MonitoringActor обязан отправить ответ Down если NetworkActor не смог ответить Pong течение одной секунды (сценарий [B]). Оба рабочих процесса изображены ниже:

actors2

По-видимому, есть как минимум три способа реализации этой простой задачи в Akka, и мы изучим их плюсы и минусы.

Обычный актер

В этом сценарии MonitoringActor прослушивает Pong напрямую без посредников:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
class MonitoringActor extends Actor with ActorLogging {
  
  private val networkActor = context.actorOf(Props[NetworkActor], "network")
  private var origin: Option[ActorRef] = None
  
  def receive = {
    case CheckHealth =>
      networkActor ! Ping
      origin = Some(sender)
    case Pong =>
      origin.foreach(_ ! Up)
      origin = None
  }
}

Реализация NetworkActor имеет значения, просто предположите, что он отвечает Pong для каждого Ping . Как вы можете видеть, MonitoringActor обрабатывает два сообщения: CheckHealth отправленное клиентом, и Pong предположительно, отправленный NetworkActor . К сожалению, нам пришлось хранить ссылку на клиента в поле источника, потому что в противном случае она была бы потеряна после обработки CheckHealth . Итак, мы добавили немного государства. Реализация довольно проста, но имеет довольно много проблем:

  • Последующее CheckHealth перезапишет предыдущее origin
  • CheckHealth не должно быть разрешено при ожидании Pong
  • Если Pong никогда не прибудет, мы останемся в противоречивом состоянии
  • … Потому что у нас еще нет условия ожидания 1 секунды

Но прежде чем мы реализуем условие тайм-аута, давайте немного реорганизовать наш код, чтобы сделать состояние более явным и безопасным для типов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
class MonitoringActor extends Actor with ActorLogging {
  
  private val networkActor = context.actorOf(Props[NetworkActor], "network")
  
  def receive = waitingForCheckHealth
  
  private def waitingForCheckHealth: Receive = {
    case CheckHealth =>
      networkActor ! Ping
      context become waitingForPong(sender)
  }
  
  private def waitingForPong(origin: ActorRef): Receive = {
    case Pong =>
      origin ! Up
      context become waitingForCheckHealth
  }
}

context.become() позволяет изменить поведение актера на лету . В нашем случае мы либо ждем CheckHealth или Pong — но никогда не оба. Но куда делось государство (ссылка на источник)? Ну, это ловко спрятано. waitingForPong() принимает origin качестве параметра и возвращает функцию PartialFunction . Эта функция закрывает этот параметр, поэтому глобальная переменная actor больше не нужна. Хорошо, теперь мы готовы реализовать 1-секундный тайм-аут при ожидании Pong :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
def receive = waitingForCheckHealth
  
private def waitingForCheckHealth: Receive = {
  case CheckHealth =>
    networkActor ! Ping
    implicit val ec = context.dispatcher
    val timeout = context.system.scheduler.
      scheduleOnce(1.second, self, Down)
    context become waitingForPong(sender, timeout)
}
  
private def waitingForPong(origin: ActorRef, timeout: Cancellable): Receive = LoggingReceive {
  case Pong =>
    timeout.cancel()
    origin ! Up
    context become receive
  case Down =>
    origin ! Down
    context become receive
}

После отправки Ping мы немедленно планируем отправку сообщения « Down себе примерно через одну секунду. Тогда мы идем в waitingForPong . Если Pong прибывает, мы отменяем запланированный Down и отправляем Up . Однако, если мы впервые получили Down это означает, что истекла одна секунда. Итак, мы пересылаем Down обратно к клиенту. Это только у меня или может быть такая простая задача не требует такого количества кода?

Кроме того, обратите внимание, что наш MonitoringActor не способен обрабатывать более одного клиента одновременно. После получения CheckHealth клиентам больше не разрешено, пока не будет отправлено сообщение « Up или « Down . Кажется довольно ограничивающим.

Составление фьючерсов

Другой подход к той же самой проблеме — использование модели спроса и будущего. Внезапно код становится намного короче и легче для чтения:

01
02
03
04
05
06
07
08
09
10
def receive = {
  case CheckHealth =>
    implicit val timeout: Timeout = 1.second
    implicit val ec = context.dispatcher
    val origin = sender
    networkActor ? Ping andThen {
      case Success(_) => origin ! Up
      case Failure(_) => origin ! Down
    }
}

Это оно! Мы запрашиваем networkActor , отправляя Ping а затем, когда приходит ответ, мы отвечаем клиенту. В случае Success(_) ( _ заполнитель означает Pong но нам все равно) мы отправляем Up . Если это был Failure(_) (где _ наиболее вероятно содержит AskTimeout брошенный через одну секунду без ответа), мы пересылаем Down . В этом коде есть одна огромная ловушка. И в обратном вызове, и в успешном, и в случае неудачи мы не можем использовать sender напрямую, потому что этот фрагмент кода может быть выполнен гораздо позже другим потоком. Значение sender является кратковременным, и к тому времени, когда Pong прибудет, это может указывать на любого другого актера, который случайно отправил нам что-то. Таким образом, мы должны сохранить оригинального sender в локальной переменной источника и перехватить его вместо этого.

Если вас это раздражает, вы можете поиграть с pipeTo :

1
2
3
4
5
6
7
8
def receive = LoggingReceive {
  case CheckHealth =>
    implicit val ec = context.dispatcher
    networkActor.ask(Ping)(1.second).
      map{_ => Up}.
      recover{case _ => Down}.
      pipeTo(sender)
}

То же, что и раньше, мы ask (синоним ? networkActor ) networkActor с тайм-аутом. Если получен правильный ответ, мы сопоставляем его с Up . Если вместо этого будущее заканчивается исключением, мы восстанавливаемся из него, сопоставляя его с сообщением « Down . Независимо от того, какая «ветка» была использована, результат передается sender .

Вы должны задать себе вопрос: почему приведенный выше код подходит, несмотря на использование sender то время как предыдущий был бы поврежден? Если вы внимательно посмотрите на объявления, вы заметите, что pipeTo() принимает ActorRef по значению, а не по имени. Это означает, что sender вычисляется сразу при выполнении выражения, а не позже, когда возвращаются ответы. Мы идем по тонкому льду здесь, поэтому, пожалуйста, будьте осторожны, делая такие предположения.

Выделенный актер

Актеры легкие, так почему бы не создать один только для проверки здоровья? Этот удаленный субъект будет отвечать за связь с NetworkActor и отправку ответа клиенту. Единственной обязанностью MonitoringActor будет создание экземпляра этого единовременного субъекта:

1
2
3
4
5
6
7
8
class MonitoringActor extends Actor with ActorLogging {
  
  def receive = {
    case CheckHealth =>
      context.actorOf(Props(classOf[PingActor], networkActor, sender))
  }
  
}

PingActor довольно прост и похож на самое первое решение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
class PingActor(networkActor: ActorRef, origin: ActorRef) extends Actor with ActorLogging {
  
  networkActor ! Ping
  context.setReceiveTimeout(1.second)
  
  def receive = {
    case Pong =>
      origin ! Up
      self ! PoisonPill
    case ReceiveTimeout =>
      origin ! Down
      self ! PoisonPill
  }
}

Когда актер создан, мы отправляем Ping в NetworkActor но также планируем сообщение о тайм-ауте. Теперь мы ждем либо Pong либо тайм-аута Down . В обоих случаях мы в конечном итоге PingActor потому что PingActor больше не нужен. Конечно, MonitoringActor может создавать несколько независимых NetworkActor одновременно.

Это решение сочетает в себе простоту и чистоту первого, но надежно как второе. Конечно, это также требует большей части кода. Вам решать, какую технику вы используете в реальных случаях использования. Кстати, после написания этой статьи я натолкнулся на актеров Ask, Tell и Per-request, которые касаются одной и той же проблемы и вводят аналогичные подходы. Определенно посмотрите и на это!

Ссылка: три варианта шаблона запроса-ответа в Akka от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .