Статьи

Ваше первое сообщение – Открывая Акку

 Akka — это платформа (фреймворк?), Вдохновленная Erlang, обещающая более легкую разработку масштабируемых, многопоточных и безопасных приложений. В то время как в большинстве популярных языков параллелизм основан на памяти, разделяемой между несколькими потоками, защищенной различными методами синхронизации, Akka предлагает модель параллелизма, основанную на акторах. Актер — это легкий объект, с которым вы можете едва взаимодействовать, отправляя ему сообщения. Каждый субъект может обрабатывать не более одного сообщения за раз и, очевидно, может отправлять сообщения другим субъектам. В одной виртуальной машине Java миллионы действующих лиц могут существовать одновременно, создавая иерархического родителя ( супервизор) — структура детей, где родитель контролирует поведение детей. Если этого недостаточно, мы можем легко разделить наших участников между несколькими узлами в кластере — без изменения одной строки кода. Каждый субъект может иметь внутреннее состояние (набор полей / переменных), но связь может происходить только через передачу сообщений, но не через общие структуры данных (счетчики, очереди).

Сочетание вышеуказанных функций приводит к созданию гораздо более безопасного, более стабильного и масштабируемого кода — за счет радикального изменения парадигмы в модели параллельного программирования. Так много модных слов и обещаний, давайте продолжим с примером. И это не будет пример « Hello, world », но мы попытаемся создать небольшое, но законченное решение. В следующих нескольких статьях мы реализуем интеграцию сAPI random.org . Этот веб-сервис позволяет нам получать действительно случайные числа (в отличие от псевдослучайных генераторов) на основе атмосферного шума (что бы это ни значило). API не так уж и сложен, пожалуйста, посетите следующий веб-сайт и обновите его пару раз:

https://www.random.org/integers/?num=20&min=1000&max=10000&col=1&base=10&format=plain

Так в чем же сложность? ? Читая инструкции для автоматизированных клиентов, мы узнаем, что:

  1. Клиентское приложение должно вызывать указанный выше URL не более чем из одного потока — запрещено одновременно получать случайные числа, используя несколько HTTP-соединений.
  2. Мы должны загружать случайные числа в пакетах, а не по одному в каждом запросе. Запрос выше принимает num=20номера за один звонок.
  3. Нас предупреждают о задержке, ответ может прийти даже через одну минуту
  4. Клиент должен периодически проверять квоту случайных чисел (услуга бесплатна только до заданного количества случайных бит в день). Все эти требования делают интеграцию с random.orgнетривиальной. В этой серии, которую я только начал, мы будем постепенно улучшать наше приложение, шаг за шагом изучая новые функции Akka. Вскоре мы поймем, что довольно крутая кривая обучения окупится быстро, как только мы поймем основные концепции платформы. Итак, давайте код!

Сегодня мы попытаемся выполнить первые два требования: это не более одного соединения в любой данный момент времени и загрузка номеров в пакетах. Для этого шага нам не нужен Akka, достаточно простой синхронизации и буфера:

val buffer = new Queue[Int]
 
def nextRandom(): Int = {
  this.synchronized {
    if(buffer.isEmpty) {
      buffer ++= fetchRandomNumbers(50)
    }
    buffer.dequeue()
  }
}
 
def fetchRandomNumbers(count: Int) = {
  val url = new URL("https://www.random.org/integers/?num=" + count + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
  val connection = url.openConnection()
  val stream = Source.fromInputStream(connection.getInputStream)
  val randomNumbers = stream.getLines().map(_.toInt).toList
  stream.close()
  randomNumbers
}

Этот код работает и эквивалентен
synchronizedключевому слову в Java. Способ
nextRandom()работы должен быть очевиден: если буфер пуст, заполните его 50 случайными числами, выбранными с сервера. В конце возьмите и верните первое значение из буфера. Этот код имеет несколько недостатков, начиная с
synchronizedблока на первом месте. Скорее дорогая синхронизация для каждого вызова кажется излишним. И мы даже не в кластере, где нам пришлось бы поддерживать одно активное соединение на весь кластер, а не только с одной JVM!

Начнем с реализации одного актера. Актер в основном класс, расширяющий
Actorчерты и реализующий
receiveметод. Этот метод отвечает за получение и обработку одного сообщения. Давайте повторим то, что мы уже сказали: каждый субъект может обрабатывать не более одного сообщения за раз, поэтому
receiveметод никогда не вызывается
одновременно. Если субъект уже обрабатывает какое-либо сообщение, остальные сообщения хранятся в очереди, выделенной для каждого субъекта. Благодаря этому строгому правилу мы можем избежать
любой синхронизации внутри актера, которая всегда поточно-ориентирована.

case object RandomRequest
 
class RandomOrgBuffer extends Actor {
 
  val buffer = new Queue[Int]
 
  def receive = {
    case RandomRequest =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      println(buffer.dequeue())
  }
 
}

fetchRandomNumbers()метод остается прежним. Однопоточный доступ
random.orgбыл достигнут бесплатно, поскольку субъект может обрабатывать только одно сообщение за раз. Говоря о сообщениях, в данном случае
RandomRequestэто наше сообщение — пустой объект, не передающий никакой информации, кроме его типа. В Akka сообщения почти всегда реализуются с использованием case-классов или других неизменяемых типов. Таким образом, если мы хотим поддерживать выборку произвольного числа случайных чисел, мы должны включить это как часть сообщения:

case class RandomRequest(howMany: Int)
 
class RandomOrgBuffer extends Actor with ActorLogging {
 
  val buffer = new Queue[Int]
 
  def receive = {
    case RandomRequest(howMany) =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      for(_ <- 1 to (howMany min 50)) {
        println(buffer.dequeue())
      }
  }

Теперь мы должны попытаться отправить сообщение нашему новому актеру. Очевидно, что мы не можем просто вызвать
receiveметод, передающий сообщение в качестве аргумента. Сначала мы должны запустить платформу Akka и попросить ссылку на актера. Эта ссылка позже используется для отправки сообщения с использованием слегка нелогичного первого
!метода, начиная с дней Эрланга:

object Bootstrap extends App {
  val system = ActorSystem("RandomOrgSystem")
  val randomOrgBuffer = system.actorOf(Props[RandomOrgBuffer], "buffer")
 
  randomOrgBuffer ! RandomRequest(10)  //sending a message
 
  system.shutdown()
}

После запуска программы мы должны увидеть 10 случайных чисел на консоли. Немного поэкспериментируйте с этим простым приложением (полный исходный код
доступен на GitHub ,
request-responsetag ). В частности, обратите внимание, что отправка сообщения неблокирует, а само сообщение обрабатывается в другом потоке (большая аналогия с JMS). Попробуйте отправить сообщение другого типа и исправить
receiveметод, чтобы он мог обрабатывать более одного типа.

Наше приложение пока не очень полезно. Мы хотели бы получить доступ к нашим случайным числам как-то, а не печатать их (асинхронно!) Для стандартного вывода. Как вы, вероятно, можете догадаться, поскольку связь с актером может быть установлена ​​только с помощью асинхронной передачи сообщений (актер не может »
вернуть»«результат, он также не должен помещать его в какую-либо глобальную общую память). Таким образом, актер отправит результаты обратно через ответное сообщение, отправленное непосредственно нам (отправителю). Но это будет частью следующей статьи.