Статьи

Akka Typed Actors: изучение модели получателя

В предыдущей статье мы рассмотрели некоторые основные функции, предоставляемые Akka Typed. В этой и следующей статьях мы немного подробнее рассмотрим некоторые функции и сделаем это, взглянув на два разных паттерна, предоставляемых Akka Typed: Receiver и Receptionist. Если вы новичок в Akka Typed, было бы неплохо сначала прочитать предыдущую статью, поскольку это даст вам некоторое представление о Akka Typed. Таким образом, для этой статьи в нашей серии, написанной на акке, мы рассмотрим шаблон Receiver.

Шаблон получателя

В дистрибутиве Akka Typed есть пакетный вызов akka.typed.patterns. В этом пакете есть два различных шаблона: шаблон Receiver и шаблон Receptionist. Почему эти два шаблона были настолько важны, чтобы добавить их к дистрибутиву, я не знаю, если честно, но они предоставляют хороший способ представить некоторые дополнительные концепции и идеи, лежащие в основе Akka Typed.

Итак, давайте рассмотрим шаблон Receiver, и мы сделаем шаблон Receptionist в следующей статье. Чтобы понять, что делает шаблон Receiver, давайте просто посмотрим на сообщения, которые мы можем отправить ему:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
/**
   * Retrieve one message from the Receiver, waiting at most for the given duration.
   */
  final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]
  /**
   * Retrieve all messages from the Receiver that it has queued after the given
   * duration has elapsed.
   */
  final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]
  /**
   * Retrieve the external address of this Receiver (i.e. the side at which it
   * takes in the messages of type T.
   */
  final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]

Как вы можете видеть из этих сообщений, Receiver делает то, что он ставит в очередь сообщения типа T и предоставляет дополнительные команды для получения одного или нескольких из этих сообщений в ожидании определенного времени. Чтобы использовать получатель, нам нужно получить ExternalAddress, чтобы мы могли отправлять ему сообщения типа T. А от другого участника мы можем отправлять сообщения getOne и GetAll, чтобы узнать, ожидают ли какие-либо сообщения в получателе.

Для нашего примера мы собираемся создать следующих актеров:

  • Производитель, который отправляет сообщения типа T получателю.
  • Потребитель, который может получать сообщения типа T от этого получателя.
  • Корневой субъект, который запускает этот сценарий.

Начнем с продюсера, который выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
  * Producer object containing the protocol and the behavior. This is a very simple
  * actor that produces messages using a schedule. To start producing messages
  * we need to send an initial message
  */
 object Producer {
 
   // a simple protocol defining the messages that can be sent
   sealed trait ProducerMsg
   final case class registerReceiverMsgIn(msgIn: ActorRef[HelloMsg]) extends ProducerMsg
   final case class addHelloWorldMsg(msg: HelloMsg) extends ProducerMsg
 
   // the producer, which first waits for a registerReceiver message, after which
   // it changes behavior, to send messages.
   val producer = Full[ProducerMsg] {
 
     // if we receive a register message, we know where to send messages to
     case Msg(ctx, registerReceiverMsgIn(msgConsumer)) =>
 
       println("Producer: Switching behavior")
 
       // simple helper function which sends a message to self.
       def scheduleMessage() = ctx.schedule(500 millisecond, ctx.self, addHelloWorldMsg(Hello(s"hello @ ${System.currentTimeMillis()}")))
       // schedule the first one, the rest will be triggered through the behavior.
       scheduleMessage()
 
       Static {
         // add a message to the receiver and schedule a new one
         case addHelloWorldMsg(msg) => {println(s"Producer: Adding new '$msg' to receiver: $msgConsumer") ;msgConsumer ! msg; scheduleMessage()}
       }
 
     // don't switch behavior on any of the other messages
     case _ => Same
   }
 }

В этом объекте мы определяем сообщения, которые могут быть отправлены субъекту, и поведение. Сообщение registerReceiverMsgIn предоставляет субъекту адресата, которому он должен отправлять сообщения (подробнее об этом позже), а addHelloWorldMsg сообщает поведению, какое сообщение отправлять на адрес, указанный в сообщении registerReceiverMsgIn. Если вы посмотрите на это поведение, то увидите, что мы используем поведение Full [T]. Для этого поведения мы должны обеспечить совпадения для всех сообщений и сигналов, и в качестве дополнительного бонуса мы также получаем доступ к актору ctx. В своем исходном состоянии это поведение реагирует только на сообщения registerReceiverMsgIn. Когда он получает такое сообщение, он делает две вещи:

  1. Он определяет функцию, которую мы можем использовать для планирования сообщения, которое мы также вызываем напрямую, для планирования отправки сообщения за полсекунды.
  2. Это определяет наше новое поведение. Это новое поведение может обрабатывать сообщения, отправленные функцией scheduleMessage. Получив это сообщение, он отправляет содержимое предоставленному messageConsumer (Получателю) и снова вызывает сообщение о расписании. Продолжать отправлять сообщения каждые 500 мс.

Поэтому, когда мы отправляем начальное значение registerReceiverMessage, это приводит к появлению субъекта, который отправляет новое сообщение получателю каждые 500 мс. Теперь давайте посмотрим на другую сторону: потребитель.

Для потребителя мы также завернули все в объект, который выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
object Consumer {
   val consumer = Total[HelloMsg] {
     // in the case of a registerReceiver message, we change the implementation
     // since we're ready to receive other message.
     case registerReceiverCmdIn(commandAddress) => {
       println("Consumer: Switching behavior")
 
       // return a static implementation which closes over actorRefs
       // all messages we receive we pass to the receiver, which will queue
       // them. We have a specific message that prints out the received messages
       ContextAware { ctx =>
         Static[HelloMsg] {
 
           // printmessages just prints out the list of messages we've received
           case PrintMessages(msgs) => println(s"Consumer: Printing messages: $msgs") ;msgs.foreach { hw => println(s"  $hw")}
 
           // if we get the getAllMessages request, we get all the messages from
           // the receiver.
           case GetAllMessages() => {
             println("Consumer: requesting all messages")
             val wrap = ctx.spawnAdapter[GetAllResult[HelloMsg]] {
               case msgs:GetAllResult[HelloMsg] => println(s"Consumer: Received ${msgs.msgs.length} messages"); PrintMessages(msgs.msgs)
             }
             commandAddress ! GetAll(2 seconds)(wrap)
           }
         }
       }
     }
 
     // for all the other cases return the existing implementation, in essence
     // we're just ignoring other messages till we change state
     case _ => Same
   }   
 }

В этом объекте мы определяем одно поведение, которое также переключает его реализацию после получения первого сообщения. Первое сообщение в этом случае называется registerReceiverCmdIn. С помощью этого сообщения мы получаем доступ к actorRef (из Receiver), в который нам нужно отправлять сообщения GetAll и getOne. После того, как мы изменили поведение, мы обрабатываем наше собственное пользовательское сообщение GetAllMessages, которое инициирует отправку сообщения GetAll получателю. Поскольку наше собственное поведение не соответствует типу ответов, полученных от Receiver, мы используем адаптер (ctx.spawnAdapter). Этот адаптер получит ответ от приемника и распечатает сообщения.

Последняя часть сообщения — это актер, который инициирует это поведение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Simple root actor, which we'll use to start the other actors
val scenario1 = {
  Full[Unit] {
    case Sig(ctx, PreStart) => {
 
      import Producer._
      import Consumer._
 
      println("Scenario1: Started, now lets start up a number of child actors to do our stuff")
 
      // first start the two actors, one implements the receiver pattern, and
      // the other is the one we control directly.
      val receiverActor = ctx.spawn(Props(Receiver.behavior[HelloMsg]), "receiver")
      val consumerActor = ctx.spawn(Props(consumer), "adder")
      val producerActor = ctx.spawn(Props(producer), "producer")
 
      // our producerActor first needs the actorRef it can use to add messages to the receiver
      // for this we use a wrapper, this wrapper creates a child, which we use to get the
      // address, to which we can send messages.
      val wrapper = ctx.spawnAdapter[ActorRef[HelloMsg]] {
        case p: ActorRef[HelloMsg] => producerActor ! registerReceiverMsgIn(p)
      }
 
      // now send the message to get the external address, the response will be sent
      // to our own actor as a registerReceiver message, through the adapter
      receiverActor ! ExternalAddress(wrapper)
 
      // our printing actor needs to now the address of the receiver so send it to him
      consumerActor ! registerReceiverCmdIn(receiverActor)
 
      // by calling getAllMessages we get the messages within a time period.
      println("Scenario1: Get all the messages")
      consumerActor ! GetAllMessages()
      Thread.sleep(3000)
      consumerActor ! GetAllMessages()
      Thread.sleep(5000)
      consumerActor ! GetAllMessages()
 
      Same
    }
  }
}

Ничего особенного здесь. Мы создаем различные акторы в этом сценарии и используем ctx.spawnAdapter, чтобы получить внешний адрес получателя, который мы передаем продюссеру производителей. Далее мы передаем адрес получателя актера потребителю. Теперь мы вызываем GetAllMessages по адресу получателя, который получает сообщения от получателя и распечатывает их.

Итак, суммируем шаги, которые будут выполнены в этом примере:

  1. Мы создаем корневого субъекта, который будет запускать этот сценарий.
  2. Из этого корневого актера мы создаем трех акторов: получателя, потребителя и производителя.
  3. Затем мы получаем внешний адрес от получателя (адрес, на который мы отправляли сообщения типа T) и, используя адаптер, передаем его производителю.
  4. Производитель, получив это сообщение, переключает поведение и начинает отправлять сообщения на переданный адрес.
  5. Тем временем корневой субъект передает адрес получателя потребителю.
  6. Потребитель, когда он получает эти сообщения, меняет поведение и теперь ждет сообщений типа GetAllMessages.
  7. Корневой субъект теперь будет отправлять GetAllMessages потребителю.
  8. Когда потребитель получает эти сообщения, он использует адаптер для отправки сообщения GetAll получателю. Когда адаптер получает ответ, он распечатывает количество полученных сообщений и выполняет дальнейшую обработку для потребителя, отправляя PrintMessage для каждого полученного сообщения от получателя.

И результат этого сценария выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Scenario1: Started, now lets start up a number of child actors to do our stuff
Scenario1: Get all the messages
Consumer: Switching behavior
Consumer: requesting all messages
Producer: Switching behavior
Producer: Adding new 'Hello(hello @ 1446277162929)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163454)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277163969)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 3 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277162929), Hello(hello @ 1446277163454), Hello(hello @ 1446277163969))
  Hello(hello @ 1446277162929)
  Hello(hello @ 1446277163454)
  Hello(hello @ 1446277163969)
Producer: Adding new 'Hello(hello @ 1446277164488)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277165008)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277165529)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166049)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277166569)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277167089)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 6 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277164488), Hello(hello @ 1446277165008), Hello(hello @ 1446277165529), Hello(hello @ 1446277166049), Hello(hello @ 1446277166569), Hello(hello @ 1446277167089))
  Hello(hello @ 1446277164488)
  Hello(hello @ 1446277165008)
  Hello(hello @ 1446277165529)
  Hello(hello @ 1446277166049)
  Hello(hello @ 1446277166569)
  Hello(hello @ 1446277167089)
Producer: Adding new 'Hello(hello @ 1446277167607)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168129)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277168650)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169169)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277169690)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277170210)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: requesting all messages
Producer: Adding new 'Hello(hello @ 1446277170729)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171249)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277171769)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277172289)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Consumer: Received 10 messages
Consumer: Printing messages: Vector(Hello(hello @ 1446277167607), Hello(hello @ 1446277168129), Hello(hello @ 1446277168650), Hello(hello @ 1446277169169), Hello(hello @ 1446277169690), Hello(hello @ 1446277170210), Hello(hello @ 1446277170729), Hello(hello @ 1446277171249), Hello(hello @ 1446277171769), Hello(hello @ 1446277172289))
  Hello(hello @ 1446277167607)
  Hello(hello @ 1446277168129)
  Hello(hello @ 1446277168650)
  Hello(hello @ 1446277169169)
  Hello(hello @ 1446277169690)
  Hello(hello @ 1446277170210)
  Hello(hello @ 1446277170729)
  Hello(hello @ 1446277171249)
  Hello(hello @ 1446277171769)
  Hello(hello @ 1446277172289)
Producer: Adding new 'Hello(hello @ 1446277172808)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173328)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277173849)' to receiver: Actor[akka://Root/user/receiver#1097367365]
Producer: Adding new 'Hello(hello @ 1446277174369)' to receiver: Actor[akka://Root/user/receiver#1097367365]

Круто верно! Как видно из последовательности сообщений, наш производитель отправляет сообщения получателю, который ставит их в очередь. Затем у нас есть потребитель, который запрашивает все полученные сообщения и распечатывает их.

Вот и все для этой статьи об Akka-Typed, в следующей мы рассмотрим шаблон Receptionist, также присутствующий в Akka-Typed.

Ссылка: Akka Typed Actors: изучение шаблона получателя от нашего партнера JCG Йоса Дирксена в блоге Smart Java .