Представьте себе простую систему актеров Akka, состоящую из двух сторон: MonitoringActor
и NetworkActor
. Всякий раз, когда кто-то ( клиент ) отправляет CheckHealth
первому, он запрашивает второе, отправляя Ping
. NetworkActor
обязан ответить Pong
как можно скорее (сценарий [A]). Как только MonitoringActor
получает такой ответ, он немедленно отвечает клиенту сообщением о состоянии Up
. Однако MonitoringActor
обязан отправить ответ Down
если NetworkActor
не смог ответить Pong
течение одной секунды (сценарий [B]). Оба рабочих процесса изображены ниже:
По-видимому, есть как минимум три способа реализации этой простой задачи в Akka, и мы изучим их плюсы и минусы.
Обычный актер
В этом сценарии MonitoringActor
прослушивает Pong
напрямую без посредников:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
class MonitoringActor extends Actor with ActorLogging { private val networkActor = context.actorOf(Props[NetworkActor], "network" ) private var origin: Option[ActorRef] = None def receive = { case CheckHealth => networkActor ! Ping origin = Some(sender) case Pong => origin.foreach(_ ! Up) origin = None } } |
Реализация NetworkActor
имеет значения, просто предположите, что он отвечает Pong
для каждого Ping
. Как вы можете видеть, MonitoringActor
обрабатывает два сообщения: CheckHealth
отправленное клиентом, и Pong
предположительно, отправленный NetworkActor
. К сожалению, нам пришлось хранить ссылку на клиента в поле источника, потому что в противном случае она была бы потеряна после обработки CheckHealth
. Итак, мы добавили немного государства. Реализация довольно проста, но имеет довольно много проблем:
- Последующее
CheckHealth
перезапишет предыдущееorigin
-
CheckHealth
не должно быть разрешено при ожиданииPong
- Если
Pong
никогда не прибудет, мы останемся в противоречивом состоянии - … Потому что у нас еще нет условия ожидания 1 секунды
Но прежде чем мы реализуем условие тайм-аута, давайте немного реорганизовать наш код, чтобы сделать состояние более явным и безопасным для типов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
class MonitoringActor extends Actor with ActorLogging { private val networkActor = context.actorOf(Props[NetworkActor], "network" ) def receive = waitingForCheckHealth private def waitingForCheckHealth: Receive = { case CheckHealth => networkActor ! Ping context become waitingForPong(sender) } private def waitingForPong(origin: ActorRef): Receive = { case Pong => origin ! Up context become waitingForCheckHealth } } |
context.become()
позволяет изменить поведение актера на лету . В нашем случае мы либо ждем CheckHealth
или Pong
— но никогда не оба. Но куда делось государство (ссылка на источник)? Ну, это ловко спрятано. waitingForPong()
принимает origin
качестве параметра и возвращает функцию PartialFunction
. Эта функция закрывает этот параметр, поэтому глобальная переменная actor больше не нужна. Хорошо, теперь мы готовы реализовать 1-секундный тайм-аут при ожидании Pong
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
def receive = waitingForCheckHealth private def waitingForCheckHealth: Receive = { case CheckHealth => networkActor ! Ping implicit val ec = context.dispatcher val timeout = context.system.scheduler. scheduleOnce( 1 .second, self, Down) context become waitingForPong(sender, timeout) } private def waitingForPong(origin: ActorRef, timeout: Cancellable): Receive = LoggingReceive { case Pong => timeout.cancel() origin ! Up context become receive case Down => origin ! Down context become receive } |
После отправки Ping
мы немедленно планируем отправку сообщения « Down
себе примерно через одну секунду. Тогда мы идем в waitingForPong
. Если Pong
прибывает, мы отменяем запланированный Down
и отправляем Up
. Однако, если мы впервые получили Down
это означает, что истекла одна секунда. Итак, мы пересылаем Down
обратно к клиенту. Это только у меня или может быть такая простая задача не требует такого количества кода?
Кроме того, обратите внимание, что наш MonitoringActor
не способен обрабатывать более одного клиента одновременно. После получения CheckHealth
клиентам больше не разрешено, пока не будет отправлено сообщение « Up
или « Down
. Кажется довольно ограничивающим.
Составление фьючерсов
Другой подход к той же самой проблеме — использование модели спроса и будущего. Внезапно код становится намного короче и легче для чтения:
01
02
03
04
05
06
07
08
09
10
|
def receive = { case CheckHealth => implicit val timeout: Timeout = 1 .second implicit val ec = context.dispatcher val origin = sender networkActor ? Ping andThen { case Success(_) => origin ! Up case Failure(_) => origin ! Down } } |
Это оно! Мы запрашиваем networkActor
, отправляя Ping
а затем, когда приходит ответ, мы отвечаем клиенту. В случае Success(_)
( _
заполнитель означает Pong
но нам все равно) мы отправляем Up
. Если это был Failure(_)
(где _
наиболее вероятно содержит AskTimeout
брошенный через одну секунду без ответа), мы пересылаем Down
. В этом коде есть одна огромная ловушка. И в обратном вызове, и в успешном, и в случае неудачи мы не можем использовать sender
напрямую, потому что этот фрагмент кода может быть выполнен гораздо позже другим потоком. Значение sender
является кратковременным, и к тому времени, когда Pong
прибудет, это может указывать на любого другого актера, который случайно отправил нам что-то. Таким образом, мы должны сохранить оригинального sender
в локальной переменной источника и перехватить его вместо этого.
Если вас это раздражает, вы можете поиграть с pipeTo
:
1
2
3
4
5
6
7
8
|
def receive = LoggingReceive { case CheckHealth => implicit val ec = context.dispatcher networkActor.ask(Ping)( 1 .second). map{_ => Up}. recover{ case _ => Down}. pipeTo(sender) } |
То же, что и раньше, мы ask
(синоним ?
networkActor
) networkActor
с тайм-аутом. Если получен правильный ответ, мы сопоставляем его с Up
. Если вместо этого будущее заканчивается исключением, мы восстанавливаемся из него, сопоставляя его с сообщением « Down
. Независимо от того, какая «ветка» была использована, результат передается sender
.
Вы должны задать себе вопрос: почему приведенный выше код подходит, несмотря на использование sender
то время как предыдущий был бы поврежден? Если вы внимательно посмотрите на объявления, вы заметите, что pipeTo()
принимает ActorRef
по значению, а не по имени. Это означает, что sender
вычисляется сразу при выполнении выражения, а не позже, когда возвращаются ответы. Мы идем по тонкому льду здесь, поэтому, пожалуйста, будьте осторожны, делая такие предположения.
Выделенный актер
Актеры легкие, так почему бы не создать один только для проверки здоровья? Этот удаленный субъект будет отвечать за связь с NetworkActor
и отправку ответа клиенту. Единственной обязанностью MonitoringActor
будет создание экземпляра этого единовременного субъекта:
1
2
3
4
5
6
7
8
|
class MonitoringActor extends Actor with ActorLogging { def receive = { case CheckHealth => context.actorOf(Props(classOf[PingActor], networkActor, sender)) } } |
PingActor
довольно прост и похож на самое первое решение:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
class PingActor(networkActor: ActorRef, origin: ActorRef) extends Actor with ActorLogging { networkActor ! Ping context.setReceiveTimeout( 1 .second) def receive = { case Pong => origin ! Up self ! PoisonPill case ReceiveTimeout => origin ! Down self ! PoisonPill } } |
Когда актер создан, мы отправляем Ping
в NetworkActor
но также планируем сообщение о тайм-ауте. Теперь мы ждем либо Pong
либо тайм-аута Down
. В обоих случаях мы в конечном итоге PingActor
потому что PingActor
больше не нужен. Конечно, MonitoringActor
может создавать несколько независимых NetworkActor
одновременно.
Это решение сочетает в себе простоту и чистоту первого, но надежно как второе. Конечно, это также требует большей части кода. Вам решать, какую технику вы используете в реальных случаях использования. Кстати, после написания этой статьи я натолкнулся на актеров Ask, Tell и Per-request, которые касаются одной и той же проблемы и вводят аналогичные подходы. Определенно посмотрите и на это!