Статьи

Три варианта шаблона запроса-ответа в Акке

Представьте себе простую актерскую систему Akka, состоящую из двух сторон:  MonitoringActor и  NetworkActor. Всякий раз, когда кто-то ( клиент ) отправляет  CheckHealth первый, он спрашивает последнего, отправляя  PingNetworkActor обязан ответить  Pong как можно скорее (сценарий). Получив  MonitoringActor такой ответ, он немедленно отвечает клиенту  Up сообщением о состоянии. Однако  MonitoringActor обязан отправить  Down ответ, если  NetworkActor не смог ответить в  Pong течение одной секунды (сценарий [B]). Оба рабочих процесса изображены ниже:

По-видимому, есть как минимум три способа реализации этой простой задачи в Akka, и мы изучим их плюсы и минусы.

Обычный актер

В этом сценарии  MonitoringActor слушает  Pong напрямую без каких-либо посредников:

class MonitoringActor extends Actor with ActorLogging {
private val networkActor =context.actorOf(Props[NetworkActor], "network")
private var origin:Option=None
defreceive ={
case CheckHealth =>
networkActor ! Ping
origin =Some(sender)
casePong =>
origin.foreach(_! Up)
origin =None
}
}

Реализация не  NetworkActor имеет значения, просто предположите, что она отвечает  Pong для каждого  Ping. Как видите, MonitoringActor обрабатывает два сообщения:  CheckHealth отправленное клиентом и  Pong предположительно отправленное NetworkActor. К сожалению, нам пришлось хранить ссылку клиента под  origin полем, потому что в противном случае он был бы потерян, если бы он  CheckHealth был обработан. Итак, мы добавили немного государства. Реализация довольно проста, но имеет довольно много проблем:

  • Последующее  CheckHealth перезапишет предыдущее origin
  • CheckHealth не должно быть разрешено в ожидании Pong
  • Если  Pong никогда не прибудет, мы останемся в противоречивом состоянии
  • … потому что у нас еще нет условия ожидания 1 секунды

Но прежде чем мы реализуем условие тайм-аута, давайте немного реорганизовать наш код, чтобы сделать состояние более явным и безопасным для типов:

class MonitoringActor extends Actor with ActorLogging {
private val networkActor =context.actorOf(Props[NetworkActor], "network")
def receive =waitingForCheckHealth
private def waitingForCheckHealth:Receive ={
caseCheckHealth =>
networkActor ! Ping
context become waitingForPong(sender)
}
private def waitingForPong(origin:ActorRef):Receive ={
casePong =>
origin ! Up
context become waitingForCheckHealth
}
}

context.become() позволяет  изменить поведение актера на лету . В нашем случае мы либо ждем,  CheckHealth либо Pong — но никогда не оба. Но куда делось государство ( origin ссылка)? Ну, это ловко спрятано. waitingForPong()Метод принимает в  origin качестве параметра и возвращает  PartialFunction. Эта функция закрывает этот параметр, поэтому глобальная переменная actor больше не нужна. Хорошо, теперь мы готовы реализовать 1-секундный таймаут при ожидании  Pong:

def receive =waitingForCheckHealth
private def waitingForCheckHealth:Receive ={
case CheckHealth =>
networkActor ! Ping
implicit val ec =context.dispatcher
val timeout =context.system.scheduler.
scheduleOnce(1.second, self, Down)
context become waitingForPong(sender, timeout)
}
private def waitingForPong(origin:ActorRef, timeout:Cancellable):Receive =LoggingReceive {
casePong =>
timeout.cancel()
origin ! Up
context become receive
caseDown =>
origin ! Down
context become receive
}

После отправки  Ping мы сразу планируем отправку  Down сообщения себе примерно через одну секунду. Тогда мы идем в  waitingForPong. Если  Pong прибывает, мы отменяем запланированный  Down и отправляем  Up вместо. Однако, если мы впервые получили,  Down это означает, что прошла одна секунда. Итак, мы переходим  Down обратно к клиенту. Это только у меня или может быть такая простая задача не требует такого количества кода?

Кроме того, обратите внимание, что наша компания  MonitoringActor не способна обрабатывать более одного клиента одновременно. После того, как CheckHealth было получено никаких больше клиентов не допускается до тех пор  , Up или  Down отправляется обратно. Кажется довольно ограничивающим. 

Составление фьючерсов

Другой подход к той же самой проблеме — использование  ask модели и будущего. Внезапно код становится намного короче и легче для чтения:

def receive ={
case CheckHealth =>
implicit val timeout:Timeout =1.second
implicit val ec =context.dispatcher
val origin =sender
networkActor ? Ping andThen {
caseSuccess(_) => origin ! Up
caseFailure(_) => origin ! Down
}
}

Это оно! Мы  просимnetworkActor  отправить,  Pingа затем,  когда приходит ответ, мы отвечаем клиенту. В случае, если это было Success(_) ( _ заполнитель означает,  Pong но нам все равно) мы отправляем  Up. Если это было  Failure(_) (где,  _ скорее всего,  AskTimeout бросили через одну секунду без ответа), мы вперед  Down. В этом коде есть одна огромная ловушка. Мы не можем sender напрямую использовать как обратные вызовы, так и успешные,  потому что эти фрагменты кода могут быть выполнены другим потоком намного позже.  senderЗначение временное, и к тому времени, когда  Pong оно придет, оно может указывать на любого другого актера, который случайно что-то нам прислал. Таким образом, мы должны сохранять оригинальность  sender в origin локальная переменная и захватить его вместо этого.

Если вас это раздражает, вы можете поиграть с  pipeTo шаблоном:

def receive =LoggingReceive {
case CheckHealth =>
implicit val ec =context.dispatcher
networkActor.ask(Ping)(1.second).
map{_=> Up}.
recover{case_=> Down}.
pipeTo(sender)
}

То же, что и раньше  ask (синоним  ? метода)  networkActor с таймаутом. Если получен правильный ответ, мы сопоставляем его  Up. Если вместо этого будущее заканчивается исключением, мы восстанавливаем его, сопоставляя его с  Down сообщением. Независимо от того, какая «ветвь» была задействована, результат  передается  по  трубопроводуsender .

Вы должны задать себе вопрос: почему приведенный выше код подходит, несмотря на sender то, что предыдущий код  был бы поврежден? Если вы внимательно посмотрите на объявления, вы заметите, что они  pipeTo() принимают  ActorRef по значению, а не по имени. Это означает, что  sender вычисляется сразу, когда выполняется выражение, а не позже, когда возвращаются ответы. Мы идем по тонкому льду здесь, поэтому, пожалуйста, будьте осторожны, делая такие предположения. 

Выделенный актер

Актеры легкие, так почему бы не создать один только для проверки здоровья? Этот выброшенный актер будет отвечать за общение  NetworkActor и отправку ответа клиенту. Единственной обязанностью MonitoringActor будет создание экземпляра этого единовременного актера:

class MonitoringActor extendsActor withActorLogging {
def receive ={
case CheckHealth =>
context.actorOf(Props(classOf[PingActor], networkActor, sender))
}
}

PingActor довольно просто и похоже на самое первое решение:

class PingActor(networkActor:ActorRef, origin:ActorRef) extends Actor withActorLogging {
networkActor ! Ping
context.setReceiveTimeout(1.second)
def receive ={
case Pong =>
origin ! Up
self ! PoisonPill
case ReceiveTimeout =>
origin ! Down
self ! PoisonPill
}
}

При создании актера мы посылаем  Ping к  , NetworkActor но и планировать время ожидания сообщения. Теперь мы ждем времени Pong или времени  Down. В обоих случаях мы останавливаемся в конце концов, потому что  PingActor больше не нужны. Конечно,  MonitoringActor можно создавать несколько независимых  NetworkActors одновременно.

Это решение сочетает в себе простоту и чистоту первого, но надежно как второе. Конечно, это также требует большей части кода. Вам решать, какую технику вы используете в реальных случаях использования. Кстати, после написания этой статьи я натолкнулся на актеров Ask, Tell и Per-request,  которые касаются одной и той же проблемы и вводят аналогичные подходы. Определенно посмотрите и на это!