Статьи

Неблокирующий ввод / вывод — обнаружение Akka

Пришло время следовать некоторым хорошим практикам при реализации актеров. Одно из самых важных правил, которым мы должны следовать, — это избегать любых операций ввода-вывода, блокирующих, опроса, ожидания ожидания, ожидания и т. Д. Проще говоря, актер при обработке сообщения должен зависеть только от ЦП, и если он не нуждается в его циклировании. следует немедленно вернуться из receive и позволить другим участникам процесса. Если мы строго следуем этому правилу, Akka может легко обрабатывать сотни тысяч сообщений в секунду, используя всего несколько потоков. Не должно вызывать удивления то, что, хотя наше приложение может включать в себя тысячи, казалось бы, независимых акторов (например, по одному актору на каждое HTTP-соединение, одному игроку в MMO игре и т. Д.), Каждый актер получает только ограниченное время ЦП в пуле нитей. При использовании 10 потоков по умолчанию, которые обрабатывают всех участников системы, достаточно одного блокирующего или спящего субъекта, чтобы снизить пропускную способность на 10%. Поэтому 10 действующих лиц, спящих одновременно, полностью останавливают систему.

По этой причине вызов sleep() или активное ожидание ответа от какого-либо другого субъекта крайне нежелательно при receive . К сожалению, не существует зрелой асинхронной библиотеки, эквивалентной JDBC (но смотрите postgresql-netty , adbcj , async-mysql-connector и также связанные с ней: mongodb-async-driver ) и использование NIO довольно проблематично. Но мы должны искать альтернативы и по возможности избегать блокировки кода. В нашем примере приложения выборка случайных чисел из внешнего веб-сервиса была реализована довольно наивно:

1
2
3
4
5
val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
val connection = url.openConnection()
val stream = Source.fromInputStream(connection.getInputStream)
sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList)
stream.close()

Этот код блокирует ожидание ответа HTTP в течение одной минуты . Это означает, что наш актер не может обработать любое другое сообщение в течение нескольких секунд. Более того, он содержит один поток (по умолчанию из десяти) из рабочего пула Akka. Предполагается, что этот пул будет разделен между тысячами актеров, так что это немного эгоистично.

К счастью, существуют зрелые асинхронные клиентские библиотеки HTTP, а именно async-http-client (на основе netty , с оболочкой Scala, называемой Dispatch ) и Jetty HttpClient . Для целей тестирования мы будем использовать первый (и оставим Dispatch на потом). API довольно очевиден: он запрашивает у нас целевой URL и объект обратного вызова, который будет использоваться при получении ответа. Таким образом, отправка HTTP-запроса является асинхронной и неблокирующей (субъект может быстро потреблять больше входящих сообщений), а ответ поступает асинхронно из другого потока:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
implicit def block2completionHandler[T](block: Response => T) = new AsyncCompletionHandler[T]() {
  def onCompleted(response: Response) = block(response)
}
 
def receive = {
  case FetchFromRandomOrg(batchSize) =>
    val curSender = sender
    val url = "https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new"
    client.prepareGet(url).execute {
      response: Response =>
        val numbers = response.getResponseBody.lines.map(_.toInt).toList
        curSender ! RandomOrgServerResponse(numbers)
    }
}

Мы очень близки к действительно опасной ошибке в коде выше. Обратите внимание, как я делаю локальную копию отправителя с именем curSender . Если бы я не сделал этого, блок кода, выполняемый при получении ответа, прочитал бы текущее значение отправителя. Текущий, то есть если бы наш актер обрабатывал какое-то другое сообщение одновременно, это указывало бы на отправителя этого другого сообщения. Как примечание, это одна из причин, почему переменные, к которым обращаются анонимные внутренние классы в Java, должны быть окончательными. Это также хорошая причина избегать произвольных блоков кода, вызываемых асинхронно внутри акторов. Гораздо лучше извлечь их в отдельный класс за пределами субъекта, чтобы избежать случайного доступа к внутреннему состоянию субъекта.

Оставим наш пример ненадолго. Представьте, насколько масштабируемой была бы наша общая архитектура для обычного читателя RSS / Atom в качестве службы. Для каждого URL канала мы можем создать одного участника (и мы отслеживаем тысячи каналов, то есть столько участников). Актер отправляет асинхронный запрос на каждый сайт и ждет ответа. Теоретически, используя только один рабочий поток, мы можем обрабатывать тысячи каналов / серверов, обрабатывая результаты на лету по мере их поступления (в конце концов, каждый сервер имеет разное время отклика). В классической модели блокировки мы можем обрабатывать только столько потоков одновременно, сколько потоков мы можем использовать (конечно, не несколько тысяч), не говоря уже о том, что каждый поток требует значительного объема памяти.

Если вы видите некоторые сходства с node.js, вы на правильном пути. Эта структура полностью основана на асинхронном вводе-выводе, что позволяет обрабатывать большое количество одновременных соединений, используя только один (!) Поток.

Исходный код этой статьи доступен на GitHub в теге non-blocking-io .

Это был перевод моей статьи « Poznajemy Akka: nieblokujace I / O », первоначально опубликованной на scala.net.pl .

Ссылка: неблокирующая операция ввода-вывода — обнаружение Akka от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседей .