Надеюсь, вам пока весело, но наше приложение имеет серьезный дефект производительности. После измерения времени RandomOrgRandom
класса RandomOrgRandom
мы разработали в предыдущей части, мы заметим нечто действительно тревожное (диаграмма представляет время отклика последующих вызовов в миллисекундах):
Оказывается, что регулярно время отклика (время возврата одного случайного числа) на несколько порядков больше (логарифмическая шкала!). Вспоминая, как был реализован актер, причина этого совершенно очевидна: наш актер охотно выбирает 50 случайных чисел и заполняет буфер , возвращая один номер за другим. Если буфер пуст, random.org
выполняет блокирующий вызов ввода-вывода для веб-службы random.org
что занимает около половины секунды. Это хорошо видно на графике — каждый 50-й вызов намного, намного медленнее. В некоторых случаях такое поведение будет приемлемым (точно так же, как непредсказуемая сборка мусора может время от времени увеличивать задержку ответа). Но все же давайте попробуем улучшить нашу реализацию.
Есть ли у вас идеи, как мы можем сделать время отклика более предсказуемым и плоским ? Я предлагаю контролировать размер буфера, и когда он становится опасно близким к пустоте, мы инициируем выборку случайных чисел в фоновом режиме. Мы надеемся, что благодаря этому архитектурному изменению никогда не будет полностью пустым, поскольку фоновый ответ поступит до того, как мы достигнем дна. Тем не менее, мы не обязаны охотно получать слишком много случайных чисел. Вы помните оригинальную наивную реализацию с использованием синхронизации Java? У него была та же проблема, что и у нашей нынешней системы с одним актером. Но теперь мы наконец увидим истинную силу системы Akka, основанной на актерах. Для реализации предложенного выше улучшения нам понадобится фоновый поток для получения чисел и некоторый метод синхронизации в буфере (так как он будет доступен одновременно).
В Akka каждый актер логически однопоточный, поэтому нас не беспокоит синхронизация. Сначала мы разделим обязанности : один RandomOrgBuffer
( RandomOrgBuffer
) будет хранить буфер случайных чисел и возвращать их по запросу. Второй RandomOrgClient
( RandomOrgClient
) едва будет отвечать за выборку новых партий случайных чисел. Когда RandomOrgBuffer
обнаруживает низкий уровень буфера, он просит RandomOrgClient
(отправив сообщение) начать выборку новых случайных чисел. Наконец, когда RandomOrgClient
получает ответ от сервера random.org
, он отправляет новую RandomOrgBuffer
случайных чисел в RandomOrgBuffer
(конечно, снова используя ответное сообщение). RandomOrgBuffer
заполняет буфер новыми числами.
Начнем со второго актера, отвечающего за взаимодействие с веб-сервисом random.org
в фоновом режиме. Этот FetchFromRandomOrg
инициирует HTTP-запрос, когда получает сообщение FetchFromRandomOrg
— сообщение, содержащее требуемый размер пакета для извлечения. Когда приходит ответ, мы анализируем его и отправляем весь пакет отправителю (в данном случае RandomOrgBuffer
). Этот код очень похож на то, что мы уже видели в fetchRandomNumbers():
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
case class FetchFromRandomOrg(batchSize : Int) case class RandomOrgServerResponse(randomNumbers : List[Int]) class RandomOrgClient extends Actor { protected def receive = LoggingReceive { case FetchFromRandomOrg(batchSize) = > val url = new URL( "https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new" ) val connection = url.openConnection() val stream = Source.fromInputStream(connection.getInputStream) sender ! RandomOrgServerResponse(stream.getLines().map( _ .toInt).toList) stream.close() } } |
Теперь пришло время для действующего субъекта обрабатывать запросы от потенциально нескольких клиентов, запрашивающих одно случайное число. К сожалению, логика становится довольно сложной (но по крайней мере нам не нужно беспокоиться о синхронизации и безопасности потоков). Прежде всего, RandomOrgBuffer
теперь должен обрабатывать два разных сообщения: RandomRequest
как и прежде, из кода клиента и новый RandomOrgServerResponse
(см. Код выше), содержащий пакет новых случайных чисел, отправленных из RandomOrgClient
. Во-вторых, RandomOrgBuffer
должен помнить, что он инициировал процесс извлечения новых случайных кодов путем отправки FetchFromRandomOrg
. В противном случае мы рискуем запустить несколько одновременных подключений к random.org
или накапливать их без необходимости.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
class RandomOrgBuffer extends Actor with ActorLogging { val BatchSize = 50 val buffer = new Queue[Int] var waitingForResponse = false val randomOrgClient = context.actorOf(Props[RandomOrgClient], name = "client" ) preFetchIfAlmostEmpty() def receive = { case RandomRequest = > preFetchIfAlmostEmpty() sender ! buffer.dequeue() case RandomOrgServerResponse(randomNumbers) = > buffer ++ = randomNumbers waitingForResponse = false } private def preFetchIfAlmostEmpty() { if (buffer.size < = BatchSize / 4 && !waitingForResponse) { randomOrgClient ! FetchFromRandomOrg(BatchSize) waitingForResponse = true } } } |
Ключевой частью является preFetchIfAlmostEmpty()
который инициирует процесс извлечения случайных чисел (в фоновом режиме, другим актором). Если уровень буфера слишком низкий (я полагаю, 1/4 от размера пакета) и мы еще не ждем ответа от random.org
, мы отправляем соответствующее сообщение в RandomOrgClient
. Мы также вызываем этот метод сразу при запуске (когда буфер полностью пуст), чтобы разогреть буфер при поступлении первого запроса. Обратите внимание, как один актер может создать экземпляр другого актера, вызвав context.actorOf
(на самом деле это ActorRef
).
Этот код содержит огромную ошибку, вы можете ее обнаружить? Представьте, что произойдет, если RandomOrgBuffer
вдруг получит сотни сообщений RandomRequest
одновременно? Больше, чем размер буфера, даже сразу после его заполнения? Или если запрос приходит сразу после создания экземпляра субъекта, когда буфер еще пуст? К сожалению, я должен разобраться с ситуацией, когда приходит RandomRequest
и наш буфер полностью пуст, в то время random.org
ответ random.org
еще не вернулся. Однако мы не можем просто блокировать, ожидая, пока не придет ответ с новой партией случайных чисел. Мы не можем сделать это по одной простой причине: если мы RandomRequest
или ждем другим способом во время обработки RandomRequest
, мы не можем обработать любое другое сообщение, включая RandomOrgServerResponse
— помните, что один RandomOrgServerResponse
может обрабатывать только одно сообщение одновременно! Мы не только введем взаимную блокировку в нашем приложении, но и нарушим одно из самых важных правил в Akka: никогда не блокировать и не спать внутри актера — подробнее об этом в следующих статьях.
Правильный способ справиться с этой ситуацией состоит в том, чтобы иметь очередь ожидающих актеров — которая послала нам RandomRequest
но мы были неспособны немедленно отправить ответ из-за пустого буфера. Как только мы получим обратно RandomOrgServerResponse
, наш приоритет — обработать этих ожидающих участников, а затем сосредоточиться на последующих запросах в режиме реального времени. Есть интересный крайний случай — если число ожидающих актеров было настолько велико, что после выполнения их запросов буфер снова почти пуст, мы немедленно отправляем FetchFromRandomOrg
еще раз. Это может стать еще интереснее — представьте, что буфер пуст, и мы все еще не удовлетворили всех ожидающих актеров. Следующий код обрабатывает все эти повороты:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
class RandomOrgBuffer extends Actor with ActorLogging { val BatchSize = 50 val buffer = new Queue[Int] val backlog = new Queue[ActorRef] var waitingForResponse = false //... def receive = LoggingReceive { case RandomRequest = > preFetchIfAlmostEmpty() if (buffer.isEmpty) { backlog + = sender } else { sender ! buffer.dequeue() } case RandomOrgServerResponse(randomNumbers) = > buffer ++ = randomNumbers waitingForResponse = false while (!backlog.isEmpty && !buffer.isEmpty) { backlog.dequeue() ! buffer.dequeue() } preFetchIfAlmostEmpty() } } |
В нашей окончательной версии очередь невыполненных работ представляет ожидающих участников. Обратите внимание, как обрабатывается RandomOrgServerResponse
: если сначала попытается удовлетворить как можно больше ожидающих участников. Очевидно, что наша цель состояла в том, чтобы исключить чрезвычайно большое время отклика, поэтому мы должны стремиться к минимальному использованию очереди невыполненных работ, поскольку каждый размещенный там субъект с большей вероятностью будет ждать дольше. В идеале очередь RandomOrgBuffer
всегда пуста, и RandomOrgBuffer
корректирует размер пакета, запрашиваемый каждый раз, в зависимости от текущей загрузки (выборка пакетов меньшего или большего размера, а также настройка порога, ниже которого буфер считается почти пустым). Но я оставлю эти улучшения вам. Наконец, мы можем измерить RandomOrgBuffer
отклика RandomOrgBuffer
(линейный масштаб, latencies
больше не распределены неравномерно):
Обработка входящих сообщений в RandomOrgBuffer
стала довольно сложной. Меня особенно беспокоит флаг waitingForResponse
(я ненавижу флаги!). В следующей статье мы узнаем, как с этим справиться очень четко, идиоматично и объектно-ориентировано.
Это был перевод моей статьи « Poznajemy Akka: dwoch aktorow », первоначально опубликованной на scala.net.pl .
Ссылка: два актера — знакомство с Аккой от нашего партнера по JCG Томаша Нуркевича в блоге Java и по соседству .