В предыдущей части мы реализовали нашего первого актера и отправили ему сообщение. К сожалению, актер не смог вернуть какой-либо результат обработки этого сообщения, что сделало его довольно бесполезным. В этом эпизоде мы узнаем, как отправить ответное сообщение отправителю и как интегрировать синхронный, блокирующий API с (по определению) асинхронной системой на основе передачи сообщений.
Прежде чем мы начнем, я должен провести очень сильное различие между актером (расширяющая Actor
черта) и ActorRef
типом актера . При реализации актера мы расширяем Actor
черту, которая заставляет нас реализовывать receive
метод. Однако мы не создаем экземпляры актеров напрямую, а спрашиваем ActorSystem
:
val randomOrgBuffer: ActorRef = system.actorOf(Props[RandomOrgBuffer], "buffer")
К нашему великому удивлению, возвращаемый объект не такого RandomOrgBuffer
типа, как наш актер, это даже не объект Actor
. Спасибо ActorRef
, который является оберткой ( прокси ) вокруг актера:
- внутреннее состояние субъекта недоступно извне (инкапсуляция)
- система Akka следит за тем, чтобы
receive
метод каждого субъекта обрабатывал не более одного сообщения за раз (однопоточное) и находился в очереди в ожидании сообщений - Фактический субъект может быть развернут на другом компьютере в кластере,
ActorRef
прозрачно и незаметно для клиента отправляет сообщения по проводной связи на правильный узел в кластере (подробнее об этом позже в этой серии). При этом давайте как-нибудь «вернем» случайные числа, извлеченные внутри нашего актера. Оказывается, внутри каждого актера есть метод с очень многообещающим названиемsender
. Не удивительно, если я скажу, что этоActorRef
указание на актера, который отправил сообщение, которое мы сейчас обрабатываем:
object Bootstrap extends App { //... randomOrgBuffer ! RandomRequest //... } //... class RandomOrgBuffer extends Actor with ActorLogging { def receive = { case RandomRequest => if(buffer.isEmpty) { buffer ++= fetchRandomNumbers(50) } sender ! buffer.dequeue() } }
Я надеюсь, что вы уже привыкли к !
записи, используемой для отправки сообщения актеру. Если нет, есть более консервативные альтернативы:
sender tell buffer.dequeue() sender.tell(buffer.dequeue())
Тем не менее, вместо того, чтобы печатать новые случайные числа на экране, мы отправляем их обратно отправителю. Быстрый тест нашей программы показывает, что … ничего не происходит. Внимательно изучая sender
ссылку, мы обнаруживаем, что она указывает на Actor[akka://Akka/deadLetters]
. deadLetters
звучит не очень хорошо, но это логично. sender
представляет ссылку на актера, который отправил данное сообщение. Мы отправили сообщение в обычный класс Scala, а не от актера. Если бы мы использовали двух актеров, и первый отправил бы сообщение второму, то второй актер может использовать sender
ссылку, указывающую на первого актера, чтобы отправить ответ обратно. Очевидно, что мы все равно не сможем получить ответ, несмотря на увеличение абстракции.
Скоро мы рассмотрим сценарии с несколькими актерами, а пока нам нужно научиться интегрировать нормальный, не-Akka-код с актерами. Другими словами, как получить ответ, чтобы Акка была не только черной дырой, которая принимает сообщения и никогда не отправляет никаких результатов обратно. Решение удивительно простое — мы можем дождаться ответа!
implicit val timeout = Timeout(1 minutes) val future = randomOrgBuffer ? RandomRequest val veryRandom: Int = Await.result(future.mapTo[Int], 1 minute)
Название future
не случайно. Хотя это не пример java.util.concurrent.Future
, семантически он представляет ту же самую концепцию. Но сначала обратите внимание, что вместо восклицательного знака мы используем вопросительный знак ( ?
) для отправки сообщения. Эта коммуникационная модель известна как « спросить », в отличие от уже введенного « рассказать ». По сути, Акка создал временного актера по имени , отправил сообщение от имени этого актера и теперь ждет до одной минуты ответа, отправленного обратно вышеупомянутому временному актеру. Отправка сообщения все еще не является блокирующей, и объект представляет собой результат операции, которая еще не была завершена (она будет доступна в будущем ). Далее (теперь в режиме блокировки) мы ждем ответа. К тому жеActor
future
mapTo[Int]
необходимо, так как Акка не знает, какой ответ мы ожидаем.
Вы должны помнить, что использование шаблона « спросить » и ожидание / блокировка ответа очень редки. Обычно мы полагаемся на асинхронные сообщения и управляемую событиями архитектуру. Один актер никогда не должен блокировать ожидание ответа от другого актера. Но в данном конкретном случае нам нужен прямой доступ к возвращаемому значению, поскольку мы строим мост между обязательным методом запрос / ответ и управляемой сообщениями системой Akka. Имея ответ, какие интересные варианты использования мы можем поддержать? Например, мы можем разработать нашу собственную [a href = «http://docs.oracle.com/javase/7/docs/api/java/util/Random.html»] java.util.Random
реализацию, полностью основанную на идеальных, истинных случайных числах!
class RandomOrgRandom(randomOrgBuffer: ActorRef) extends java.util.Random { implicit val timeout = Timeout(1 minutes) override def next(bits: Int) = { if(bits <= 16) { random16Bits() & ((1 << bits) - 1) } else { (next(bits - 16) << 16) + random16Bits() } } private def random16Bits(): Int = { val future = randomOrgBuffer ? RandomRequest Await.result(future.mapTo[Int], 1 minute) } }
Детали реализации не имеют значения, достаточно сказать, что мы должны реализовать next()
метод, возвращающий запрошенное количество случайных битов, тогда как наш субъект всегда возвращает 16 битов. Единственное, что нам сейчас нужно, — это легкая scala.util.Random
упаковка java.util.Random
и наслаждение идеально перемешанной последовательностью чисел:
val javaRandom = new RandomOrgRandom(randomOrgBuffer) val scalaRandom = new scala.util.Random(javaRandom) println(scalaRandom.shuffle((1 to 20).toList)) //List(17, 15, 14, 6, 10, 2, 1, 9, 8, 3, 4, 16, 7, 18, 13, 11, 19, 5, 12, 20)
Давайте подведем итоги. Сначала мы разработали простую систему, основанную на одном действующем субъекте, который (при необходимости) подключается к внешнему веб-сервису и буферизует пакет случайных чисел. По запросу он отправляет обратно один номер из буфера. Далее мы интегрировали асинхронный мир актеров с синхронным API. Обернув наш актер, мы реализовали нашу собственную java.util.Random
реализацию (см. Также java.security.SecureRandom
). Этот класс теперь можно использовать в любом месте, где нам нужны случайные числа очень высокого качества. Однако реализация далека от совершенства, о чем мы поговорим в следующих частях.
Исходный код доступен на GitHub ( тег запрос-ответ ).
Это был перевод моей статьи
« Poznajemy Akka: żądanie i odpowiedź », первоначально опубликованной на
scala.net.pl .