Представьте себе простую актерскую систему Akka, состоящую из двух сторон: MonitoringActor
и NetworkActor
. Всякий раз, когда кто-то ( клиент ) отправляет CheckHealth
первый, он спрашивает последнего, отправляя Ping
. NetworkActor
обязан ответить Pong
как можно скорее (сценарий). Получив MonitoringActor
такой ответ, он немедленно отвечает клиенту Up
сообщением о состоянии. Однако MonitoringActor
обязан отправить Down
ответ, если NetworkActor
не смог ответить в Pong
течение одной секунды (сценарий [B]). Оба рабочих процесса изображены ниже:
По-видимому, есть как минимум три способа реализации этой простой задачи в Akka, и мы изучим их плюсы и минусы.
Обычный актер
В этом сценарии MonitoringActor
слушает Pong
напрямую без каких-либо посредников:
class MonitoringActor extends Actor with ActorLogging { private val networkActor =context.actorOf(Props[NetworkActor], "network") private var origin:Option=None defreceive ={ case CheckHealth => networkActor ! Ping origin =Some(sender) casePong => origin.foreach(_! Up) origin =None } }
Реализация не NetworkActor
имеет значения, просто предположите, что она отвечает Pong
для каждого Ping
. Как видите, MonitoringActor
обрабатывает два сообщения: CheckHealth
отправленное клиентом и Pong
предположительно отправленное NetworkActor
. К сожалению, нам пришлось хранить ссылку клиента под origin
полем, потому что в противном случае он был бы потерян, если бы он CheckHealth
был обработан. Итак, мы добавили немного государства. Реализация довольно проста, но имеет довольно много проблем:
- Последующее
CheckHealth
перезапишет предыдущееorigin
CheckHealth
не должно быть разрешено в ожиданииPong
- Если
Pong
никогда не прибудет, мы останемся в противоречивом состоянии - … потому что у нас еще нет условия ожидания 1 секунды
Но прежде чем мы реализуем условие тайм-аута, давайте немного реорганизовать наш код, чтобы сделать состояние более явным и безопасным для типов:
class MonitoringActor extends Actor with ActorLogging { private val networkActor =context.actorOf(Props[NetworkActor], "network") def receive =waitingForCheckHealth private def waitingForCheckHealth:Receive ={ caseCheckHealth => networkActor ! Ping context become waitingForPong(sender) } private def waitingForPong(origin:ActorRef):Receive ={ casePong => origin ! Up context become waitingForCheckHealth } }
context.become()
позволяет изменить поведение актера на лету . В нашем случае мы либо ждем, CheckHealth
либо Pong
— но никогда не оба. Но куда делось государство ( origin
ссылка)? Ну, это ловко спрятано. waitingForPong()
Метод принимает в origin
качестве параметра и возвращает PartialFunction
. Эта функция закрывает этот параметр, поэтому глобальная переменная actor больше не нужна. Хорошо, теперь мы готовы реализовать 1-секундный таймаут при ожидании Pong
:
def receive =waitingForCheckHealth private def waitingForCheckHealth:Receive ={ case CheckHealth => networkActor ! Ping implicit val ec =context.dispatcher val timeout =context.system.scheduler. scheduleOnce(1.second, self, Down) context become waitingForPong(sender, timeout) } private def waitingForPong(origin:ActorRef, timeout:Cancellable):Receive =LoggingReceive { casePong => timeout.cancel() origin ! Up context become receive caseDown => origin ! Down context become receive }
После отправки Ping
мы сразу планируем отправку Down
сообщения себе примерно через одну секунду. Тогда мы идем в waitingForPong
. Если Pong
прибывает, мы отменяем запланированный Down
и отправляем Up
вместо. Однако, если мы впервые получили, Down
это означает, что прошла одна секунда. Итак, мы переходим Down
обратно к клиенту. Это только у меня или может быть такая простая задача не требует такого количества кода?
Кроме того, обратите внимание, что наша компания MonitoringActor
не способна обрабатывать более одного клиента одновременно. После того, как CheckHealth
было получено никаких больше клиентов не допускается до тех пор , Up
или Down
отправляется обратно. Кажется довольно ограничивающим.
Составление фьючерсов
Другой подход к той же самой проблеме — использование ask
модели и будущего. Внезапно код становится намного короче и легче для чтения:
def receive ={ case CheckHealth => implicit val timeout:Timeout =1.second implicit val ec =context.dispatcher val origin =sender networkActor ? Ping andThen { caseSuccess(_) => origin ! Up caseFailure(_) => origin ! Down } }
Это оно! Мы просимnetworkActor
отправить, Ping
а затем, когда приходит ответ, мы отвечаем клиенту. В случае, если это было Success(_)
( _
заполнитель означает, Pong
но нам все равно) мы отправляем Up
. Если это было Failure(_)
(где, _
скорее всего, AskTimeout
бросили через одну секунду без ответа), мы вперед Down
. В этом коде есть одна огромная ловушка. Мы не можем sender
напрямую использовать как обратные вызовы, так и успешные, потому что эти фрагменты кода могут быть выполнены другим потоком намного позже. sender
Значение временное, и к тому времени, когда Pong
оно придет, оно может указывать на любого другого актера, который случайно что-то нам прислал. Таким образом, мы должны сохранять оригинальность sender
в origin
локальная переменная и захватить его вместо этого.
Если вас это раздражает, вы можете поиграть с pipeTo
шаблоном:
def receive =LoggingReceive { case CheckHealth => implicit val ec =context.dispatcher networkActor.ask(Ping)(1.second). map{_=> Up}. recover{case_=> Down}. pipeTo(sender) }
То же, что и раньше ask
(синоним ?
метода) networkActor
с таймаутом. Если получен правильный ответ, мы сопоставляем его Up
. Если вместо этого будущее заканчивается исключением, мы восстанавливаем его, сопоставляя его с Down
сообщением. Независимо от того, какая «ветвь» была задействована, результат передается по трубопроводуsender
.
Вы должны задать себе вопрос: почему приведенный выше код подходит, несмотря на sender
то, что предыдущий код был бы поврежден? Если вы внимательно посмотрите на объявления, вы заметите, что они pipeTo()
принимают ActorRef
по значению, а не по имени. Это означает, что sender
вычисляется сразу, когда выполняется выражение, а не позже, когда возвращаются ответы. Мы идем по тонкому льду здесь, поэтому, пожалуйста, будьте осторожны, делая такие предположения.
Выделенный актер
Актеры легкие, так почему бы не создать один только для проверки здоровья? Этот выброшенный актер будет отвечать за общение NetworkActor
и отправку ответа клиенту. Единственной обязанностью MonitoringActor
будет создание экземпляра этого единовременного актера:
class MonitoringActor extendsActor withActorLogging { def receive ={ case CheckHealth => context.actorOf(Props(classOf[PingActor], networkActor, sender)) } }
PingActor
довольно просто и похоже на самое первое решение:
class PingActor(networkActor:ActorRef, origin:ActorRef) extends Actor withActorLogging { networkActor ! Ping context.setReceiveTimeout(1.second) def receive ={ case Pong => origin ! Up self ! PoisonPill case ReceiveTimeout => origin ! Down self ! PoisonPill } }
При создании актера мы посылаем Ping
к , NetworkActor
но и планировать время ожидания сообщения. Теперь мы ждем времени Pong
или времени Down
. В обоих случаях мы останавливаемся в конце концов, потому что PingActor
больше не нужны. Конечно, MonitoringActor
можно создавать несколько независимых NetworkActor
s одновременно.
Это решение сочетает в себе простоту и чистоту первого, но надежно как второе. Конечно, это также требует большей части кода. Вам решать, какую технику вы используете в реальных случаях использования. Кстати, после написания этой статьи я натолкнулся на актеров Ask, Tell и Per-request, которые касаются одной и той же проблемы и вводят аналогичные подходы. Определенно посмотрите и на это!