В предыдущей части мы реализовали нашего первого актера и отправили ему сообщение. К сожалению, актер не смог вернуть какой-либо результат обработки этого сообщения, что сделало его довольно бесполезным. В этом эпизоде мы узнаем, как отправить ответное сообщение отправителю и как интегрировать синхронный, блокирующий API с (по определению) асинхронной системой на основе передачи сообщений.
Прежде чем мы начнем, я должен провести очень сильное различие между актером (расширяющим черту Actor
) и ссылкой на ActorRef
типа ActorRef
. При реализации актера мы расширяем черту актера, которая заставляет нас реализовать метод receive
. Однако мы не создаем экземпляры актеров напрямую, вместо этого мы просим ActorSystem
:
1
|
val randomOrgBuffer : ActorRef = system.actorOf(Props[RandomOrgBuffer], "buffer" ) |
К нашему великому удивлению, возвращаемый объект не RandomOrgBuffer
типу RandomOrgBuffer
как наш актер, это даже не актер. Благодаря ActorRef, который является оберткой (прокси) вокруг актера:
- внутреннее состояние субъекта недоступно извне (инкапсуляция)
- система Akka следит за тем, чтобы метод
receive
каждого субъекта обрабатывал не более одного сообщения за раз (однопоточное) и очереди, ожидающие сообщения - Фактический актер может быть развернут на другом компьютере в кластере,
ActorRef
прозрачно и незаметно для клиента отправляет сообщения по проводной связи на правильный узел в кластере (подробнее об этом позже в этой серии).
При этом давайте как-нибудь «вернем» случайные числа, извлеченные внутри нашего актера. Оказывается, внутри каждого актера есть метод с очень многообещающим именем отправителя. Не удивительно, если я скажу, что это ActorRef
указывающий на актера, который отправил сообщение, которое мы сейчас обрабатываем:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
object Bootstrap extends App { //... randomOrgBuffer ! RandomRequest //... } //... class RandomOrgBuffer extends Actor with ActorLogging { def receive = { case RandomRequest = > if (buffer.isEmpty) { buffer ++ = fetchRandomNumbers( 50 ) } sender ! buffer.dequeue() } } |
Я надеюсь, что вы уже привыкли! нотация используется для отправки сообщения актеру. Если нет, есть более консервативные альтернативы:
1
2
|
sender tell buffer.dequeue() sender.tell(buffer.dequeue()) |
Тем не менее, вместо того, чтобы печатать новые случайные числа на экране, мы отправляем их обратно отправителю. Быстрый тест нашей программы показывает, что … ничего не происходит. Внимательно Actor[akka://Akka/deadLetters]
ссылку на отправителя, мы обнаруживаем, что она указывает на Actor[akka://Akka/deadLetters]
. deadLetters
. deadLetters
звучит не очень хорошо, но это логично. отправитель представляет собой ссылку на актера, который отправил данное сообщение. Мы отправили сообщение в обычный класс Scala, а не от актера. Если бы мы использовали двух актеров, и первый отправил бы сообщение второму, то второй актер может использовать ссылку отправителя, указывающую на первого, чтобы отправить ответ обратно. Очевидно, что мы все равно не сможем получить ответ, несмотря на увеличение абстракции.
Скоро мы рассмотрим сценарии с несколькими актерами, а пока нам нужно научиться интегрировать нормальный, не-Akka-код с актерами. Другими словами, как получить ответ, чтобы Акка была не только черной дырой, которая принимает сообщения и никогда не отправляет никаких результатов обратно. Решение удивительно простое — мы можем дождаться ответа!
1
2
3
4
|
implicit val timeout = Timeout( 1 minutes) val future = randomOrgBuffer ? RandomRequest val veryRandom : Int = Await.result(future.mapTo[Int], 1 minute) |
Название будущее не случайно. Хотя это не экземпляр java.util.concurrent.Future
, семантически он представляет ту же концепцию. Но сначала обратите внимание, что вместо восклицательного знака мы используем вопросительный знак (?) Для отправки сообщения. Эта модель общения известна как «спросить», в отличие от уже введенного «сказать». По сути, Akka создал временного актера по имени Actor[akka://Akka/temp/$d]
, отправил сообщение от имени этого актера и теперь ждет до одной минуты ответа, отправленного обратно вышеупомянутому временному актеру. Отправка сообщения по-прежнему не блокирует, и future
объект представляет результат операции, которая еще не была завершена (она будет доступна в будущем ). Далее (теперь в режиме блокировки) мы ждем ответа. Кроме того, mapTo[Int]
необходимо, поскольку Akka не знает, какой ответ мы ожидаем.
Вы должны помнить, что использование шаблона «спросить» и ожидание / блокировка ответа очень редки. Обычно мы полагаемся на асинхронные сообщения и управляемую событиями архитектуру. Один актер никогда не должен блокировать ожидание ответа от другого актера. Но в данном конкретном случае нам нужен прямой доступ к возвращаемому значению, поскольку мы строим мост между обязательным методом запрос / ответ и управляемой сообщениями системой Akka. Имея ответ, какие интересные варианты использования мы можем поддержать? Например, мы можем спроектировать нашу собственную реализацию java.util.Random
основанную исключительно на идеальных, истинных случайных числах!
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class RandomOrgRandom(randomOrgBuffer : ActorRef) extends java.util.Random { implicit val timeout = Timeout( 1 minutes) override def next(bits : Int) = { if (bits < = 16 ) { random 16 Bits() & (( 1 << bits) - 1 ) } else { (next(bits - 16 ) << 16 ) + random 16 Bits() } } private def random 16 Bits() : Int = { val future = randomOrgBuffer ? RandomRequest Await.result(future.mapTo[Int], 1 minute) } } |
Детали реализации не имеют значения, достаточно сказать, что мы должны реализовать метод next (), возвращающий запрошенное количество случайных битов, тогда как наш субъект всегда возвращает 16 битов. Единственное, что нам сейчас нужно, — это легкая scala.util.Random
упаковка java.util.Random
и наслаждение идеально перемешанной последовательностью чисел:
1
2
3
4
|
val javaRandom = new RandomOrgRandom(randomOrgBuffer) val scalaRandom = new scala.util.Random(javaRandom) println(scalaRandom.shuffle(( 1 to 20 ).toList)) //List(17, 15, 14, 6, 10, 2, 1, 9, 8, 3, 4, 16, 7, 18, 13, 11, 19, 5, 12, 20) |
Давайте подведем итоги. Сначала мы разработали простую систему, основанную на одном действующем субъекте, который (при необходимости) подключается к внешнему веб-сервису и буферизует пакет случайных чисел. По запросу он отправляет обратно один номер из буфера. Далее мы интегрировали асинхронный мир актеров с синхронным API. Обернув наш актер, мы реализовали нашу собственную реализацию java.util.Random (см. Также java.security.SecureRandom ). Этот класс теперь можно использовать в любом месте, где нам нужны случайные числа очень высокого качества. Однако реализация далека от совершенства, о чем мы поговорим в следующих частях.
Исходный код доступен на GitHub ( тег запрос-ответ ).
Это был перевод моей статьи « Poznajemy Akka: zadanie i odpowiedz », первоначально опубликованной на scala.net.pl .
Ссылка: Запрос и ответ — узнайте об Akka от нашего партнера по JCG Томаша Нуркевича в блоге на Java и по соседству .