Статьи

Два Актера — Открывая Акку

 Надеюсь, вам пока весело, но наше приложение имеет серьезный дефект производительности. После измерения времени отклика RandomOrgRandomкласса, который мы разработали в предыдущей части, мы заметим что-то действительно тревожное (диаграмма представляет время отклика последующих вызовов в миллисекундах):


Оказывается, что регулярно время отклика (время возврата одного случайного числа) на несколько порядков больше (логарифмическая шкала!). Вспоминая, как был реализован актер, причина этого совершенно очевидна: наш актер охотно выбирает 50 случайных чисел и заполняет буфер , возвращая один номер за другим. Если буфер пуст, субъект выполняет блокирующий вызов ввода-вывода для
random.orgвеб-службы, что занимает около половины секунды. Это хорошо видно на графике — каждый 50-й вызов намного, намного медленнее. В некоторых случаях такое поведение будет приемлемым (точно так же, как непредсказуемая сборка мусора может время от времени увеличивать задержку ответа). Но все же давайте попробуем улучшить нашу реализацию.

Есть ли у вас идеи, как мы можем сделать время отклика более предсказуемым и
плоским? Я предлагаю контролировать размер буфера, и когда он становится опасно близким к пустоте, мы инициируем выборку случайных чисел в фоновом режиме. Мы надеемся, что благодаря этому архитектурному изменению никогда не будет полностью пустым, поскольку фоновый ответ поступит до того, как мы достигнем дна. Тем не менее, мы не обязаны охотно получать слишком много случайных чисел. Вы помните оригинальную наивную реализацию с использованием синхронизации Java? У него была та же проблема, что и у нашей нынешней системы с одним актером. Но теперь мы наконец увидим истинную силу системы Akka, основанной на актерах. Для реализации предложенного выше улучшения нам понадобится фоновый поток для получения чисел и некоторый метод синхронизации в буфере (так как он будет доступен одновременно).

В Akka каждый актер логически однопоточный, поэтому нас не беспокоит синхронизация. Сначала мы
разделим обязанности : один actor (
RandomOrgBuffer) будет хранить буфер случайных чисел и возвращать их по запросу. Второй актер (
RandomOrgClient) вряд ли будет отвечать за выборку новых серий случайных чисел. Когда
RandomOrgBufferобнаруживается низкий уровень буфера, он просит
RandomOrgClient(отправив сообщение) начать выборку новых случайных чисел. Наконец, когда
RandomOrgClientполучает ответ от
random.orgсервера, он отправляет новую партию случайных чисел
RandomOrgBuffer(конечно, снова используя ответное сообщение).
RandomOrgBufferзаполняет буфер новыми номерами.

Начнем со второго актера, ответственного за общение с
random.orgвеб-сервис в фоновом режиме. Этот субъект инициирует HTTP-запрос, когда получает
FetchFromRandomOrgсообщение — сообщение, содержащее желаемый размер пакета для извлечения. Когда приходит ответ, мы анализируем его и отправляем всю партию обратно отправителю (
RandomOrgBufferв данном случае). Этот код очень похож на то, что мы уже видели в
fetchRandomNumbers():

case class FetchFromRandomOrg(batchSize: Int)
 
case class RandomOrgServerResponse(randomNumbers: List[Int])
 
class RandomOrgClient extends Actor {
  protected def receive = LoggingReceive {
    case FetchFromRandomOrg(batchSize) =>
      val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
      val connection = url.openConnection()
      val stream = Source.fromInputStream(connection.getInputStream)
      sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList)
      stream.close()
    }
  }

Теперь пришло время для действующего субъекта обрабатывать запросы от потенциально нескольких клиентов, запрашивающих одно случайное число. К сожалению, логика становится довольно сложной (но по крайней мере нам не нужно беспокоиться о синхронизации и безопасности потоков). Прежде всего,
RandomOrgBufferтеперь нужно обработать два разных сообщения:
RandomRequestкак прежде, чем пришло из кода клиента, так и new
RandomOrgServerResponse(см. Код выше), содержащий пакет новых случайных чисел, отправленных с
RandomOrgClient. Во-вторых,
RandomOrgBufferнужно помнить, что он инициировал процесс получения новых случайных кодов путем отправки
FetchFromRandomOrg. В противном случае мы рискуем запустить несколько одновременных соединений
random.orgили накапливать их без необходимости.

class RandomOrgBuffer extends Actor with ActorLogging {
 
  val BatchSize = 50
 
  val buffer = new Queue[Int]
  var waitingForResponse = false
 
  val randomOrgClient = context.actorOf(Props[RandomOrgClient], name="client")
  preFetchIfAlmostEmpty()
 
  def receive = {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      sender ! buffer.dequeue()
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
  }
 
  private def preFetchIfAlmostEmpty() {
    if(buffer.size <= BatchSize / 4 && !waitingForResponse) {
      randomOrgClient ! FetchFromRandomOrg(BatchSize)
      waitingForResponse = true
    }
  }
 
}

Ключевой частью является
preFetchIfAlmostEmpty()метод, который инициирует процесс извлечения случайных чисел (в фоновом режиме, другим субъектом). Если уровень буфера слишком низкий (я полагаю, 1/4 от размера пакета), и мы еще не ждем ответа от
random.org, мы отправляем соответствующее сообщение
RandomOrgClient. Мы также вызываем этот метод сразу при запуске (когда буфер полностью пуст), чтобы разогреть буфер при поступлении первого запроса. Обратите внимание, как один актер может
создать экземпляр другого актера, вызвав
context.actorOf(на самом деле это
ActorRef).

Этот код содержит огромную ошибку, вы можете ее обнаружить? Представьте, что произойдет, если вдруг
RandomOrgBufferполучит сотни
RandomRequestсообщения одновременно? Больше, чем размер буфера, даже сразу после его заполнения? Или если запрос приходит сразу после создания экземпляра субъекта, когда буфер еще пуст? К сожалению, я должен разобраться с ситуацией, когда
RandomRequestприходит, и наш буфер полностью пуст, в то время как
random.orgответ еще не вернулся. Однако мы не можем просто блокировать, ожидая, пока не придет ответ с новой партией случайных чисел. Мы не можем сделать это по одной простой причине: если мы спим или ждем во время обработки другим способом
RandomRequest, мы не можем обработать любое другое сообщение, включая
RandomOrgServerResponse— помните, что один актер может обрабатывать только одно сообщение одновременно! Мы не только введем взаимную блокировку в нашем приложении, но и нарушим одно из самых важных правил в Akka: никогда не блокировать и не спать внутри актера — подробнее об этом в следующих статьях.

Правильный способ справиться с этой ситуацией состоит в том, чтобы иметь очередь ожидающих актеров — которая отправила нас,
RandomRequestно мы были неспособны немедленно отправить ответ из-за пустого буфера. Как только мы
RandomOrgServerResponseвернемся, наш приоритет — обработать этих ожидающих участников, а затем сосредоточиться на последующих запросах в режиме реального времени. Существует интересный крайний случай — если число ожидающих актеров было настолько велико, что после выполнения их запросов буфер снова почти пуст, мы отправляем
FetchFromRandomOrgнемедленно еще раз. Это может стать еще интереснее — представьте, что буфер пуст, и мы все еще не удовлетворили всех ожидающих актеров. Следующий код обрабатывает все эти повороты:

class RandomOrgBuffer extends Actor with ActorLogging {
 
  val BatchSize = 50
 
  val buffer = new Queue[Int]
  val backlog = new Queue[ActorRef]
  var waitingForResponse = false
 
  //...
 
  def receive = LoggingReceive {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      if(buffer.isEmpty) {
        backlog += sender
      } else {
        sender ! buffer.dequeue()
      }
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
      while(!backlog.isEmpty && !buffer.isEmpty) {
        backlog.dequeue() ! buffer.dequeue()
      }
      preFetchIfAlmostEmpty()
  }
 
}

В нашей окончательной версии
backlogочередь представляет ожидающих актеров. Обратите внимание, как
RandomOrgServerResponseобрабатывается: если сначала попытаться удовлетворить как можно больше ожидающих актеров. Очевидно, что наша цель состояла в том, чтобы исключить чрезвычайно большое время отклика, поэтому мы должны стремиться к минимальному
backlogиспользованию очереди, поскольку каждый размещенный там актер с большей вероятностью будет ждать дольше. В идеале
backlogочередь всегда пуста и
RandomOrgBufferкорректирует размер пакета, запрашиваемый каждый раз, в зависимости от текущей загрузки (выборка пакетов меньшего или большего размера, а также настройка порога, ниже которого буфер считается почти пустым). Но я оставлю эти улучшения вам. Наконец, мы можем измерить
RandomOrgBufferвремя отклика (линейная шкала, задержки больше не распределены неравномерно):

Обработка входящих сообщений
RandomOrgBufferстала довольно сложной. Меня особенно беспокоит
waitingForResponseфлаг (я ненавижу флаги!). В следующей статье мы узнаем, как с этим справиться очень четко, идиоматично и объектно-ориентировано.