Статьи

Акка Заметки — DeathWatch — 7

Когда мы говорили о жизненном цикле Actor , мы видели, что Actors можно было остановить различными способами (используя ActorSystem.stop или ActorContext.stop или отправив PoisonPill — также есть Kill и gracefulStop ).

Какой бы ни была причина, по которой актер умирает, бывают случаи, когда несколько других действующих лиц в системе хотели бы знать об этом. Давайте возьмем тривиальный пример актера, который общается с базой данных — назовем его RepositoryActor . По понятным причинам в системе было бы немного других действующих лиц, которые отправляли бы сообщение этому RepositoryActor . Эти «заинтересованные» актеры хотели бы следить или watch этим актером, если он выйдет из строя. Теперь это в терминах Actor называется DeathWatch . И методы, чтобы watch и unwatch это интуитивно ActorContext.watch и ActorContext.unwatch . Если смотреть, наблюдатели получат сообщение « Terminated от остановленного Актера, которое они могут добавить в свою частичную функцию получения.

В отличие от Supervision (следующая запись будет вставлять ссылку после завершения), где строгое соблюдение иерархии родитель-потомок, любой Actor может watch любым другим Actor в ActorSystem.

Deathwatch

Давайте посмотрим на код.

Код

QuoteRepositoryActor

  1. Наш QueryRepositoryActor содержит несколько quotes в виде списка и служит случайным при получении QuoteRepositoryRequest .
  2. Он отслеживает количество полученных сообщений и, если он получает более 3 сообщений, он убивает себя с помощью PoisonPill

Ничего особенного здесь.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package me.rerun.akkanotes.deathwatch
 
import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala} 
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._ 
import scala.util.Random
 
class QuoteRepositoryActor() extends Actor with ActorLogging {
 
  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")
 
  var repoRequestCount:Int=1
 
  def receive = {
 
    case QuoteRepositoryRequest => {
 
      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))
 
        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }
 
    }
 
  }
 
}

TeacherActorWatcher

Опять же, ничего особенного с TeacherActorWatcher за исключением того, что он создает QuoteRepositoryActor и следит за ним с помощью context.watch .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
package me.rerun.akkanotes.deathwatch
 
import akka.actor.{Terminated, Props, Actor, ActorLogging} 
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest 
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest
 
class TeacherActorWatcher extends Actor with ActorLogging {
 
  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)
 
 
  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

TestCases

Это интересный момент. Честно говоря, я никогда не думал, что это можно проверить. Акка-тесткит FTW. Мы проанализируем три тестовых случая здесь:

1. Подтвердите получение Terminated сообщения при просмотре

QuoteRepositoryActor должен отправить QuoteRepositoryActor Terminated сообщение после получения 4-го сообщения. Первые три сообщения должны быть в порядке.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"A QuoteRepositoryActor" must {
    ...
    ...
    ...
 
    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]
 
      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor
 
      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }
 
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")
 
        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }

2. Подтвердить непринятие Terminated сообщения, если оно не было просмотрено / не просмотрено.

На самом деле, мы context.unwatch просто чтобы продемонстрировать context.unwatch . testProbe.watch testProbe.unwatch будет работать нормально, если мы удалим testProbe.watch и testProbe.unwatch .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]
 
      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching
 
      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }
 
        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")
 
        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }

3. Подтвердите получение Terminated сообщения в TeacherActorWatcher

Мы подписываемся на EventStream и проверяем наличие конкретного сообщения журнала для подтверждения завершения.

01
02
03
04
05
06
07
08
09
10
11
12
13
"end back a termination message to the watcher on 4th message to the TeacherActor" in {
 
      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]
 
      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor
 
        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

Не удивительно, что свойство pattern в EventFilter ожидает шаблон регулярного выражения. Ожидается, что pattern="""Child Actor .* Terminated""" будет соответствовать сообщению журнала в формате Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated

Github

Как всегда, код доступен на github . Следите за deathwatch .

Ссылка: Akka Notes — DeathWatch — 7 от нашего партнера по JCG Аруна Маниваннана в блоге Rerun.me .