Статьи

Два актера — открывают Акку

Надеюсь, вам пока весело, но наше приложение имеет серьезный дефект производительности. После измерения времени RandomOrgRandom класса RandomOrgRandom мы разработали в предыдущей части, мы заметим нечто действительно тревожное (диаграмма представляет время отклика последующих вызовов в миллисекундах):

Оказывается, что регулярно время отклика (время возврата одного случайного числа) на несколько порядков больше (логарифмическая шкала!). Вспоминая, как был реализован актер, причина этого совершенно очевидна: наш актер охотно выбирает 50 случайных чисел и заполняет буфер , возвращая один номер за другим. Если буфер пуст, random.org выполняет блокирующий вызов ввода-вывода для веб-службы random.org что занимает около половины секунды. Это хорошо видно на графике — каждый 50-й вызов намного, намного медленнее. В некоторых случаях такое поведение будет приемлемым (точно так же, как непредсказуемая сборка мусора может время от времени увеличивать задержку ответа). Но все же давайте попробуем улучшить нашу реализацию.

Есть ли у вас идеи, как мы можем сделать время отклика более предсказуемым и плоским ? Я предлагаю контролировать размер буфера, и когда он становится опасно близким к пустоте, мы инициируем выборку случайных чисел в фоновом режиме. Мы надеемся, что благодаря этому архитектурному изменению никогда не будет полностью пустым, поскольку фоновый ответ поступит до того, как мы достигнем дна. Тем не менее, мы не обязаны охотно получать слишком много случайных чисел. Вы помните оригинальную наивную реализацию с использованием синхронизации Java? У него была та же проблема, что и у нашей нынешней системы с одним актером. Но теперь мы наконец увидим истинную силу системы Akka, основанной на актерах. Для реализации предложенного выше улучшения нам понадобится фоновый поток для получения чисел и некоторый метод синхронизации в буфере (так как он будет доступен одновременно).

В Akka каждый актер логически однопоточный, поэтому нас не беспокоит синхронизация. Сначала мы разделим обязанности : один RandomOrgBuffer ( RandomOrgBuffer ) будет хранить буфер случайных чисел и возвращать их по запросу. Второй RandomOrgClient ( RandomOrgClient ) едва будет отвечать за выборку новых партий случайных чисел. Когда RandomOrgBuffer обнаруживает низкий уровень буфера, он просит RandomOrgClient (отправив сообщение) начать выборку новых случайных чисел. Наконец, когда RandomOrgClient получает ответ от сервера random.org , он отправляет новую RandomOrgBuffer случайных чисел в RandomOrgBuffer (конечно, снова используя ответное сообщение). RandomOrgBuffer заполняет буфер новыми числами.

Начнем со второго актера, отвечающего за взаимодействие с веб-сервисом random.org в фоновом режиме. Этот FetchFromRandomOrg инициирует HTTP-запрос, когда получает сообщение FetchFromRandomOrg — сообщение, содержащее требуемый размер пакета для извлечения. Когда приходит ответ, мы анализируем его и отправляем весь пакет отправителю (в данном случае RandomOrgBuffer ). Этот код очень похож на то, что мы уже видели в fetchRandomNumbers():

01
02
03
04
05
06
07
08
09
10
11
12
13
14
case class FetchFromRandomOrg(batchSize: Int)
 
case class RandomOrgServerResponse(randomNumbers: List[Int])
 
class RandomOrgClient extends Actor {
  protected def receive = LoggingReceive {
    case FetchFromRandomOrg(batchSize) =>
      val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
      val connection = url.openConnection()
      val stream = Source.fromInputStream(connection.getInputStream)
      sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList)
      stream.close()
    }
  }

Теперь пришло время для действующего субъекта обрабатывать запросы от потенциально нескольких клиентов, запрашивающих одно случайное число. К сожалению, логика становится довольно сложной (но по крайней мере нам не нужно беспокоиться о синхронизации и безопасности потоков). Прежде всего, RandomOrgBuffer теперь должен обрабатывать два разных сообщения: RandomRequest как и прежде, из кода клиента и новый RandomOrgServerResponse (см. Код выше), содержащий пакет новых случайных чисел, отправленных из RandomOrgClient . Во-вторых, RandomOrgBuffer должен помнить, что он инициировал процесс извлечения новых случайных кодов путем отправки FetchFromRandomOrg . В противном случае мы рискуем запустить несколько одновременных подключений к random.org или накапливать их без необходимости.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RandomOrgBuffer extends Actor with ActorLogging {
 
  val BatchSize = 50
 
  val buffer = new Queue[Int]
  var waitingForResponse = false
 
  val randomOrgClient = context.actorOf(Props[RandomOrgClient], name="client")
  preFetchIfAlmostEmpty()
 
  def receive = {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      sender ! buffer.dequeue()
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
  }
 
  private def preFetchIfAlmostEmpty() {
    if(buffer.size <= BatchSize / 4 && !waitingForResponse) {
      randomOrgClient ! FetchFromRandomOrg(BatchSize)
      waitingForResponse = true
    }
  }
 
}

Ключевой частью является preFetchIfAlmostEmpty() который инициирует процесс извлечения случайных чисел (в фоновом режиме, другим актором). Если уровень буфера слишком низкий (я полагаю, 1/4 от размера пакета) и мы еще не ждем ответа от random.org , мы отправляем соответствующее сообщение в RandomOrgClient . Мы также вызываем этот метод сразу при запуске (когда буфер полностью пуст), чтобы разогреть буфер при поступлении первого запроса. Обратите внимание, как один актер может создать экземпляр другого актера, вызвав context.actorOf (на самом деле это ActorRef ).

Этот код содержит огромную ошибку, вы можете ее обнаружить? Представьте, что произойдет, если RandomOrgBuffer вдруг получит сотни сообщений RandomRequest одновременно? Больше, чем размер буфера, даже сразу после его заполнения? Или если запрос приходит сразу после создания экземпляра субъекта, когда буфер еще пуст? К сожалению, я должен разобраться с ситуацией, когда приходит RandomRequest и наш буфер полностью пуст, в то время random.org ответ random.org еще не вернулся. Однако мы не можем просто блокировать, ожидая, пока не придет ответ с новой партией случайных чисел. Мы не можем сделать это по одной простой причине: если мы RandomRequest или ждем другим способом во время обработки RandomRequest , мы не можем обработать любое другое сообщение, включая RandomOrgServerResponse — помните, что один RandomOrgServerResponse может обрабатывать только одно сообщение одновременно! Мы не только введем взаимную блокировку в нашем приложении, но и нарушим одно из самых важных правил в Akka: никогда не блокировать и не спать внутри актера — подробнее об этом в следующих статьях.

Правильный способ справиться с этой ситуацией состоит в том, чтобы иметь очередь ожидающих актеров — которая послала нам RandomRequest но мы были неспособны немедленно отправить ответ из-за пустого буфера. Как только мы получим обратно RandomOrgServerResponse , наш приоритет — обработать этих ожидающих участников, а затем сосредоточиться на последующих запросах в режиме реального времени. Есть интересный крайний случай — если число ожидающих актеров было настолько велико, что после выполнения их запросов буфер снова почти пуст, мы немедленно отправляем FetchFromRandomOrg еще раз. Это может стать еще интереснее — представьте, что буфер пуст, и мы все еще не удовлетворили всех ожидающих актеров. Следующий код обрабатывает все эти повороты:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class RandomOrgBuffer extends Actor with ActorLogging {
 
  val BatchSize = 50
 
  val buffer = new Queue[Int]
  val backlog = new Queue[ActorRef]
  var waitingForResponse = false
 
  //...
 
  def receive = LoggingReceive {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      if(buffer.isEmpty) {
        backlog += sender
      } else {
        sender ! buffer.dequeue()
      }
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
      while(!backlog.isEmpty && !buffer.isEmpty) {
        backlog.dequeue() ! buffer.dequeue()
      }
      preFetchIfAlmostEmpty()
  }
 
}

В нашей окончательной версии очередь невыполненных работ представляет ожидающих участников. Обратите внимание, как обрабатывается RandomOrgServerResponse : если сначала попытается удовлетворить как можно больше ожидающих участников. Очевидно, что наша цель состояла в том, чтобы исключить чрезвычайно большое время отклика, поэтому мы должны стремиться к минимальному использованию очереди невыполненных работ, поскольку каждый размещенный там субъект с большей вероятностью будет ждать дольше. В идеале очередь RandomOrgBuffer всегда пуста, и RandomOrgBuffer корректирует размер пакета, запрашиваемый каждый раз, в зависимости от текущей загрузки (выборка пакетов меньшего или большего размера, а также настройка порога, ниже которого буфер считается почти пустым). Но я оставлю эти улучшения вам. Наконец, мы можем измерить RandomOrgBuffer отклика RandomOrgBuffer (линейный масштаб, latencies больше не распределены неравномерно):

Обработка входящих сообщений в RandomOrgBuffer стала довольно сложной. Меня особенно беспокоит флаг waitingForResponse (я ненавижу флаги!). В следующей статье мы узнаем, как с этим справиться очень четко, идиоматично и объектно-ориентировано.

Это был перевод моей статьи « Poznajemy Akka: dwoch aktorow », первоначально опубликованной на scala.net.pl .

Ссылка: два актера — знакомство с Аккой от нашего партнера по JCG Томаша Нуркевича в блоге Java и по соседству .