Статьи

Akka Typed Actors: Изучение модели приема

В этой статье мы рассмотрим еще один паттерн Akka-Typed. На этот раз мы покажем вам, как вы можете использовать шаблоны приема. Это третья и последняя статья в серии о Akka-Typed. Две другие статьи также можно найти на этом сайте. Если вы еще ничего не знаете об Akka-Typed, лучше сначала прочитать «Первые шаги со статьей Akka Typed»:

Идея шаблона Receptionist очень проста и объясняется ScalaDoc:

1
2
3
4
5
6
7
8
/**
 * A Receptionist is an entry point into an Actor hierarchy where select Actors
 * publish their identity together with the protocols that they implement. Other
 * Actors need only know the Receptionist’s identity in order to be able to use
 * the services of the registered Actors.
 */
object Receptionist {
...

Таким образом, в основном клиент должен знать, как получить доступ к регистратору, и оттуда он может получить доступ к любому другому действующему лицу, зарегистрированному на ресепшн. Итак, давайте сначала создадим пару актеров, которые мы зарегистрируем у портье. Для этого мы определим двух очень простых актеров:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
  * Simple service and protocol. Does nothing special, just print out
  * the received message.
  */
 object FirstService {
 
   sealed trait FirstServiceMsg
   final case class FirstServiceMsg1(msg: String) extends FirstServiceMsg
 
   val behavior = Static[FirstServiceMsg] {
     case msg:FirstServiceMsg => println("First Service Receiver: " + msg)
   }
 }
 
 /**
  * Another simple service and protocol. Does nothing special, just print out
  * the received message.
  */
 object SecondService {
 
   sealed trait SecondServiceMsg
   final case class SecondServiceMsg1(msg: String) extends SecondServiceMsg
 
   val behavior = Static[SecondServiceMsg] {
     case msg:SecondServiceMsg => println("Second Service Receiver: " + msg)
   }
 }

Ничего особенного, только две службы, которые распечатывают каждое полученное сообщение. Помимо этих двух сервисов мы также создаем сервис, который будет действовать как клиент для этих двух сервисов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
object SenderService {
 
  sealed trait SenderMsg
  final case class registerAddresses(firstServices: Set[ActorRef[FirstServiceMsg]], secondServices: Set[ActorRef[SecondServiceMsg]]) extends SenderMsg
  final case class sendMessage(msg: String) extends SenderMsg
 
  val behavior = Total[SenderMsg] {
    case registerAddresses(firstRefs, secondRefs) => {
      Static {
        case sendMessage(msg) => {
          firstRefs.foreach(_ ! FirstServiceMsg1(msg))
          secondRefs.foreach(_ ! SecondServiceMsg1(msg))
        }
      }
    }
    case _ => Same
  }
}

Как видите, также довольно простой субъект, который меняет поведение, как только получает сообщение registerAddresses. После того, как он изменил свое поведение, он будет действовать на сообщения sendMessage для вызова зарегистрированных сервисов. Теперь, как мы можем связать все это вместе?

Для этого мы создаем другого актера, который запускает этот сценарий для нас:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
val scenario2 = {
  Full[Unit] {
    case Sig(ctx, PreStart) => {
      val receptionist = ctx.spawn(Props(Receptionist.behavior), "receptionist");
 
      // register three actors that can work with the FirstServiceMsg protocol
      val service1a = ctx.spawn(Props(FirstService.behavior), "service1a")
      val service1b = ctx.spawn(Props(FirstService.behavior), "service1b")
      val service1c = ctx.spawn(Props(FirstService.behavior), "service1c")
 
      // register three actors that can work with the SecondServiceMsg protocol
      val service2a = ctx.spawn(Props(SecondService.behavior), "service2a")
      val service2b = ctx.spawn(Props(SecondService.behavior), "service2b")
      val service2c = ctx.spawn(Props(SecondService.behavior), "service2c")
 
      // and the actor that will eventually send messages
      val sender = ctx.spawn(Props(SenderService.behavior),"sender")
 
      // define the service keys we'll use for registering
      val serviceKey1 = new ServiceKey[FirstServiceMsg] {}
      val serviceKey2 = new ServiceKey[SecondServiceMsg] {}
 
      // register the services with the receptionise
      val responseWrapperFirst = ctx.spawnAdapter[Registered[FirstServiceMsg]] {case _ =>}
      val responseWrapperSecond = ctx.spawnAdapter[Registered[SecondServiceMsg]] {case _ =>}
 
      receptionist ! Register(serviceKey1, service1a)(responseWrapperFirst)
      receptionist ! Register(serviceKey1, service1b)(responseWrapperFirst)
      receptionist ! Register(serviceKey1, service1c)(responseWrapperFirst)
 
      receptionist ! Register(serviceKey2, service2a)(responseWrapperSecond)
      receptionist ! Register(serviceKey2, service2b)(responseWrapperSecond)
      receptionist ! Register(serviceKey2, service2c)(responseWrapperSecond)
 
      // as a client we can now ask the receptionist to give us the actor references for services
      // that implement a specific protocol. We pass the result to the sender service. Ugly way
      // for now, but more to demonstrate how it works.
      val getListingWrapper = ctx.spawnAdapter[Listing[FirstServiceMsg]] {
        case firsts : Listing[FirstServiceMsg] => {
          val secondWrapper = ctx.spawnAdapter[Listing[SecondServiceMsg]] {
            case seconds : Listing[SecondServiceMsg] => {
              sender ! registerAddresses(firsts.addresses, seconds.addresses)
            }
          }
          receptionist ! Find[SecondServiceMsg](serviceKey2)(secondWrapper)
        }
      }
      // get message from the first lookup, and pass it to the adapter, which will look up the
      // second
      receptionist ! Find[FirstServiceMsg](serviceKey1)(getListingWrapper)
 
      // now wait a bit to make sure that through the receptionist we get a list of target actorrefs
      Thread.sleep(200)
 
      // these are sent to all the registered service implementations
      sender ! sendMessage("Hello1")
      sender ! sendMessage("Hello2")
 
      Same
    }
  }
}

Это много кода, но действительно легко увидеть, что происходит. Первое, что мы делаем, это используем ctx.spawn для создания набора дочерних актеров того типа, который мы только что обсуждали. После того, как мы определили актеров, нам нужно зарегистрировать наших актеров FirstService и SecondService у администратора. Для этого нам нужно отправить сообщение, подобное этому:

1
2
3
4
5
6
/**
   * Associate the given  with the given . Multiple
   * registrations can be made for the same key. Unregistration is implied by
   * the end of the referenced Actor’s lifecycle.
   */
  final case class Register[T](key: ServiceKey[T], address: ActorRef[T])(val replyTo: ActorRef[Registered[T]]) extends Command

В нашем примере мы делаем это следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
// define the service keys we'll use for registering
val serviceKey1 = new ServiceKey[FirstServiceMsg] {}
val serviceKey2 = new ServiceKey[SecondServiceMsg] {}
 
// register the services with the receptionise
val responseWrapperFirst = ctx.spawnAdapter[Registered[FirstServiceMsg]] {case _ =>}
val responseWrapperSecond = ctx.spawnAdapter[Registered[SecondServiceMsg]] {case _ =>}
 
receptionist ! Register(serviceKey1, service1a)(responseWrapperFirst)
receptionist ! Register(serviceKey1, service1b)(responseWrapperFirst)
receptionist ! Register(serviceKey1, service1c)(responseWrapperFirst)
 
receptionist ! Register(serviceKey2, service2a)(responseWrapperSecond)
receptionist ! Register(serviceKey2, service2b)(responseWrapperSecond)
receptionist ! Register(serviceKey2, service2c)(responseWrapperSecond)

Как видите, мы создаем адаптер для обработки параметра replyTo сообщения Register (в данном случае мы просто игнорируем ответ). Затем мы используем эти адаптеры для регистрации наших сервисных актеров у администратора. На данный момент мы зарегистрировали наши услуги у администратора и теперь можем использовать сообщение Найти:

1
2
3
4
5
/**
   * Query the Receptionist for a list of all Actors implementing the given
   * protocol.
   */
  final case class Find[T](key: ServiceKey[T])(val replyTo: ActorRef[Listing[T]]) extends Command

.. чтобы получить список зарегистрированных актеров для определенного типа:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
// as a client we can now ask the receptionist to give us the actor references for services
// that implement a specific protocol. We pass the result to the sender service. Ugly way
// for now, but more to demonstrate how it works.
val getListingWrapper = ctx.spawnAdapter[Listing[FirstServiceMsg]] {
  case firsts : Listing[FirstServiceMsg] => {
    val secondWrapper = ctx.spawnAdapter[Listing[SecondServiceMsg]] {
      case seconds : Listing[SecondServiceMsg] => {
        sender ! registerAddresses(firsts.addresses, seconds.addresses)
      }
    }
    receptionist ! Find[SecondServiceMsg](serviceKey2)(secondWrapper)
  }
}
// get message from the first lookup, and pass it to the adapter, which will look up the
// second
receptionist ! Find[FirstServiceMsg](serviceKey1)(getListingWrapper)

Здесь важно отметить строку 99:

1
sender ! registerAddresses(firsts.addresses, seconds.addresses)

Здесь мы отправили информацию о зарегистрированных сервисах нашему актеру, который выступает в роли клиента.

Теперь осталось только отправить отправителю сообщение sendMessage, и оно должно быть отправлено всем зарегистрированным службам:

1
2
3
// these are sent to all the registered service implementations
 sender ! sendMessage("Hello1")
 sender ! sendMessage("Hello2")

Когда мы сейчас запустим это, результат будет выглядеть так:

01
02
03
04
05
06
07
08
09
10
11
12
First Service Receiver: FirstServiceMsg1(Hello1)
First Service Receiver: FirstServiceMsg1(Hello1)
First Service Receiver: FirstServiceMsg1(Hello1)
Second Service Receiver: SecondServiceMsg1(Hello1)
First Service Receiver: FirstServiceMsg1(Hello2)
First Service Receiver: FirstServiceMsg1(Hello2)
Second Service Receiver: SecondServiceMsg1(Hello1)
First Service Receiver: FirstServiceMsg1(Hello2)
Second Service Receiver: SecondServiceMsg1(Hello1)
Second Service Receiver: SecondServiceMsg1(Hello2)
Second Service Receiver: SecondServiceMsg1(Hello2)
Second Service Receiver: SecondServiceMsg1(Hello2)

Легко, верно!

Ссылка: Akka Typed Actors: Изучите модель приема от нашего партнера JCG Йоса Дирксена в блоге Smart Java .