Когда мы говорили о жизненном цикле Actor , мы видели, что Actors можно было остановить различными способами (используя ActorSystem.stop или ActorContext.stop или отправив PoisonPill
— также есть Kill
и gracefulStop
).
Какой бы ни была причина, по которой актер умирает, бывают случаи, когда несколько других действующих лиц в системе хотели бы знать об этом. Давайте возьмем тривиальный пример актера, который общается с базой данных — назовем его RepositoryActor
. По понятным причинам в системе было бы немного других действующих лиц, которые отправляли бы сообщение этому RepositoryActor
. Эти «заинтересованные» актеры хотели бы следить или watch
этим актером, если он выйдет из строя. Теперь это в терминах Actor называется DeathWatch
. И методы, чтобы watch
и unwatch
это интуитивно ActorContext.watch
и ActorContext.unwatch
. Если смотреть, наблюдатели получат сообщение « Terminated
от остановленного Актера, которое они могут добавить в свою частичную функцию получения.
В отличие от Supervision (следующая запись будет вставлять ссылку после завершения), где строгое соблюдение иерархии родитель-потомок, любой Actor может watch
любым другим Actor в ActorSystem.
Давайте посмотрим на код.
Код
QuoteRepositoryActor
- Наш
QueryRepositoryActor
содержит несколькоquotes
в виде списка и служит случайным при полученииQuoteRepositoryRequest
. - Он отслеживает количество полученных сообщений и, если он получает более 3 сообщений, он убивает себя с помощью
PoisonPill
Ничего особенного здесь.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package me.rerun.akkanotes.deathwatch import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala} import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._ import scala.util.Random class QuoteRepositoryActor() extends Actor with ActorLogging { val quotes = List( "Moderation is for cowards" , "Anything worth doing is worth overdoing" , "The trouble is you think you have time" , "You never gonna know if you never even try" ) var repoRequestCount:Int= 1 def receive = { case QuoteRepositoryRequest => { if (repoRequestCount> 3 ){ self!PoisonPill } else { //Get a random Quote from the list and construct a response val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size))) log.info(s "QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse" ) repoRequestCount=repoRequestCount+ 1 sender ! quoteResponse } } } } |
TeacherActorWatcher
Опять же, ничего особенного с TeacherActorWatcher
за исключением того, что он создает QuoteRepositoryActor
и следит за ним с помощью context.watch
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
package me.rerun.akkanotes.deathwatch import akka.actor.{Terminated, Props, Actor, ActorLogging} import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest class TeacherActorWatcher extends Actor with ActorLogging { val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor" ) context.watch(quoteRepositoryActor) def receive = { case QuoteRequest => { quoteRepositoryActor ! QuoteRepositoryRequest } case Terminated(terminatedActorRef)=>{ log.error(s "Child Actor {$terminatedActorRef} Terminated" ) } } } |
TestCases
Это интересный момент. Честно говоря, я никогда не думал, что это можно проверить. Акка-тесткит FTW. Мы проанализируем три тестовых случая здесь:
1. Подтвердите получение Terminated
сообщения при просмотре
QuoteRepositoryActor
должен отправить QuoteRepositoryActor
Terminated
сообщение после получения 4-го сообщения. Первые три сообщения должны быть в порядке.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
"A QuoteRepositoryActor" must { ... ... ... "send back a termination message to the watcher on 4th message" in { val quoteRepository=TestActorRef[QuoteRepositoryActor] val testProbe=TestProbe() testProbe.watch(quoteRepository) //Let's watch the Actor within ( 1000 millis) { var receivedQuotes = List[String]() ( 1 to 3 ).foreach(_ => quoteRepository ! QuoteRepositoryRequest) receiveWhile() { case QuoteRepositoryResponse(quoteString) => { receivedQuotes = receivedQuotes :+ quoteString } } receivedQuotes.size must be ( 3 ) println(s "receiveCount ${receivedQuotes.size}" ) //4th message quoteRepository!QuoteRepositoryRequest testProbe.expectTerminated(quoteRepository) //Expect a Terminated Message } } |
2. Подтвердить непринятие Terminated
сообщения, если оно не было просмотрено / не просмотрено.
На самом деле, мы context.unwatch
просто чтобы продемонстрировать context.unwatch
. testProbe.watch
testProbe.unwatch
будет работать нормально, если мы удалим testProbe.watch
и testProbe.unwatch
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
"not send back a termination message on 4th message if not watched" in { val quoteRepository=TestActorRef[QuoteRepositoryActor] val testProbe=TestProbe() testProbe.watch(quoteRepository) //watching within ( 1000 millis) { var receivedQuotes = List[String]() ( 1 to 3 ).foreach(_ => quoteRepository ! QuoteRepositoryRequest) receiveWhile() { case QuoteRepositoryResponse(quoteString) => { receivedQuotes = receivedQuotes :+ quoteString } } testProbe.unwatch(quoteRepository) //not watching anymore receivedQuotes.size must be ( 3 ) println(s "receiveCount ${receivedQuotes.size}" ) //4th message quoteRepository!QuoteRepositoryRequest testProbe.expectNoMsg() //Not Watching. No Terminated Message } } |
3. Подтвердите получение Terminated
сообщения в TeacherActorWatcher
Мы подписываемся на EventStream и проверяем наличие конкретного сообщения журнала для подтверждения завершения.
01
02
03
04
05
06
07
08
09
10
11
12
13
|
"end back a termination message to the watcher on 4th message to the TeacherActor" in { //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase val teacherActor=TestActorRef[TeacherActorWatcher] within ( 1000 millis) { ( 1 to 3 ).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor EventFilter.error (pattern= "" "Child Actor .* Terminated" "" , occurrences = 1 ).intercept{ teacherActor!QuoteRequest //Send the dangerous 4th message } } } |
Не удивительно, что свойство pattern
в EventFilter
ожидает шаблон регулярного выражения. Ожидается, что pattern="""Child Actor .* Terminated"""
будет соответствовать сообщению журнала в формате Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated
Github
Как всегда, код доступен на github . Следите за deathwatch
.
Ссылка: | Akka Notes — DeathWatch — 7 от нашего партнера по JCG Аруна Маниваннана в блоге Rerun.me . |