Надеюсь, вам пока весело, но наше приложение имеет серьезный дефект производительности. После измерения времени отклика RandomOrgRandom
класса, который мы разработали в предыдущей части, мы заметим что-то действительно тревожное (диаграмма представляет время отклика последующих вызовов в миллисекундах):
Оказывается, что регулярно время отклика (время возврата одного случайного числа) на несколько порядков больше (логарифмическая шкала!). Вспоминая, как был реализован актер, причина этого совершенно очевидна: наш актер охотно выбирает 50 случайных чисел и заполняет буфер , возвращая один номер за другим. Если буфер пуст, субъект выполняет блокирующий вызов ввода-вывода для
random.org
веб-службы, что занимает около половины секунды. Это хорошо видно на графике — каждый 50-й вызов намного, намного медленнее. В некоторых случаях такое поведение будет приемлемым (точно так же, как непредсказуемая сборка мусора может время от времени увеличивать задержку ответа). Но все же давайте попробуем улучшить нашу реализацию.
Есть ли у вас идеи, как мы можем сделать время отклика более предсказуемым и
плоским? Я предлагаю контролировать размер буфера, и когда он становится опасно близким к пустоте, мы инициируем выборку случайных чисел в фоновом режиме. Мы надеемся, что благодаря этому архитектурному изменению никогда не будет полностью пустым, поскольку фоновый ответ поступит до того, как мы достигнем дна. Тем не менее, мы не обязаны охотно получать слишком много случайных чисел. Вы помните оригинальную наивную реализацию с использованием синхронизации Java? У него была та же проблема, что и у нашей нынешней системы с одним актером. Но теперь мы наконец увидим истинную силу системы Akka, основанной на актерах. Для реализации предложенного выше улучшения нам понадобится фоновый поток для получения чисел и некоторый метод синхронизации в буфере (так как он будет доступен одновременно).
В Akka каждый актер логически однопоточный, поэтому нас не беспокоит синхронизация. Сначала мы
разделим обязанности : один actor (
RandomOrgBuffer
) будет хранить буфер случайных чисел и возвращать их по запросу. Второй актер (
RandomOrgClient
) вряд ли будет отвечать за выборку новых серий случайных чисел. Когда
RandomOrgBuffer
обнаруживается низкий уровень буфера, он просит
RandomOrgClient
(отправив сообщение) начать выборку новых случайных чисел. Наконец, когда
RandomOrgClient
получает ответ от
random.org
сервера, он отправляет новую партию случайных чисел
RandomOrgBuffer
(конечно, снова используя ответное сообщение).
RandomOrgBuffer
заполняет буфер новыми номерами.
Начнем со второго актера, ответственного за общение с
random.org
веб-сервис в фоновом режиме. Этот субъект инициирует HTTP-запрос, когда получает
FetchFromRandomOrg
сообщение — сообщение, содержащее желаемый размер пакета для извлечения. Когда приходит ответ, мы анализируем его и отправляем всю партию обратно отправителю (
RandomOrgBuffer
в данном случае). Этот код очень похож на то, что мы уже видели в
fetchRandomNumbers()
:
case class FetchFromRandomOrg(batchSize: Int) case class RandomOrgServerResponse(randomNumbers: List[Int]) class RandomOrgClient extends Actor { protected def receive = LoggingReceive { case FetchFromRandomOrg(batchSize) => val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new") val connection = url.openConnection() val stream = Source.fromInputStream(connection.getInputStream) sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList) stream.close() } }
Теперь пришло время для действующего субъекта обрабатывать запросы от потенциально нескольких клиентов, запрашивающих одно случайное число. К сожалению, логика становится довольно сложной (но по крайней мере нам не нужно беспокоиться о синхронизации и безопасности потоков). Прежде всего,
RandomOrgBuffer
теперь нужно обработать два разных сообщения:
RandomRequest
как прежде, чем пришло из кода клиента, так и new
RandomOrgServerResponse
(см. Код выше), содержащий пакет новых случайных чисел, отправленных с
RandomOrgClient
. Во-вторых,
RandomOrgBuffer
нужно помнить, что он инициировал процесс получения новых случайных кодов путем отправки
FetchFromRandomOrg
. В противном случае мы рискуем запустить несколько одновременных соединений
random.org
или накапливать их без необходимости.
class RandomOrgBuffer extends Actor with ActorLogging { val BatchSize = 50 val buffer = new Queue[Int] var waitingForResponse = false val randomOrgClient = context.actorOf(Props[RandomOrgClient], name="client") preFetchIfAlmostEmpty() def receive = { case RandomRequest => preFetchIfAlmostEmpty() sender ! buffer.dequeue() case RandomOrgServerResponse(randomNumbers) => buffer ++= randomNumbers waitingForResponse = false } private def preFetchIfAlmostEmpty() { if(buffer.size <= BatchSize / 4 && !waitingForResponse) { randomOrgClient ! FetchFromRandomOrg(BatchSize) waitingForResponse = true } } }
Ключевой частью является
preFetchIfAlmostEmpty()
метод, который инициирует процесс извлечения случайных чисел (в фоновом режиме, другим субъектом). Если уровень буфера слишком низкий (я полагаю, 1/4 от размера пакета), и мы еще не ждем ответа от
random.org
, мы отправляем соответствующее сообщение
RandomOrgClient
. Мы также вызываем этот метод сразу при запуске (когда буфер полностью пуст), чтобы разогреть буфер при поступлении первого запроса. Обратите внимание, как один актер может
создать экземпляр другого актера, вызвав
context.actorOf
(на самом деле это
ActorRef
).
Этот код содержит огромную ошибку, вы можете ее обнаружить? Представьте, что произойдет, если вдруг
RandomOrgBuffer
получит сотни
RandomRequest
сообщения одновременно? Больше, чем размер буфера, даже сразу после его заполнения? Или если запрос приходит сразу после создания экземпляра субъекта, когда буфер еще пуст? К сожалению, я должен разобраться с ситуацией, когда
RandomRequest
приходит, и наш буфер полностью пуст, в то время как
random.org
ответ еще не вернулся. Однако мы не можем просто блокировать, ожидая, пока не придет ответ с новой партией случайных чисел. Мы не можем сделать это по одной простой причине: если мы спим или ждем во время обработки другим способом
RandomRequest
, мы не можем обработать любое другое сообщение, включая
RandomOrgServerResponse
— помните, что один актер может обрабатывать только одно сообщение одновременно! Мы не только введем взаимную блокировку в нашем приложении, но и нарушим одно из самых важных правил в Akka: никогда не блокировать и не спать внутри актера — подробнее об этом в следующих статьях.
Правильный способ справиться с этой ситуацией состоит в том, чтобы иметь очередь ожидающих актеров — которая отправила нас,
RandomRequest
но мы были неспособны немедленно отправить ответ из-за пустого буфера. Как только мы
RandomOrgServerResponse
вернемся, наш приоритет — обработать этих ожидающих участников, а затем сосредоточиться на последующих запросах в режиме реального времени. Существует интересный крайний случай — если число ожидающих актеров было настолько велико, что после выполнения их запросов буфер снова почти пуст, мы отправляем
FetchFromRandomOrg
немедленно еще раз. Это может стать еще интереснее — представьте, что буфер пуст, и мы все еще не удовлетворили всех ожидающих актеров. Следующий код обрабатывает все эти повороты:
class RandomOrgBuffer extends Actor with ActorLogging { val BatchSize = 50 val buffer = new Queue[Int] val backlog = new Queue[ActorRef] var waitingForResponse = false //... def receive = LoggingReceive { case RandomRequest => preFetchIfAlmostEmpty() if(buffer.isEmpty) { backlog += sender } else { sender ! buffer.dequeue() } case RandomOrgServerResponse(randomNumbers) => buffer ++= randomNumbers waitingForResponse = false while(!backlog.isEmpty && !buffer.isEmpty) { backlog.dequeue() ! buffer.dequeue() } preFetchIfAlmostEmpty() } }
В нашей окончательной версии
backlog
очередь представляет ожидающих актеров. Обратите внимание, как
RandomOrgServerResponse
обрабатывается: если сначала попытаться удовлетворить как можно больше ожидающих актеров. Очевидно, что наша цель состояла в том, чтобы исключить чрезвычайно большое время отклика, поэтому мы должны стремиться к минимальному
backlog
использованию очереди, поскольку каждый размещенный там актер с большей вероятностью будет ждать дольше. В идеале
backlog
очередь всегда пуста и
RandomOrgBuffer
корректирует размер пакета, запрашиваемый каждый раз, в зависимости от текущей загрузки (выборка пакетов меньшего или большего размера, а также настройка порога, ниже которого буфер считается почти пустым). Но я оставлю эти улучшения вам. Наконец, мы можем измерить
RandomOrgBuffer
время отклика (линейная шкала, задержки больше не распределены неравномерно):
Обработка входящих сообщений
RandomOrgBuffer
стала довольно сложной. Меня особенно беспокоит
waitingForResponse
флаг (я ненавижу флаги!). В следующей статье мы узнаем, как с этим справиться очень четко, идиоматично и объектно-ориентировано.