Статьи

Ваше первое сообщение — открытие Акки

Akka — это платформа (фреймворк?), Вдохновленная Erlang, обещающая более легкую разработку масштабируемых, многопоточных и безопасных приложений. В то время как в большинстве популярных языков параллелизм основан на памяти, разделяемой между несколькими потоками, защищенной различными методами синхронизации, Akka предлагает модель параллелизма, основанную на акторах. Актер — это легкий объект, с которым вы можете едва взаимодействовать, отправляя ему сообщения. Каждый субъект может обрабатывать не более одного сообщения за раз и, очевидно, может отправлять сообщения другим субъектам. В одной виртуальной машине Java миллионы действующих лиц могут существовать одновременно, создавая иерархическую структуру parent ( supervisor ) — children, где parent контролирует поведение потомков. Если этого недостаточно, мы можем легко разделить наших участников между несколькими узлами в кластере — без изменения одной строки кода. Каждый субъект может иметь внутреннее состояние (набор полей / переменных), но связь может происходить только через передачу сообщений, но не через общие структуры данных (счетчики, очереди).

Сочетание вышеуказанных функций приводит к созданию гораздо более безопасного, более стабильного и масштабируемого кода — за счет радикального изменения парадигмы в модели параллельного программирования. Так много модных слов и обещаний, давайте продолжим с примером. И это не будет пример «Привет, мир», но мы попытаемся создать небольшое, но законченное решение. В следующих нескольких статьях мы будем реализовывать интеграцию с API random.org . Этот веб-сервис позволяет нам получать действительно случайные числа (в отличие от псевдослучайных генераторов) на основе атмосферного шума (что бы это ни значило). API не так уж и сложен, пожалуйста, посетите следующий веб-сайт и обновите его пару раз:

https://www.random.org/integers/?num=20&min=1000&max=10000&col=1&base=10&format=plain

Так в чем же трудность? Читая инструкции для автоматизированных клиентов, мы узнаем, что:

  1. Клиентское приложение должно вызывать указанный выше URL не более чем из одного потока — запрещено одновременно получать случайные числа, используя несколько HTTP-соединений.
  2. Мы должны загружать случайные числа в пакетах, а не по одному в каждом запросе. Запрос выше занимает num = 20 номеров за один звонок.
  3. Нас предупреждают о задержке, ответ может прийти даже через одну минуту
  4. Клиент должен периодически проверять квоту случайных чисел (услуга бесплатна только до заданного количества случайных бит в день)

Все эти требования делают интеграцию с random.org нетривиальной. В этой серии, которую я только начал, мы будем постепенно улучшать наше приложение, шаг за шагом изучая новые функции Akka. Вскоре мы поймем, что довольно крутая кривая обучения окупится быстро, как только мы поймем основные концепции платформы. Итак, давайте код!

Сегодня мы попытаемся выполнить первые два требования: это не более одного соединения в любой данный момент времени и загрузка номеров в пакетах. Для этого шага нам не нужен Akka, достаточно простой синхронизации и буфера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
val buffer = new Queue[Int]
 
def nextRandom(): Int = {
  this.synchronized {
    if(buffer.isEmpty) {
      buffer ++= fetchRandomNumbers(50)
    }
    buffer.dequeue()
  }
}
 
def fetchRandomNumbers(count: Int) = {
  val url = new URL("https://www.random.org/integers/?num=" + count + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
  val connection = url.openConnection()
  val stream = Source.fromInputStream(connection.getInputStream)
  val randomNumbers = stream.getLines().map(_.toInt).toList
  stream.close()
  randomNumbers
}

Этот код работает и эквивалентен synchronized keyword in Java . Принцип работы nextRandom() должен быть очевиден: если буфер пуст, заполните его 50 случайными числами, выбранными с сервера. В конце возьмите и верните первое значение из буфера. Этот код имеет несколько недостатков, начиная с synchronized блока в первую очередь. Скорее дорогая синхронизация для каждого вызова кажется излишним. И мы даже не в кластере, где нам пришлось бы поддерживать одно активное соединение на весь кластер, а не только с одной JVM!

Начнем с реализации одного актера. Actor — это класс, расширяющий черту Actor и реализующий метод receive . Этот метод отвечает за получение и обработку одного сообщения. Давайте повторим то, что мы уже сказали: каждый субъект может обрабатывать не более одного сообщения за раз, поэтому метод приема никогда не вызывается одновременно. Если субъект уже обрабатывает какое-либо сообщение, остальные сообщения хранятся в очереди, выделенной для каждого субъекта. Благодаря этому строгому правилу мы можем избежать любой синхронизации внутри актера, которая всегда поточно-ориентирована.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
case object RandomRequest
 
class RandomOrgBuffer extends Actor {
 
  val buffer = new Queue[Int]
 
  def receive = {
    case RandomRequest =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      println(buffer.dequeue())
  }
}

fetchRandomNumbers() остается прежним. Однопоточный доступ к random.org был осуществлен бесплатно, поскольку random.org может обрабатывать только одно сообщение за раз. Говоря о сообщениях, в данном случае RandomRequest — это наше сообщение — пустой объект, не передающий никакой информации, кроме его типа. В Akka сообщения почти всегда реализуются с использованием case-классов или других неизменяемых типов. Таким образом, если мы хотим поддерживать выборку произвольного числа случайных чисел, мы должны включить это как часть сообщения:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
case class RandomRequest(howMany: Int)
 
class RandomOrgBuffer extends Actor with ActorLogging {
 
  val buffer = new Queue[Int]
 
  def receive = {
    case RandomRequest(howMany) =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      for(_ <- 1 to (howMany min 50)) {
        println(buffer.dequeue())
      }
  }

Теперь мы должны попытаться отправить сообщение нашему новому актеру. Очевидно, мы не можем просто вызвать метод receive передавая сообщение в качестве аргумента. Сначала мы должны запустить платформу Akka и попросить ссылку на актера. Эта ссылка позже используется для отправки сообщения, используя сначала немного нелогично! метод, относящийся ко времени Эрланга:

1
2
3
4
5
6
7
8
object Bootstrap extends App {
  val system = ActorSystem("RandomOrgSystem")
  val randomOrgBuffer = system.actorOf(Props[RandomOrgBuffer], "buffer")
 
  randomOrgBuffer ! RandomRequest(10//sending a message
 
  system.shutdown()
}

После запуска программы мы должны увидеть 10 случайных чисел на консоли. Немного поэкспериментируйте с этим простым приложением (полный исходный код доступен на GitHub , тег request-response ). В частности, обратите внимание, что отправка сообщения неблокирует, а само сообщение обрабатывается в другом потоке (большая аналогия с JMS). Попробуйте отправить сообщение другого типа и исправить метод receive чтобы он мог обрабатывать более одного типа.

Наше приложение пока не очень полезно. Мы хотели бы получить доступ к нашим случайным числам как-то, а не печатать их (асинхронно!) Для стандартного вывода. Как вы, вероятно, можете догадаться, поскольку связь с субъектом может быть установлена ​​только посредством асинхронной передачи сообщений (субъект не может «вернуть» результат, равно как и не должен помещать его в какую-либо глобальную общую память). Таким образом, актер отправит результаты обратно через ответное сообщение, отправленное непосредственно нам (отправителю). Но это будет частью следующей статьи.

Это был перевод моей статьи « Познаемый Акка: pierwszy komunikat », первоначально опубликованной на scala.net.pl .

Ссылка: Ваше первое сообщение — знакомство с Akka от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседей .