Сбои больше похожи на функцию распределенных систем. А с помощью модели Akka, допускающей сбои в отказоустойчивости, вы можете добиться четкого разделения между вашей бизнес-логикой и логикой обработки сбоев (логикой контроля). Все без особых усилий. Это довольно удивительно. Это тема нашего обсуждения сейчас.
КОНТРОЛЬ ЗА АКТЕРОМ
Представьте себе стек вызовов методов, и самый верхний метод в вашем стеке создает исключение. Что можно сделать методами вниз по стеку?
- Исключение может быть перехвачено и обработано для восстановления
- Исключение может быть поймано, может быть зарегистрировано и храниться в тайне.
- Методы в стеке также могут выбрать исключение исключения полностью (или могут быть перехвачены и переброшены)
Представьте, что все методы, пока основной метод не обрабатывает исключение. В этом случае программа завершает работу после написания эссе об исключении для консоли.
Вы также можете сравнить тот же сценарий с порождением потоков. Если дочерний поток генерирует исключение , и если run или call метод не обрабатывает его, то исключение , как ожидается , будет обрабатываться родительской нити или основной нити, каков бы ни был случай. Если основной поток не обрабатывает это, то система завершает работу.
Давайте сделаем это еще раз — если дочерний субъект, который был создан с использованием context.actorOf сбоев с исключением, родительский субъект (он же супервизор) может предпочесть обрабатывать любые сбои дочернего субъекта. Если это так, он может предпочесть обработать его и восстановить ( Restart/ Resume). Иначе, добавьте исключение ( Escalate) к его родителю. В качестве альтернативы, это может быть только Stop детский актер — это конец истории для этого ребенка. Почему я сказал родитель (он же руководитель)? Просто потому, что подход Акки к надзору — это родительский надзор — это означает, что надзирать за ними могут только создатели актеров.
Это оно !! Мы в значительной степени покрыли все наблюдения Directives (что можно было сделать с отказами).
СТРАТЕГИИ
Ах, я забыл упомянуть об этом: вы уже знаете, что актер Акка может создавать детей и создавать столько детей, сколько они захотят.
Теперь рассмотрим два сценария:
1. OneForOneStrategy
Ваш актер порождает нескольких дочерних актеров, и каждый из этих дочерних актеров подключается к различным источникам данных. Допустим, вы запускаете приложение, которое переводит английское слово на несколько языков.
Предположим, что один дочерний актер терпит неудачу, и вы можете пропустить этот результат в окончательном списке, что бы вы хотели сделать? Выключить службу? Нет, вы можете просто перезапустить / остановить только этот дочерний актер. Не так ли? Теперь это называется OneForOneStrategy в терминах стратегии контроля Akka — если один актер потерпит неудачу, просто справьтесь с одним.
В зависимости от ваших бизнес — исключений, вы хотели бы реагировать по- разному ( Stop, Restart, Escalate, Resume) для различных исключений. Чтобы настроить свою собственную стратегию, вы просто переопределяете supervisorStrategy в своем классе Actor.
Пример объявления OneForOneStrategy будет
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
class TeacherActorOneForOne extends Actor with ActorLogging {
...
...
override val supervisorStrategy=OneForOneStrategy() {
case _: MinorRecoverableException => Restart
case _: Exception => Stop
}
...
...
2. AllForOneStrategy
Предположим, что вы выполняете внешнюю сортировку (еще один пример, чтобы доказать, что мое творчество — отстой !!), и каждый ваш кусок обрабатывается другим актером. Внезапно, один Актер не может бросить исключение. Не имеет смысла продолжать обработку остальных фрагментов, потому что конечный результат будет неверным. Итак, это логично для Stop ВСЕХ актеров.
Почему я сказал, Stop а не Restart в предыдущей строке? Потому что Restarting также не будет иметь никакого смысла для этого варианта использования, учитывая, что почтовый ящик для каждого из этих субъектов не будет очищен при перезапуске. Так что, если мы перезапустим, остальные куски все равно будут обработаны. Это не то, что мы хотим. Воссоздание Актеров с блестящими новыми почтовыми ящиками было бы правильным подходом здесь.
Опять же, так же, как OneForOneStrategy, вы просто переопределите supervisorStrategy с реализацией AllForOneStrategy
И пример будет
import akka.actor.{Actor, ActorLogging}
import akka.actor.AllForOneStrategy
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
class TeacherActorAllForOne extends Actor with ActorLogging {
...
override val supervisorStrategy = AllForOneStrategy() {
case _: MajorUnRecoverableException => Stop
case _: Exception => Escalate
}
...
...
DIRECTIVES
Конструктор обоих AllForOneStrategy и the OneForOneStrategy принимает PartialFunction[Throwable,Directive] вызов, Deciderкоторый отображает a Throwable в a, Directive как вы можете видеть здесь:
case _: MajorUnRecoverableException => Stop
Есть просто только четыре вида директив — Stop, Resume, Escalate и Restart
Стоп
Дочерний субъект останавливается в случае возникновения исключительной ситуации, и любые сообщения остановленному субъекту, очевидно, попадают в очередь deadLetters.
Резюме
Дочерний актер просто игнорирует сообщение, вызвавшее исключение, и продолжает обработку остальных сообщений в очереди.
Рестарт
Дочерний актер останавливается и инициализируется новый актер. Обработка остальных сообщений в почтовом ящике продолжается. Остальной мир не знает, что это произошло, так как тот же ActorRef присоединен к новому Актору.
Эскалация Супервизор уклоняется от ошибки и позволяет своему супервизору обрабатывать исключение.
СТРАТЕГИЯ ПО УМОЛЧАНИЮ
Что если наш Актер не указывает никакой Стратегии, но создал дочерних Актеров. Как они обрабатываются? В Actor признаке объявлена стратегия по умолчанию, которая (если она сокращена) выглядит следующим образом:
override val supervisorStrategy=OneForOneStrategy() {
case _: ActorInitializationException=> Stop
case _: ActorKilledException => Stop
case _: DeathPactException => Stop
case _: Exception => Restart
}
Итак, по сути, стратегия по умолчанию обрабатывает четыре случая:
1. ACTORINITIALIZATIONEXCEPTION => STOP
Когда Актер не может быть инициализирован, он бросает ActorInitializationException. Актер был бы остановлен тогда. Давайте смоделируем это, бросив исключение в preStart обратном вызове:
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
object ActorInitializationExceptionApp extends App{
val actorSystem=ActorSystem("ActorInitializationException")
val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
actor!"someMessageThatWillGoToDeadLetter"
}
class ActorInitializationExceptionActor extends Actor with ActorLogging{
override def preStart={
throw new Exception("Some random exception")
}
def receive={
case _=>
}
}
Запуск ActorInitializationExceptionApp сгенерирует ActorInitializationException (duh !!) и затем переместит все сообщения в очередь сообщений deadLetters актера:
Журнал
[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception
at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...
[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2. ACTORKILLEDEXCEPTION => STOP
When the Actor was killed using the Kill message, then it would throw an ActorKilledException. The default strategy would stop the child Actor if it throws the exception. At first, it seems that there’s no point in stopping an already killed Actor. However, consider this :
-
ActorKilledExceptionwould just be propagated to the supervisor. What about the lifecycle watchers or deathwatchers of this Actor that we saw during DeathWatch. The watchers won’t know anything until the Actor isStopped. -
Sending a
Killon an Actor would just affect that particular actor which the supervisor knows. However, handling that withStopwould suspend the mailbox of that Actor, suspends the mailboxes of child actors, stops the child actors, sends aTerminatedto all the child actor watchers, send aTerminatedto all the immediate failed Actor’s watchers and finally stop the Actor itself. (Wow, that’s pretty awesome !!)
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill
object ActorKilledExceptionApp extends App{
val actorSystem=ActorSystem("ActorKilledExceptionSystem")
val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor])
actor!"something"
actor!Kill
actor!"something else that falls into dead letter queue"
}
class ActorKilledExceptionActor extends Actor with ActorLogging{
def receive={
case message:String=> log.info (message)
}
}
Log
The logs just say that once the ActorKilledException comes in, the supervisor stops that actor and then the messages go into the queue of deadLetters
INFO m.r.a.s.ActorKilledExceptionActor - something ERROR akka.actor.OneForOneStrategy - Kill akka.actor.ActorKilledException: Kill INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
3. DEATHPACTEXCEPTION => STOP
From DeathWatch, you know that when an Actor watches over a child Actor, it is expected to handle the Terminated message in its receive. What if it doesn’t? You get the DeathPactException
The code shows that the Supervisor watches the child actor after creation but doesn’t handle the Terminated message from the child.
package me.rerun.akkanotes.supervision
import akka.actor.{ActorSystem, Props}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Kill
import akka.actor.PoisonPill
import akka.actor.Terminated
object DeathPactExceptionApp extends App{
val actorSystem=ActorSystem("DeathPactExceptionSystem")
val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor])
actor!"create_child" //Throws DeathPactException
Thread.sleep(2000) //Wait until Stopped
actor!"someMessage" //Message goes to DeadLetters
}
class DeathPactExceptionParentActor extends Actor with ActorLogging{
def receive={
case "create_child"=> {
log.info ("creating child")
val child=context.actorOf(Props[DeathPactExceptionChildActor])
context.watch(child) //Watches but doesnt handle terminated message. Throwing DeathPactException here.
child!"stop"
}
case "someMessage" => log.info ("some message")
//Doesnt handle terminated message
//case Terminated(_) =>
}
}
class DeathPactExceptionChildActor extends Actor with ActorLogging{
def receive={
case "stop"=> {
log.info ("Actor going to stop and announce that it's terminated")
self!PoisonPill
}
}
}
Log
The logs tell us that the DeathPactException comes in, the supervisor stops that actor and then the messages go into the queue ofdeadLetters
INFO m.r.a.s.DeathPactExceptionParentActor - creating child INFO m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
4. EXCEPTION => RESTART
For all other Exceptions, the default Directive is to Restart the Actor. Check the following app. Just to prove that the Actor is restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends a message. The message reaches the mailbox and when the the child actor restarts, the message gets processed. Nice !!
package me.rerun.akkanotes.supervision
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Stop
object OtherExceptionApp extends App{
val actorSystem=ActorSystem("OtherExceptionSystem")
val actor=actorSystem.actorOf(Props[OtherExceptionParentActor])
actor!"create_child"
}
class OtherExceptionParentActor extends Actor with ActorLogging{
def receive={
case "create_child"=> {
log.info ("creating child")
val child=context.actorOf(Props[OtherExceptionChildActor])
child!"throwSomeException"
child!"someMessage"
}
}
}
class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{
override def preStart={
log.info ("Starting Child Actor")
}
def receive={
case "throwSomeException"=> {
throw new Exception ("I'm getting thrown for no reason")
}
case "someMessage" => log.info ("Restarted and printing some Message")
}
override def postStop={
log.info ("Stopping Child Actor")
}
}
Log
The logs of this program is pretty neat.
- The exception gets thrown. We see the trace
- The child restarts — Stop and Start gets called (we’ll see about the
preRestartandpostRestartcallbacks soon) - The message that was send to the Child actor before restart is processed.
INFO m.r.a.s.OtherExceptionParentActor - creating child
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason
java.lang.Exception: I'm getting thrown for no reason
at me.rerun.akkanotes.supervision.OtherExceptionChildActor$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...
INFO m.r.a.s.OtherExceptionChildActor - Stopping Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Starting Child Actor
INFO m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
ESCALATE AND RESUME
We saw examples of Stop and Restart via the defaultStrategy. Now, let’s have a quick look at the Escalate.
Resume just ignores the exception and proceeds processing the next message in the mailbox. It’s more like catching the exception and doing nothing about it. Awesome stuff but not a lot to talk about there.
Escalating generally means that the exception is something critical and the immediate supervisor would not be able to handle it. So, it asks help from it’s supervisor. Let’s take an example.
Consider three Actors — EscalateExceptionTopLevelActor, EscalateExceptionParentActor and EscalateExceptionChildActor. If the child actor throws an exception and if the parent level actor could not handle it, it could Escalate it to the Top level Actor. The Top level actor could choose to react with any of the Directives. In our example, we just Stop. Stop would stop the immediate child (which is the EscalateExceptionParentActor). As you know, when a Stop is executed on an Actor, all its children would also be stopped before the Actor itself is stopped.
package me.rerun.akkanotes.supervision
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy.Escalate
import akka.actor.SupervisorStrategy.Stop
import akka.actor.actorRef2Scala
object EscalateExceptionApp extends App {
val actorSystem = ActorSystem("EscalateExceptionSystem")
val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
actor ! "create_parent"
}
class EscalateExceptionTopLevelActor extends Actor with ActorLogging {
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception => {
log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
Stop //Stop will stop the Actor that threw this Exception and all its children
}
}
def receive = {
case "create_parent" => {
log.info("creating parent")
val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
parent ! "create_child" //Sending message to next level
}
}
}
class EscalateExceptionParentActor extends Actor with ActorLogging {
override def preStart={
log.info ("Parent Actor started")
}
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception => {
log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
Escalate
}
}
def receive = {
case "create_child" => {
log.info("creating child")
val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
child ! "throwSomeException"
}
}
override def postStop = {
log.info("Stopping parent Actor")
}
}
class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging {
override def preStart={
log.info ("Child Actor started")
}
def receive = {
case "throwSomeException" => {
throw new Exception("I'm getting thrown for no reason.")
}
}
override def postStop = {
log.info("Stopping child Actor")
}
}
Log
As you could see from the logs,
- The child actor throws exception.
- The immediate supervisor (
EscalateExceptionParentActor) escalates that exception to its supervisor (EscalateExceptionTopLevelActor) - The resultant directive from
EscalateExceptionTopLevelActoris to Stop the Actor. As a sequence, the child actors gets stopped first. - The parent actor gets stopped next (only after the watchers have been notified)
INFO m.r.a.s.EscalateExceptionTopLevelActor - creating parent
INFO m.r.a.s.EscalateExceptionParentActor - Parent Actor started
INFO m.r.a.s.EscalateExceptionParentActor - creating child
INFO m.r.a.s.EscalateExceptionChildActor - Child Actor started
INFO m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor
INFO m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.
ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.
java.lang.Exception: I'm getting thrown for no reason.
at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
...
...
INFO m.r.a.s.EscalateExceptionChildActor - Stopping child Actor
INFO m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor
Please note that whatever directive that was issued would only apply to the immediate child that escalated. Say, if a Restart is issued at the TopLevel, only the Parent would be restarted and anything in its constructor/preStart would be executed. If the children of the Parent actor was created in the constructor, they would also be created. However, children that were created through messages to the Parent Actor would still be in the Terminated state.
TRIVIA
Actually, you could control whether the preStart gets called at all. We’ll see about this in the next minor write-up. If you are curious, just have a look at the postRestart method of the Actor
def postRestart(reason: Throwable): Unit = {
preStart()
}
CODE
As always, code is on github
(my .gitignore wasn’t setup right for this project. will fix it today. sorry)