Сбои больше похожи на функцию распределенных систем. А с помощью модели Akka, допускающей сбои в отказоустойчивости, вы можете добиться четкого разделения между вашей бизнес-логикой и логикой обработки сбоев (логикой контроля). Все без особых усилий. Это довольно удивительно. Это тема нашего обсуждения сейчас.
КОНТРОЛЬ ЗА АКТЕРОМ
Представьте себе стек вызовов методов, и самый верхний метод в вашем стеке создает исключение. Что можно сделать методами вниз по стеку?
- Исключение может быть перехвачено и обработано для восстановления
- Исключение может быть поймано, может быть зарегистрировано и храниться в тайне.
- Методы в стеке также могут выбрать исключение исключения полностью (или могут быть перехвачены и переброшены)
Представьте, что все методы, пока основной метод не обрабатывает исключение. В этом случае программа завершает работу после написания эссе об исключении для консоли.
Вы также можете сравнить тот же сценарий с порождением потоков. Если дочерний поток генерирует исключение , и если run
или call
метод не обрабатывает его, то исключение , как ожидается , будет обрабатываться родительской нити или основной нити, каков бы ни был случай. Если основной поток не обрабатывает это, то система завершает работу.
Давайте сделаем это еще раз — если дочерний субъект, который был создан с использованием context.actorOf
сбоев с исключением, родительский субъект (он же супервизор) может предпочесть обрабатывать любые сбои дочернего субъекта. Если это так, он может предпочесть обработать его и восстановить ( Restart
/ Resume
). Иначе, добавьте исключение ( Escalate
) к его родителю. В качестве альтернативы, это может быть только Stop
детский актер — это конец истории для этого ребенка. Почему я сказал родитель (он же руководитель)? Просто потому, что подход Акки к надзору — это родительский надзор — это означает, что надзирать за ними могут только создатели актеров.
Это оно !! Мы в значительной степени покрыли все наблюдения Directives
(что можно было сделать с отказами).
СТРАТЕГИИ
Ах, я забыл упомянуть об этом: вы уже знаете, что актер Акка может создавать детей и создавать столько детей, сколько они захотят.
Теперь рассмотрим два сценария:
1. OneForOneStrategy
Ваш актер порождает нескольких дочерних актеров, и каждый из этих дочерних актеров подключается к различным источникам данных. Допустим, вы запускаете приложение, которое переводит английское слово на несколько языков.
Предположим, что один дочерний актер терпит неудачу, и вы можете пропустить этот результат в окончательном списке, что бы вы хотели сделать? Выключить службу? Нет, вы можете просто перезапустить / остановить только этот дочерний актер. Не так ли? Теперь это называется OneForOneStrategy
в терминах стратегии контроля Akka — если один актер потерпит неудачу, просто справьтесь с одним.
В зависимости от ваших бизнес — исключений, вы хотели бы реагировать по- разному ( Stop
, Restart
, Escalate
, Resume
) для различных исключений. Чтобы настроить свою собственную стратегию, вы просто переопределяете supervisorStrategy
в своем классе Actor.
Пример объявления OneForOneStrategy
будет
import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy.Stop class TeacherActorOneForOne extends Actor with ActorLogging { ... ... override val supervisorStrategy=OneForOneStrategy() { case _: MinorRecoverableException => Restart case _: Exception => Stop } ... ...
2. AllForOneStrategy
Предположим, что вы выполняете внешнюю сортировку (еще один пример, чтобы доказать, что мое творчество — отстой !!), и каждый ваш кусок обрабатывается другим актером. Внезапно, один Актер не может бросить исключение. Не имеет смысла продолжать обработку остальных фрагментов, потому что конечный результат будет неверным. Итак, это логично для Stop
ВСЕХ актеров.
Почему я сказал, Stop
а не Restart
в предыдущей строке? Потому что Restart
ing также не будет иметь никакого смысла для этого варианта использования, учитывая, что почтовый ящик для каждого из этих субъектов не будет очищен при перезапуске. Так что, если мы перезапустим, остальные куски все равно будут обработаны. Это не то, что мы хотим. Воссоздание Актеров с блестящими новыми почтовыми ящиками было бы правильным подходом здесь.
Опять же, так же, как OneForOneStrategy
, вы просто переопределите supervisorStrategy
с реализацией AllForOneStrategy
И пример будет
import akka.actor.{Actor, ActorLogging} import akka.actor.AllForOneStrategy import akka.actor.SupervisorStrategy.Escalate import akka.actor.SupervisorStrategy.Stop class TeacherActorAllForOne extends Actor with ActorLogging { ... override val supervisorStrategy = AllForOneStrategy() { case _: MajorUnRecoverableException => Stop case _: Exception => Escalate } ... ...
DIRECTIVES
Конструктор обоих AllForOneStrategy
и the OneForOneStrategy
принимает PartialFunction[Throwable,Directive]
вызов, Decider
который отображает a Throwable
в a, Directive
как вы можете видеть здесь:
case _: MajorUnRecoverableException => Stop
Есть просто только четыре вида директив — Stop
, Resume
, Escalate
и Restart
Стоп
Дочерний субъект останавливается в случае возникновения исключительной ситуации, и любые сообщения остановленному субъекту, очевидно, попадают в очередь deadLetters.
Резюме
Дочерний актер просто игнорирует сообщение, вызвавшее исключение, и продолжает обработку остальных сообщений в очереди.
Рестарт
Дочерний актер останавливается и инициализируется новый актер. Обработка остальных сообщений в почтовом ящике продолжается. Остальной мир не знает, что это произошло, так как тот же ActorRef присоединен к новому Актору.
Эскалация Супервизор уклоняется от ошибки и позволяет своему супервизору обрабатывать исключение.
СТРАТЕГИЯ ПО УМОЛЧАНИЮ
Что если наш Актер не указывает никакой Стратегии, но создал дочерних Актеров. Как они обрабатываются? В Actor
признаке объявлена стратегия по умолчанию, которая (если она сокращена) выглядит следующим образом:
override val supervisorStrategy=OneForOneStrategy() { case _: ActorInitializationException=> Stop case _: ActorKilledException => Stop case _: DeathPactException => Stop case _: Exception => Restart }
Итак, по сути, стратегия по умолчанию обрабатывает четыре случая:
1. ACTORINITIALIZATIONEXCEPTION => STOP
Когда Актер не может быть инициализирован, он бросает ActorInitializationException
. Актер был бы остановлен тогда. Давайте смоделируем это, бросив исключение в preStart
обратном вызове:
package me.rerun.akkanotes.supervision import akka.actor.{ActorSystem, Props} import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest import akka.actor.Actor import akka.actor.ActorLogging object ActorInitializationExceptionApp extends App{ val actorSystem=ActorSystem("ActorInitializationException") val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor") actor!"someMessageThatWillGoToDeadLetter" } class ActorInitializationExceptionActor extends Actor with ActorLogging{ override def preStart={ throw new Exception("Some random exception") } def receive={ case _=> } }
Запуск ActorInitializationExceptionApp
сгенерирует ActorInitializationException
(duh !!) и затем переместит все сообщения в очередь сообщений deadLetters
актера:
Журнал
[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ... ... Caused by: java.lang.Exception: Some random exception at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17) ... ... [INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2. ACTORKILLEDEXCEPTION => STOP
When the Actor was killed using the Kill
message, then it would throw an ActorKilledException
. The default strategy would stop the child Actor if it throws the exception. At first, it seems that there’s no point in stopping an already killed Actor. However, consider this :
-
ActorKilledException
would just be propagated to the supervisor. What about the lifecycle watchers or deathwatchers of this Actor that we saw during DeathWatch. The watchers won’t know anything until the Actor isStopped
. -
Sending a
Kill
on an Actor would just affect that particular actor which the supervisor knows. However, handling that withStop
would suspend the mailbox of that Actor, suspends the mailboxes of child actors, stops the child actors, sends aTerminated
to all the child actor watchers, send aTerminated
to all the immediate failed Actor’s watchers and finally stop the Actor itself. (Wow, that’s pretty awesome !!)
package me.rerun.akkanotes.supervision import akka.actor.{ActorSystem, Props} import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.Kill object ActorKilledExceptionApp extends App{ val actorSystem=ActorSystem("ActorKilledExceptionSystem") val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor]) actor!"something" actor!Kill actor!"something else that falls into dead letter queue" } class ActorKilledExceptionActor extends Actor with ActorLogging{ def receive={ case message:String=> log.info (message) } }
Log
The logs just say that once the ActorKilledException
comes in, the supervisor stops that actor and then the messages go into the queue of deadLetters
INFO m.r.a.s.ActorKilledExceptionActor - something ERROR akka.actor.OneForOneStrategy - Kill akka.actor.ActorKilledException: Kill INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
3. DEATHPACTEXCEPTION => STOP
From DeathWatch, you know that when an Actor watches over a child Actor, it is expected to handle the Terminated
message in its receive
. What if it doesn’t? You get the DeathPactException
The code shows that the Supervisor watches
the child actor after creation but doesn’t handle the Terminated
message from the child.
package me.rerun.akkanotes.supervision import akka.actor.{ActorSystem, Props} import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.Kill import akka.actor.PoisonPill import akka.actor.Terminated object DeathPactExceptionApp extends App{ val actorSystem=ActorSystem("DeathPactExceptionSystem") val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor]) actor!"create_child" //Throws DeathPactException Thread.sleep(2000) //Wait until Stopped actor!"someMessage" //Message goes to DeadLetters } class DeathPactExceptionParentActor extends Actor with ActorLogging{ def receive={ case "create_child"=> { log.info ("creating child") val child=context.actorOf(Props[DeathPactExceptionChildActor]) context.watch(child) //Watches but doesnt handle terminated message. Throwing DeathPactException here. child!"stop" } case "someMessage" => log.info ("some message") //Doesnt handle terminated message //case Terminated(_) => } } class DeathPactExceptionChildActor extends Actor with ActorLogging{ def receive={ case "stop"=> { log.info ("Actor going to stop and announce that it's terminated") self!PoisonPill } } }
Log
The logs tell us that the DeathPactException
comes in, the supervisor stops that actor and then the messages go into the queue ofdeadLetters
INFO m.r.a.s.DeathPactExceptionParentActor - creating child INFO m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
4. EXCEPTION => RESTART
For all other Exceptions, the default Directive
is to Restart
the Actor. Check the following app. Just to prove that the Actor is restarted, OtherExceptionParentActor
makes the child throw an exception and immediately sends a message. The message reaches the mailbox and when the the child actor restarts, the message gets processed. Nice !!
package me.rerun.akkanotes.supervision import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorSystem import akka.actor.OneForOneStrategy import akka.actor.Props import akka.actor.SupervisorStrategy.Stop object OtherExceptionApp extends App{ val actorSystem=ActorSystem("OtherExceptionSystem") val actor=actorSystem.actorOf(Props[OtherExceptionParentActor]) actor!"create_child" } class OtherExceptionParentActor extends Actor with ActorLogging{ def receive={ case "create_child"=> { log.info ("creating child") val child=context.actorOf(Props[OtherExceptionChildActor]) child!"throwSomeException" child!"someMessage" } } } class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{ override def preStart={ log.info ("Starting Child Actor") } def receive={ case "throwSomeException"=> { throw new Exception ("I'm getting thrown for no reason") } case "someMessage" => log.info ("Restarted and printing some Message") } override def postStop={ log.info ("Stopping Child Actor") } }
Log
The logs of this program is pretty neat.
- The exception gets thrown. We see the trace
- The child restarts — Stop and Start gets called (we’ll see about the
preRestart
andpostRestart
callbacks soon) - The message that was send to the Child actor before restart is processed.
INFO m.r.a.s.OtherExceptionParentActor - creating child INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason java.lang.Exception: I'm getting thrown for no reason at me.rerun.akkanotes.supervision.OtherExceptionChildActor$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na] ... ... INFO m.r.a.s.OtherExceptionChildActor - Stopping Child Actor INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor INFO m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
ESCALATE AND RESUME
We saw examples of Stop
and Restart
via the defaultStrategy
. Now, let’s have a quick look at the Escalate
.
Resume
just ignores the exception and proceeds processing the next message in the mailbox. It’s more like catching the exception and doing nothing about it. Awesome stuff but not a lot to talk about there.
Escalating generally means that the exception is something critical and the immediate supervisor would not be able to handle it. So, it asks help from it’s supervisor. Let’s take an example.
Consider three Actors — EscalateExceptionTopLevelActor
, EscalateExceptionParentActor
and EscalateExceptionChildActor
. If the child actor throws an exception and if the parent level actor could not handle it, it could Escalate
it to the Top level Actor. The Top level actor could choose to react with any of the Directives. In our example, we just Stop
. Stop
would stop the immediate child (which is the EscalateExceptionParentActor
). As you know, when a Stop
is executed on an Actor, all its children would also be stopped before the Actor itself is stopped.
package me.rerun.akkanotes.supervision import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorSystem import akka.actor.OneForOneStrategy import akka.actor.Props import akka.actor.SupervisorStrategy.Escalate import akka.actor.SupervisorStrategy.Stop import akka.actor.actorRef2Scala object EscalateExceptionApp extends App { val actorSystem = ActorSystem("EscalateExceptionSystem") val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor") actor ! "create_parent" } class EscalateExceptionTopLevelActor extends Actor with ActorLogging { override val supervisorStrategy = OneForOneStrategy() { case _: Exception => { log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.") Stop //Stop will stop the Actor that threw this Exception and all its children } } def receive = { case "create_parent" => { log.info("creating parent") val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor") parent ! "create_child" //Sending message to next level } } } class EscalateExceptionParentActor extends Actor with ActorLogging { override def preStart={ log.info ("Parent Actor started") } override val supervisorStrategy = OneForOneStrategy() { case _: Exception => { log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor") Escalate } } def receive = { case "create_child" => { log.info("creating child") val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor") child ! "throwSomeException" } } override def postStop = { log.info("Stopping parent Actor") } } class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging { override def preStart={ log.info ("Child Actor started") } def receive = { case "throwSomeException" => { throw new Exception("I'm getting thrown for no reason.") } } override def postStop = { log.info("Stopping child Actor") } }
Log
As you could see from the logs,
- The child actor throws exception.
- The immediate supervisor (
EscalateExceptionParentActor
) escalates that exception to its supervisor (EscalateExceptionTopLevelActor
) - The resultant directive from
EscalateExceptionTopLevelActor
is to Stop the Actor. As a sequence, the child actors gets stopped first. - The parent actor gets stopped next (only after the watchers have been notified)
INFO m.r.a.s.EscalateExceptionTopLevelActor - creating parent INFO m.r.a.s.EscalateExceptionParentActor - Parent Actor started INFO m.r.a.s.EscalateExceptionParentActor - creating child INFO m.r.a.s.EscalateExceptionChildActor - Child Actor started INFO m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor INFO m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children. ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason. java.lang.Exception: I'm getting thrown for no reason. at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na] ... ... INFO m.r.a.s.EscalateExceptionChildActor - Stopping child Actor INFO m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor
Please note that whatever directive that was issued would only apply to the immediate child that escalated. Say, if a Restart
is issued at the TopLevel, only the Parent would be restarted and anything in its constructor/preStart
would be executed. If the children of the Parent actor was created in the constructor, they would also be created. However, children that were created through messages to the Parent Actor would still be in the Terminated
state.
TRIVIA
Actually, you could control whether the preStart
gets called at all. We’ll see about this in the next minor write-up. If you are curious, just have a look at the postRestart
method of the Actor
def postRestart(reason: Throwable): Unit = { preStart() }
CODE
As always, code is on github
(my .gitignore
wasn’t setup right for this project. will fix it today. sorry)