Когда мы говорили о жизненном цикле Actor , мы видели, что Actors можно было остановить различными способами (используя ActorSystem.stop или ActorContext.stop или отправив PoisonPill — также есть Kill и gracefulStop ).
Какой бы ни была причина, по которой актер умирает, бывают случаи, когда несколько других действующих лиц в системе хотели бы знать об этом. Давайте возьмем тривиальный пример актера, который общается с базой данных — назовем его RepositoryActor . По понятным причинам в системе было бы немного других действующих лиц, которые отправляли бы сообщение этому RepositoryActor . Эти «заинтересованные» актеры хотели бы следить или watch этим актером, если он выйдет из строя. Теперь это в терминах Actor называется DeathWatch . И методы, чтобы watch и unwatch это интуитивно ActorContext.watch и ActorContext.unwatch . Если смотреть, наблюдатели получат сообщение « Terminated от остановленного Актера, которое они могут добавить в свою частичную функцию получения.
В отличие от Supervision (следующая запись будет вставлять ссылку после завершения), где строгое соблюдение иерархии родитель-потомок, любой Actor может watch любым другим Actor в ActorSystem.
Давайте посмотрим на код.
Код
QuoteRepositoryActor
- Наш
QueryRepositoryActorсодержит несколькоquotesв виде списка и служит случайным при полученииQuoteRepositoryRequest. - Он отслеживает количество полученных сообщений и, если он получает более 3 сообщений, он убивает себя с помощью
PoisonPill
Ничего особенного здесь.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package me.rerun.akkanotes.deathwatchimport akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala} import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._ import scala.util.Randomclass QuoteRepositoryActor() extends Actor with ActorLogging { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") var repoRequestCount:Int=1 def receive = { case QuoteRepositoryRequest => { if (repoRequestCount>3){ self!PoisonPill } else { //Get a random Quote from the list and construct a response val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size))) log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse") repoRequestCount=repoRequestCount+1 sender ! quoteResponse } } }} |
TeacherActorWatcher
Опять же, ничего особенного с TeacherActorWatcher за исключением того, что он создает QuoteRepositoryActor и следит за ним с помощью context.watch .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
package me.rerun.akkanotes.deathwatchimport akka.actor.{Terminated, Props, Actor, ActorLogging} import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequestclass TeacherActorWatcher extends Actor with ActorLogging { val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor") context.watch(quoteRepositoryActor) def receive = { case QuoteRequest => { quoteRepositoryActor ! QuoteRepositoryRequest } case Terminated(terminatedActorRef)=>{ log.error(s"Child Actor {$terminatedActorRef} Terminated") } }} |
TestCases
Это интересный момент. Честно говоря, я никогда не думал, что это можно проверить. Акка-тесткит FTW. Мы проанализируем три тестовых случая здесь:
1. Подтвердите получение Terminated сообщения при просмотре
QuoteRepositoryActor должен отправить QuoteRepositoryActor Terminated сообщение после получения 4-го сообщения. Первые три сообщения должны быть в порядке.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
"A QuoteRepositoryActor" must { ... ... ... "send back a termination message to the watcher on 4th message" in { val quoteRepository=TestActorRef[QuoteRepositoryActor] val testProbe=TestProbe() testProbe.watch(quoteRepository) //Let's watch the Actor within (1000 millis) { var receivedQuotes = List[String]() (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest) receiveWhile() { case QuoteRepositoryResponse(quoteString) => { receivedQuotes = receivedQuotes :+ quoteString } } receivedQuotes.size must be (3) println(s"receiveCount ${receivedQuotes.size}") //4th message quoteRepository!QuoteRepositoryRequest testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message } } |
2. Подтвердить непринятие Terminated сообщения, если оно не было просмотрено / не просмотрено.
На самом деле, мы context.unwatch просто чтобы продемонстрировать context.unwatch . testProbe.watch testProbe.unwatch будет работать нормально, если мы удалим testProbe.watch и testProbe.unwatch .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
"not send back a termination message on 4th message if not watched" in { val quoteRepository=TestActorRef[QuoteRepositoryActor] val testProbe=TestProbe() testProbe.watch(quoteRepository) //watching within (1000 millis) { var receivedQuotes = List[String]() (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest) receiveWhile() { case QuoteRepositoryResponse(quoteString) => { receivedQuotes = receivedQuotes :+ quoteString } } testProbe.unwatch(quoteRepository) //not watching anymore receivedQuotes.size must be (3) println(s"receiveCount ${receivedQuotes.size}") //4th message quoteRepository!QuoteRepositoryRequest testProbe.expectNoMsg() //Not Watching. No Terminated Message } } |
3. Подтвердите получение Terminated сообщения в TeacherActorWatcher
Мы подписываемся на EventStream и проверяем наличие конкретного сообщения журнала для подтверждения завершения.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
"end back a termination message to the watcher on 4th message to the TeacherActor" in { //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase val teacherActor=TestActorRef[TeacherActorWatcher] within (1000 millis) { (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{ teacherActor!QuoteRequest //Send the dangerous 4th message } } } |
Не удивительно, что свойство pattern в EventFilter ожидает шаблон регулярного выражения. Ожидается, что pattern="""Child Actor .* Terminated""" будет соответствовать сообщению журнала в формате Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated
Github
Как всегда, код доступен на github . Следите за deathwatch .
| Ссылка: | Akka Notes — DeathWatch — 7 от нашего партнера по JCG Аруна Маниваннана в блоге Rerun.me . |
