Статьи

Akka Notes: актерский надзор

Сбои больше похожи на функцию распределенных систем. А с помощью модели Akka, допускающей сбои в отказоустойчивости, вы можете добиться четкого разделения между вашей бизнес-логикой и логикой обработки сбоев (логикой контроля). Все без особых усилий. Это довольно удивительно. Это тема нашего обсуждения сейчас.

КОНТРОЛЬ ЗА АКТЕРОМ

Представьте себе стек вызовов методов, и самый верхний метод в вашем стеке создает исключение. Что можно сделать методами вниз по стеку?

  1. Исключение может быть перехвачено и обработано для восстановления
  2. Исключение может быть поймано, может быть зарегистрировано и храниться в тайне.
  3. Методы в стеке также могут выбрать исключение исключения полностью (или могут быть перехвачены и переброшены)

Представьте, что все методы, пока основной метод не обрабатывает исключение. В этом случае программа завершает работу после написания эссе об исключении для консоли.

Стек исключений

Вы также можете сравнить тот же сценарий с порождением потоков. Если дочерний поток генерирует исключение , и если  run или call метод не обрабатывает его, то исключение , как ожидается , будет обрабатываться родительской нити или основной нити, каков бы ни был случай. Если основной поток не обрабатывает это, то система завершает работу.

Давайте сделаем это еще раз — если дочерний субъект, который был создан с использованием  context.actorOf сбоев с исключением, родительский субъект (он же супервизор) может предпочесть обрабатывать любые сбои дочернего субъекта. Если это так, он может предпочесть обработать его и восстановить ( Restart/ Resume). Иначе, добавьте исключение ( Escalate) к его родителю. В качестве альтернативы, это может быть только  Stop детский актер — это конец истории для этого ребенка. Почему я сказал родитель (он же руководитель)? Просто потому, что подход Акки к надзору — это родительский надзор — это означает, что надзирать за ними могут только создатели актеров.

Это оно !! Мы в значительной степени покрыли все наблюдения  Directives (что можно было сделать с отказами).

СТРАТЕГИИ

Ах, я забыл упомянуть об этом: вы уже знаете, что актер Акка может создавать детей и создавать столько детей, сколько они захотят.

Теперь рассмотрим два сценария:

1. OneForOneStrategy

Ваш актер порождает нескольких дочерних актеров, и каждый из этих дочерних актеров подключается к различным источникам данных. Допустим, вы запускаете приложение, которое переводит английское слово на несколько языков.

OneForOneStrategy

Предположим, что один дочерний актер терпит неудачу, и вы можете пропустить этот результат в окончательном списке, что бы вы хотели сделать? Выключить службу? Нет, вы можете просто перезапустить / остановить только этот дочерний актер. Не так ли? Теперь это называется  OneForOneStrategy в терминах стратегии контроля Akka — если один актер потерпит неудачу, просто справьтесь с одним.

В зависимости от ваших бизнес — исключений, вы хотели бы реагировать по- разному ( StopRestartEscalateResume) для различных исключений. Чтобы настроить свою собственную стратегию, вы просто переопределяете  supervisorStrategy в своем классе Actor.

Пример объявления  OneForOneStrategy будет

import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.OneForOneStrategy  
import akka.actor.SupervisorStrategy.Stop


class TeacherActorOneForOne extends Actor with ActorLogging {

    ...
    ...
  override val supervisorStrategy=OneForOneStrategy() {

    case _: MinorRecoverableException     => Restart
    case _: Exception                   => Stop

  }
  ...
  ...  

2. AllForOneStrategy

Предположим, что вы выполняете  внешнюю сортировку  (еще один пример, чтобы доказать, что мое творчество — отстой !!), и каждый ваш кусок обрабатывается другим актером. Внезапно, один Актер не может бросить исключение. Не имеет смысла продолжать обработку остальных фрагментов, потому что конечный результат будет неверным. Итак, это логично для  Stop ВСЕХ актеров.

AllForOneStrategy

Почему я сказал,  Stop а не  Restart в предыдущей строке? Потому что  Restarting также не будет иметь никакого смысла для этого варианта использования, учитывая, что почтовый ящик для каждого из этих субъектов не будет очищен при перезапуске. Так что, если мы перезапустим, остальные куски все равно будут обработаны. Это не то, что мы хотим. Воссоздание Актеров с блестящими новыми почтовыми ящиками было бы правильным подходом здесь.

Опять же, так же, как  OneForOneStrategy, вы просто переопределите  supervisorStrategy с реализацией AllForOneStrategy

И пример будет

import akka.actor.{Actor, ActorLogging}  
import akka.actor.AllForOneStrategy  
import akka.actor.SupervisorStrategy.Escalate  
import akka.actor.SupervisorStrategy.Stop


class TeacherActorAllForOne extends Actor with ActorLogging {

  ...

  override val supervisorStrategy = AllForOneStrategy() {

    case _: MajorUnRecoverableException => Stop
    case _: Exception => Escalate

  }
  ...
  ...

DIRECTIVES

Конструктор обоих  AllForOneStrategy и the  OneForOneStrategy принимает  PartialFunction[Throwable,Directive] вызов,  Deciderкоторый отображает a  Throwable в a,  Directive как вы можете видеть здесь:

case _: MajorUnRecoverableException => Stop  

Есть просто только четыре вида директив —  StopResumeEscalate и Restart

Стоп

Дочерний субъект останавливается в случае возникновения исключительной ситуации, и любые сообщения остановленному субъекту, очевидно, попадают в очередь deadLetters.

Резюме

Дочерний актер просто игнорирует сообщение, вызвавшее исключение, и продолжает обработку остальных сообщений в очереди.

Рестарт

Дочерний актер останавливается и инициализируется новый актер. Обработка остальных сообщений в почтовом ящике продолжается. Остальной мир не знает, что это произошло, так как тот же ActorRef присоединен к новому Актору.

Эскалация Супервизор уклоняется от ошибки и позволяет своему супервизору обрабатывать исключение.

СТРАТЕГИЯ ПО УМОЛЧАНИЮ

Что если наш Актер не указывает никакой Стратегии, но создал дочерних Актеров. Как они обрабатываются? В Actor признаке объявлена ​​стратегия по умолчанию,  которая (если она сокращена) выглядит следующим образом:

override val supervisorStrategy=OneForOneStrategy() {

    case _: ActorInitializationException=> Stop
    case _: ActorKilledException        => Stop
    case _: DeathPactException             => Stop
    case _: Exception                   => Restart

}

Итак, по сути, стратегия по умолчанию обрабатывает четыре случая:

1. ACTORINITIALIZATIONEXCEPTION => STOP

Когда Актер не может быть инициализирован, он бросает  ActorInitializationException. Актер был бы остановлен тогда. Давайте смоделируем это, бросив исключение в  preStart обратном вызове:

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import akka.actor.Actor  
import akka.actor.ActorLogging

object ActorInitializationExceptionApp extends App{

  val actorSystem=ActorSystem("ActorInitializationException")
  val actor=actorSystem.actorOf(Props[ActorInitializationExceptionActor], "initializationExceptionActor")
  actor!"someMessageThatWillGoToDeadLetter"
}

class ActorInitializationExceptionActor extends Actor with ActorLogging{  
  override def preStart={
    throw new Exception("Some random exception")
  }
  def receive={
    case _=>
  }
}

Запуск  ActorInitializationExceptionApp сгенерирует  ActorInitializationException (duh !!) и затем переместит все сообщения в очередь сообщений  deadLetters актера:

Журнал

[ERROR] [11/10/2014 16:08:46.569] [ActorInitializationException-akka.actor.default-dispatcher-2] [akka://ActorInitializationException/user/initializationExceptionActor] Some random exception
akka.actor.ActorInitializationException: exception during creation  
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
...
...
Caused by: java.lang.Exception: Some random exception  
    at me.rerun.akkanotes.supervision.ActorInitializationExceptionActor.preStart(ActorInitializationExceptionApp.scala:17)
...
...

[INFO] [11/10/2014 16:08:46.581] [ActorInitializationException-akka.actor.default-dispatcher-4] [akka://ActorInitializationException/user/initializationExceptionActor] Message [java.lang.String] from Actor[akka://ActorInitializationException/deadLetters] to Actor[akka://ActorInitializationException/user/initializationExceptionActor#-1290470495] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2. ACTORKILLEDEXCEPTION => STOP

When the Actor was killed using the Kill message, then it would throw an ActorKilledException. The default strategy would stop the child Actor if it throws the exception. At first, it seems that there’s no point in stopping an already killed Actor. However, consider this :

  1. ActorKilledException would just be propagated to the supervisor. What about the lifecycle watchers or deathwatchers of this Actor that we saw during DeathWatch. The watchers won’t know anything until the Actor is Stopped.

  2. Sending a Kill on an Actor would just affect that particular actor which the supervisor knows. However, handling that withStop would suspend the mailbox of that Actor, suspends the mailboxes of child actors, stops the child actors, sends aTerminated to all the child actor watchers, send a Terminated to all the immediate failed Actor’s watchers and finally stop the Actor itself. (Wow, that’s pretty awesome !!)

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.Kill

object ActorKilledExceptionApp extends App{

  val actorSystem=ActorSystem("ActorKilledExceptionSystem")
  val actor=actorSystem.actorOf(Props[ActorKilledExceptionActor])
  actor!"something"
  actor!Kill
  actor!"something else that falls into dead letter queue"
}

class ActorKilledExceptionActor extends Actor with ActorLogging{  
  def receive={
    case message:String=> log.info (message)
  }
}

Log

The logs just say that once the ActorKilledException comes in, the supervisor stops that actor and then the messages go into the queue of deadLetters

INFO  m.r.a.s.ActorKilledExceptionActor - something

ERROR akka.actor.OneForOneStrategy - Kill  
akka.actor.ActorKilledException: Kill

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ActorKilledExceptionSystem/deadLetters] to Actor[akka://ActorKilledExceptionSystem/user/$a#-1569063462] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
3. DEATHPACTEXCEPTION => STOP

From DeathWatch, you know that when an Actor watches over a child Actor, it is expected to handle the Terminated message in its receive. What if it doesn’t? You get the DeathPactException

DeathPactException сопоставляется с Stop

The code shows that the Supervisor watches the child actor after creation but doesn’t handle the Terminated message from the child.

package me.rerun.akkanotes.supervision

import akka.actor.{ActorSystem, Props}  
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest  
import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.Kill  
import akka.actor.PoisonPill  
import akka.actor.Terminated

object DeathPactExceptionApp extends App{

  val actorSystem=ActorSystem("DeathPactExceptionSystem")
  val actor=actorSystem.actorOf(Props[DeathPactExceptionParentActor])
  actor!"create_child" //Throws DeathPactException
  Thread.sleep(2000) //Wait until Stopped
  actor!"someMessage" //Message goes to DeadLetters

}

class DeathPactExceptionParentActor extends Actor with ActorLogging{

  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[DeathPactExceptionChildActor])
      context.watch(child) //Watches but doesnt handle terminated message. Throwing DeathPactException here.
      child!"stop"
    }
    case "someMessage" => log.info ("some message")
    //Doesnt handle terminated message
    //case Terminated(_) =>
  }
}

class DeathPactExceptionChildActor extends Actor with ActorLogging{  
  def receive={
    case "stop"=> {
      log.info ("Actor going to stop and announce that it's terminated")
      self!PoisonPill
    }
  }
}

Log

The logs tell us that the DeathPactException comes in, the supervisor stops that actor and then the messages go into the queue ofdeadLetters

INFO  m.r.a.s.DeathPactExceptionParentActor - creating child

INFO  m.r.a.s.DeathPactExceptionChildActor - Actor going to stop and announce that it's terminated

ERROR akka.actor.OneForOneStrategy - Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated  
akka.actor.DeathPactException: Monitored actor [Actor[akka://DeathPactExceptionSystem/user/$a/$a#-695506341]] terminated

INFO  akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://DeathPactExceptionSystem/deadLetters] to Actor[akka://DeathPactExceptionSystem/user/$a#-1452955980] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
4. EXCEPTION => RESTART

For all other Exceptions, the default Directive is to Restart the Actor. Check the following app. Just to prove that the Actor is restarted, OtherExceptionParentActor makes the child throw an exception and immediately sends a message. The message reaches the mailbox and when the the child actor restarts, the message gets processed. Nice !!

Другое исключение карты для остановки

package me.rerun.akkanotes.supervision

import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.ActorSystem  
import akka.actor.OneForOneStrategy  
import akka.actor.Props  
import akka.actor.SupervisorStrategy.Stop

object OtherExceptionApp extends App{

  val actorSystem=ActorSystem("OtherExceptionSystem")
  val actor=actorSystem.actorOf(Props[OtherExceptionParentActor])
  actor!"create_child"

}

class OtherExceptionParentActor extends Actor with ActorLogging{

  def receive={
    case "create_child"=> {
      log.info ("creating child")
      val child=context.actorOf(Props[OtherExceptionChildActor])

      child!"throwSomeException"
      child!"someMessage"
    }
  }
}

class OtherExceptionChildActor extends akka.actor.Actor with ActorLogging{

  override def preStart={
    log.info ("Starting Child Actor")
  }

  def receive={
    case "throwSomeException"=> {
      throw new Exception ("I'm getting thrown for no reason") 
    }
    case "someMessage" => log.info ("Restarted and printing some Message")
  }

  override def postStop={
    log.info ("Stopping Child Actor")
  }

}

Log

The logs of this program is pretty neat.

  1. The exception gets thrown. We see the trace
  2. The child restarts — Stop and Start gets called (we’ll see about the preRestart and postRestart callbacks soon)
  3. The message that was send to the Child actor before restart is processed.
INFO  m.r.a.s.OtherExceptionParentActor - creating child

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason

java.lang.Exception: I'm getting thrown for no reason  
    at me.rerun.akkanotes.supervision.OtherExceptionChildActor$anonfun$receive$2.applyOrElse(OtherExceptionApp.scala:39) ~[classes/:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:na]
...
...    

INFO  m.r.a.s.OtherExceptionChildActor - Stopping Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Starting Child Actor

INFO  m.r.a.s.OtherExceptionChildActor - Restarted and printing some Message
ESCALATE AND RESUME

We saw examples of Stop and Restart via the defaultStrategy. Now, let’s have a quick look at the Escalate.

Resume just ignores the exception and proceeds processing the next message in the mailbox. It’s more like catching the exception and doing nothing about it. Awesome stuff but not a lot to talk about there.

Escalating generally means that the exception is something critical and the immediate supervisor would not be able to handle it. So, it asks help from it’s supervisor. Let’s take an example.

Consider three Actors — EscalateExceptionTopLevelActorEscalateExceptionParentActor and EscalateExceptionChildActor. If the child actor throws an exception and if the parent level actor could not handle it, it could Escalate it to the Top level Actor. The Top level actor could choose to react with any of the Directives. In our example, we just StopStop would stop the immediate child (which is the EscalateExceptionParentActor). As you know, when a Stop is executed on an Actor, all its children would also be stopped before the Actor itself is stopped.

обострять

package me.rerun.akkanotes.supervision

import akka.actor.Actor  
import akka.actor.ActorLogging  
import akka.actor.ActorSystem  
import akka.actor.OneForOneStrategy  
import akka.actor.Props  
import akka.actor.SupervisorStrategy.Escalate  
import akka.actor.SupervisorStrategy.Stop  
import akka.actor.actorRef2Scala

object EscalateExceptionApp extends App {

  val actorSystem = ActorSystem("EscalateExceptionSystem")
  val actor = actorSystem.actorOf(Props[EscalateExceptionTopLevelActor], "topLevelActor")
  actor ! "create_parent"
}

class EscalateExceptionTopLevelActor extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.")
      Stop //Stop will stop the Actor that threw this Exception and all its children
    }
  }

  def receive = {
    case "create_parent" => {
      log.info("creating parent")
      val parent = context.actorOf(Props[EscalateExceptionParentActor], "parentActor")
      parent ! "create_child" //Sending message to next level
    }
  }
}

class EscalateExceptionParentActor extends Actor with ActorLogging {

  override def preStart={
    log.info ("Parent Actor started")
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => {
      log.info("The exception is ducked by the Parent Actor. Escalating to TopLevel Actor")
      Escalate
    }
  }

  def receive = {
    case "create_child" => {
      log.info("creating child")
      val child = context.actorOf(Props[EscalateExceptionChildActor], "childActor")
      child ! "throwSomeException"
    }
  }

  override def postStop = {
    log.info("Stopping parent Actor")
  }
}

class EscalateExceptionChildActor extends akka.actor.Actor with ActorLogging {

  override def preStart={
    log.info ("Child Actor started")
  }

  def receive = {
    case "throwSomeException" => {
      throw new Exception("I'm getting thrown for no reason.")
    }
  }
  override def postStop = {
    log.info("Stopping child Actor")
  }
}

Log

As you could see from the logs,

  1. The child actor throws exception.
  2. The immediate supervisor (EscalateExceptionParentActor) escalates that exception to its supervisor (EscalateExceptionTopLevelActor)
  3. The resultant directive from EscalateExceptionTopLevelActor is to Stop the Actor. As a sequence, the child actors gets stopped first.
  4. The parent actor gets stopped next (only after the watchers have been notified)
INFO  m.r.a.s.EscalateExceptionTopLevelActor - creating parent

INFO  m.r.a.s.EscalateExceptionParentActor - Parent Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - creating child

INFO  m.r.a.s.EscalateExceptionChildActor - Child Actor started

INFO  m.r.a.s.EscalateExceptionParentActor - The exception is ducked by the Parent Actor. Escalating to TopLevel Actor

INFO  m.r.a.s.EscalateExceptionTopLevelActor - The exception from the Child is now handled by the Top level Actor. Stopping Parent Actor and its children.

ERROR akka.actor.OneForOneStrategy - I'm getting thrown for no reason.  
java.lang.Exception: I'm getting thrown for no reason.  
    at me.rerun.akkanotes.supervision.EscalateExceptionChildActor$$anonfun$receive$3.applyOrElse(EscalateExceptionApp.scala:71) ~[classes/:na]
    ...
    ...

INFO  m.r.a.s.EscalateExceptionChildActor - Stopping child Actor

INFO  m.r.a.s.EscalateExceptionParentActor - Stopping parent Actor

Please note that whatever directive that was issued would only apply to the immediate child that escalated. Say, if a Restart is issued at the TopLevel, only the Parent would be restarted and anything in its constructor/preStart would be executed. If the children of the Parent actor was created in the constructor, they would also be created. However, children that were created through messages to the Parent Actor would still be in the Terminated state.

TRIVIA

Actually, you could control whether the preStart gets called at all. We’ll see about this in the next minor write-up. If you are curious, just have a look at the postRestart method of the Actor

def postRestart(reason: Throwable): Unit = {  
  preStart()
}

CODE

As always, code is on github

(my .gitignore wasn’t setup right for this project. will fix it today. sorry)