receive
и позволить другим актерам обрабатывать. Если мы строго следуем этому правилу, Akka может легко обрабатывать сотни тысяч сообщений в секунду, используя всего несколько потоков. Не должно вызывать удивления то, что, хотя наше приложение может включать в себя тысячи, казалось бы, независимых акторов (например, по одному действующему лицу на каждое соединение HTTP, одному игроку в MMO-игре и т. Д.), Каждый действующий субъект получает только ограниченное время ЦП в пуле. нитей. При использовании 10 потоков по умолчанию, которые обрабатывают всех участников системы, достаточно одного блокирующего или спящего субъекта, чтобы снизить пропускную способность на 10%. Поэтому 10 действующих лиц, спящих одновременно, полностью останавливают систему.По этой причине звонить
sleep()
или активно ждать ответа от какого-либо другого актора крайне не рекомендуется в
receive
, К сожалению, не существует зрелой асинхронной библиотеки, эквивалентной JDBC (но смотрите
postgresql-netty ,
adbcj ,
async-mysql-connector и также связанные с ней:
mongodb-async-driver ) и использование NIO довольно проблематично. Но мы должны искать альтернативы и по возможности избегать блокировки кода. В
нашем примере приложения выборка случайных чисел из внешнего веб-сервиса была реализована довольно наивно:
val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new") val connection = url.openConnection() val stream = Source.fromInputStream(connection.getInputStream) sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList) stream.close()
Этот код блокирует ожидание ответа HTTP в течение
одной минуты . Это означает, что наш актер не может обработать любое другое сообщение в течение нескольких секунд. Более того, он содержит один поток (по умолчанию из десяти) из рабочего пула Akka. Предполагается, что этот пул будет разделен между тысячами актеров, так что это немного эгоистично.
К счастью, существуют зрелые асинхронные клиентские библиотеки HTTP, а именно
async-http-client (на основе
netty , с
оболочкой Scala, называемой Dispatch ) и
Jetty HttpClient., Для целей тестирования мы будем использовать первый (и оставим Dispatch на потом). API довольно очевиден: он запрашивает у нас целевой URL и объект обратного вызова, который будет использоваться при получении ответа. Таким образом, отправка HTTP-запроса является асинхронной и неблокирующей (субъект может быстро потреблять больше входящих сообщений), а ответ поступает асинхронно из другого потока:
implicit def block2completionHandler[T](block: Response => T) = new AsyncCompletionHandler[T]() { def onCompleted(response: Response) = block(response) } def receive = { case FetchFromRandomOrg(batchSize) => val curSender = sender val url = "https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new" client.prepareGet(url).execute { response: Response => val numbers = response.getResponseBody.lines.map(_.toInt).toList curSender ! RandomOrgServerResponse(numbers) } }
Мы очень близки к действительно опасной ошибке в коде выше. Обратите внимание, как я делаю локальную копию
sender
звонка
curSender
. Если бы я не сделал этого, блок кода, выполняемый при получении ответа, прочитал бы
текущее значение
sender
.
Текущий , то есть если бы наш актер обрабатывал какое-то другое сообщение одновременно, это указывало бы на отправителя этого другого сообщения. Как примечание, это одна из причин, почему переменные, к которым обращаются из анонимных внутренних классов в Java, должны быть
final
. Это также хорошая причина избегать произвольных блоков кода, вызываемых асинхронно внутри акторов. Гораздо лучше извлечь их в отдельный класс за пределами субъекта, чтобы избежать случайного доступа к внутреннему состоянию субъекта.
Оставим наш пример ненадолго. Представьте, насколько масштабируемой была бы наша общая архитектура для обычного читателя RSS / Atom в качестве службы. Для каждого URL канала мы можем создать одного участника (и мы отслеживаем тысячи каналов, то есть столько участников). Актер отправляет асинхронный запрос на каждый сайт и ждет ответа. Теоретически, используя только один рабочий поток, мы можем обрабатывать тысячи каналов / серверов, обрабатывая результаты на лету по мере их поступления (в конце концов, каждый сервер имеет разное время отклика). В классической модели блокировки мы можем обрабатывать только столько потоков одновременно, сколько потоков мы можем использовать (конечно, не несколько тысяч), не говоря уже о том, что каждый поток требует значительного объема памяти.
Если вы видите некоторые сходства с
node.jsТы на правильном пути. Эта структура полностью основана на асинхронном вводе-выводе, что позволяет обрабатывать большое количество одновременных соединений, используя только один (!) Поток.
Исходный код этой статьи
доступен на GitHub в
non-blocking-io
теге .
Это был перевод моей статьи «Poznajemy Akka: nieblokujące I / O», первоначально опубликованной на scala.net.pl .