Статьи

Противодействие в действии с веб-сокетами и акка-потоками

Итак, в  предыдущей статье я показал, как вы можете создать сервер веб-сокетов, используя akka-streams . В этой последующей статье мы немного подробнее рассмотрим, как работает обратное давление с веб-сокетами (и, возможно, с любым протоколом на основе TCP поверх akka). Чтобы показать вам это, мы будем использовать ту же настройку,  что и в статье о визуализации противодавления . Там мы использовали следующие инструменты:

  • akka-mon : некоторые инструменты мониторинга для актеров для отправки событий о метриках.
  • Statsd : Предоставляет UDP API, в который akka-mon может отправлять метрики. Statsd собирает эти метрики и может отправлять их в такие системы, как графит и effxdb.
  • Influxdb : мы используем это для сбора различных метрик.
  • Графана : Графана может визуализировать данные, хранящиеся в infxDB. Мы будем использовать это для создания линейных графиков, которые показывают обратное давление.

В этой статье мы не будем слишком углубляться в детали, для получения дополнительной информации посмотрите предыдущую статью (todo: link).

Где мы остановились

Давайте быстро рассмотрим (часть) сервер веб-сокетов, который мы создали в предыдущей статье:

  val binding = Http().bindAndHandleSync({
 
    case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
    case WSRequest(req@HttpRequest(GET, Uri.Path("/stats"), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router, req.getUri().parameter("id")))
    case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")
 
  }, interface = "localhost", port = 9001)
Для проверки противодавления мы используем маршрут / stats, который мы немного изменили для этого сценария. Этот маршрут предоставляет набор статистики по следующему маршруту:

 def graphFlowWithStats(router: ActorRef, id: Option[String]): Flow[Message, Message, Unit] = {
    Flow() { implicit b =>
      import FlowGraph.Implicits._
 
      id match {
        case Some(i) => println(s"Connection received for stats from id: $i")
        case _ => println(s"Connection received for stats no id")
      }
 
      // create an actor source
      val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router, id))
 
      // Graph elements we'll use
      val merge = b.add(Merge[String](2))
      val filter = b.add(Flow[String].filter(_ => false))
 
      // convert to int so we can connect to merge
      val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" })
      val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))
 
      val statsSource = b.add(source)
 
      // connect the graph
      mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs
                   statsSource ~> merge ~> mapStringToMsg
 
      // expose ports
      (mapMsgToString.inlet, mapStringToMsg.outlet)
    }
  }
 
Вы можете видеть, что на этот раз мы также передаем параметр запроса ‘id’. Мы делаем это так, чтобы нам было легче видеть, какой поток на сервере соответствует конкретному клиенту веб-сокета.

То, что мы хотели бы видеть, — то, что actorPublisher, который мы используем здесь, замедлит отправку сообщений, если клиент не может идти в ногу. Не вдаваясь в подробности actorPublisher, он использовал следующую функцию для доставки своих сообщений:

  /**
   * Deliver the message to the subscriber. In the case of websockets over TCP, note
   * that even if we have a slow consumer, we won't notice that immediately. First the
   * buffers will fill up before we get feedback.
   */
  @tailrec final def deliver(): Unit = {
    if (totalDemand == 0) {
      id match {
        case Some(i) => println(s"No more demand for $i")
        case _ => println(s"No more demand for: $this")
      }
 
    }
 
    if (queue.size == 0 && totalDemand != 0) {
      // we can response to queueupdated msgs again, since
      // we can't do anything until our queue contains stuff again.
      queueUpdated = false
    } else if (totalDemand > 0 && queue.size > 0) {
      // also send a message to the counter
      exporter.processCounter(s"count.invocation-actorpublisher-${(id.get)}")
      onNext(queue.dequeue())
      deliver()
    }
  }
Обратите внимание, что он распечатывается, когда он больше не требует подключения к подключенному источнику (в нашем случае клиент websocket). Так что мы ожидаем, что в определенный момент мы увидим подобные сообщения, когда у нас медленный клиент. Чтобы показать, работает ли противодавление, рассмотрим два типа медленных клиентов. Сценарий один, который имеет быстрое соединение, но требует чрезмерного времени на обработку сообщения, и сценарий два, где сообщение обрабатывается немедленно, но использует очень медленное соединение. В качестве последнего сценария мы запустим первый сценарий, но в этот последний раз мы подключим дюжину клиентов.

Медленный клиент 1: клиенту требуется много времени для обработки сообщения

Для этого первого сценария мы используем простой клиент websocket на основе scala, созданный с использованием библиотеки Java Websocket (link-to-github), который выглядит следующим образом:

import java.net.URI
 
import akka.actor.Actor.Receive
import org.akkamon.core.ActorStack
import org.akkamon.core.exporters.StatsdExporter
import org.java_websocket.client.WebSocketClient
import org.java_websocket.drafts.{Draft_17}
import org.java_websocket.handshake.ServerHandshake
 
/**
 * A very simple websocket client, which we'll use to simulate a slow client to show backpressure
 * in action with websockets.
 */
object WSClient extends App {
 
  val NumberOfClients = 10;
  val RandomRange = 100;
  val Base = 50;
 
  // create and connect the client
  1 to NumberOfClients foreach({ cnt =>
    val client = new Client(cnt, Math.round(Math.random() * RandomRange + Base))
    Thread.sleep(10);
    client.connect();
    }
  )
 
  // Implement specific callbacks
  class Client(id: Int, delay: Long) extends WebSocketClient(new URI(s"ws://localhost:9001/stats?id=$id"), new Draft_17) {
 
    var count = 0
    val exporter = StatsdExporter
 
    override def onMessage(message: String): Unit = {
      Thread.sleep(delay);
      exporter.processCounter(s"count.invocation-websocketclient-$id")
      count+=1
      if (count % 100 == 0) println(f"$id%2d:onmessage:$count%5d")
    }
 
    override def onClose(code: Int, reason: String, remote: Boolean): Unit = println("Websocket closed")
    override def onOpen(handshakedata: ServerHandshake): Unit = println(s"Websocket openend: delay = $delay")
    override def onError(ex: Exception): Unit = println("Websocket error" + ex);
  }
}
Как вы можете видеть из этого кода, с помощью можно создать несколько клиентов одновременно (10 в этом примере), которые все подключаются к серверу веб-сокетов. Обратите внимание, что мы также передаем идентификатор клиента, чтобы легче соотносить события клиента с сервером, как только мы начнем создавать графики с помощью grafana и просматривать сообщения журнала сервера.

Для нашего первого сценария мы просто воспользуемся одним клиентом веб-сокетов и посмотрим, будет ли обратное давление на стороне сервера. Запуск этого с одним клиентом websocket, который потребляет 20 сообщений в секунду, и сервером, который отправляет 40 сообщений в секунду, приводит к следующему графику:

WS-графана-1.png

Как вы можете видеть на этом графике, мы обрабатываем клиентом 200 мс за 10 секунд, а сервер отправляет 400 мс за 10 секунд. Вы также можете увидеть, что в определенный момент сервер перестает отправлять сообщения. Это когда противодавление срабатывает. Мы также можем увидеть это в файле журнала сервера:

Adding to router: Actor
  
   
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1
  

Таким образом, даже если клиент сам не поддерживает все реактивные потоки, мы все равно можем получать прибыль от реактивных потоков. Это работает потому, что стек TCP, используемый Akka-streams, связывается с akka-streams, когда его буфер заполняется. Когда это происходит, стек TCP отправляет издателю сообщение о прекращении отправки сообщений. Как только буфер TCP снова пуст, новые запросы запрашиваются у издателя. Однако это не совсем точная наука, поскольку настройки ОС влияют на объем буферизуемой памяти. Например, для моего ноутбука он установлен так:

net.inet.tcp.doautorcvbuf: 1
net.inet.tcp.autorcvbufincshift: 3
net.inet.tcp.autorcvbufmax: 1048576
net.inet.tcp.doautosndbuf: 1
net.inet.tcp.autosndbufinc: 8192
net.inet.tcp.autosndbufmax: 1048576

Я не знаю подробностей о сетевом стеке BSD, но при условии, что буфер для этого соединения заполняется до максимума как при приеме, так и при отправке, он будет кэшировать большое количество сообщений. Если мы берем 5 КБ на сообщение и у нас есть всего 2 МБ буферов для заполнения, может быть 400 сообщений, буферизованных до того, как обратное давление сработает. Это также то, что вы видите, когда оглядываетесь на предыдущее изображение. В начале вы видите публикацию сообщений без перерыва. Это когда буферы ОС заполняются.

Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений

В следующем сценарии давайте посмотрим, что происходит, когда у нас есть клиент с ограниченной пропускной способностью. Чтобы смоделировать это, мы будем использовать ip_relay ([a href = «http://www.stewart.com.au/ip_relay/» style = «color: rgb (34, 98, 164); стиль контура: нет; text-художественное оформление: нет; «] http://www.stewart.com.au/ip_relay/), который немного староват, но обеспечивает отличный и простой способ формировать трафик и изменять полосу пропускания при выполнении примеров. Давайте начнем ip_relay:

Joss-MacBook-Pro:ip_relay-0.71 jos$ ./ip_relay.pl 9002:localhost:9001
  Resolving address (localhost).....
  .... determined as: 127.0.0.1
Useing command line parameters:
  local_port	9002
  remote_addrs	127.0.0.1
  remote_port	9001
  bandwidth	0
  forwarder 99 set.
 
 
ip_relay.pl Version: 0.71
Copyright (C) 1999,2000 Gavin Stewart
 
Passive socket setup on 0.0.0.0:9002
>

Теперь у нас есть локальный порт 9002, который пересылается на другой локальный хост: 9001, где прослушивает наш сервер веб-сокетов. По умолчанию пропускная способность не регулируется:

> show bandwidth
bandwidth	0
>

Но мы можем установить его с помощью следующей команды:

> set bandwidth 1000
bandwidth	1000

Это означает, что теперь у нас есть дросселированное соединение от порта 9002 к порту 9001 с максимальной пропускной способностью 1 КБ. Теперь мы изменим ws-клиент для подключения к localhost: 9002. Мы также можем использовать ip_relay, чтобы проверить, работает ли он:

> sh stat
  Total connections: 1
  Bandwidth set to: 1000 bytes / sec.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 1 mins, 18 secs. Idle: 0 secs.
        Bytes transfered: 78000 in, 163 out.
        Data rate       : 0.98 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 0.92 kB/s in, 0.00 kB/s out.
            (1 min avg.): 0.71 kB/s in, 0.00 kB/s out.
>

На данный момент у нас есть потребитель с очень ограниченной пропускной способностью. Давайте посмотрим на графану и посмотрим, сколько сообщений в секунду она может обработать, и что делает наш отправитель:

WS-графана-2.png

Как и ожидалось, мы видим очень медленного потребителя и издателя, который после заполнения буфера прекращает отправку. Что происходит, когда мы отключаем ограничитель пропускной способности?

> set bandwidth 0
bandwidth	0
> sh stat
  Total connections: 1
  Bandwidth is not set.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 5 mins, 38 secs. Idle: 0 secs.
        Bytes transfered: 520000 in, 163 out.
        Data rate       : 1.50 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 23.08 kB/s in, 0.00 kB/s out.
            (1 min avg.): 3.31 kB/s in, 0.00 kB/s out.

Результирующий график выглядит так:

WS-графана-3.png

Здесь вы можете увидеть, что клиент начинает обрабатывать много сообщений одновременно. Это исходящее буферизованное сообщение с сервера. Как только они обработаны, ставки издателя и клиента совпадают. И когда мы снова включаем ограничитель, скажем, 3000 байтов в секунду:

> set bandwidth 3000
bandwidth	3000
> sh stat
  Total connections: 1
  Bandwidth set to: 3000 bytes / sec.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 10 mins, 18 secs. Idle: 0 secs.
        Bytes transfered: 3914379 in, 163 out.
        Data rate       : 6.19 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 7.89 kB/s in, 0.00 kB/s out.
            (1 min avg.): 10.61 kB/s in, 0.00 kB/s out.

Противодавление снова срабатывает (после заполнения буфера):

WS-графана-4.png

Очень приятно то, что нам не нужно беспокоиться о медленных клиентах и ​​медленных соединениях, которые перегружают ресурсы. Все сообщения отправляются неблокирующими и асинхронными, поэтому мы должны иметь возможность обслуживать очень большое количество клиентов с ограниченными ресурсами ЦП и памяти.

В качестве последнего сценария, давайте снова запустим сценарий 1, но на этот раз с несколькими клиентами, каждый из которых имеет собственную случайную задержку.

Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений

Давайте запустим 10 клиентов веб-сокетов и посмотрим, как сервер отвечает. Мы надеемся увидеть, что один медленный клиент не влияет на скорость, с которой быстрый клиент может обрабатывать сообщения.

Мы настраиваем клиента так:

 val NumberOfClients = 10;
  val RandomRange = 100;
  val Base = 50;
Это означает, что мы запускаем 10 клиентов с базовой задержкой 50 мс и добавлением к ней случайной задержки от 1 до 100. Поскольку у нас много линий и точек данных, давайте сначала покажем скорости обработки сообщений клиентов websocket.

WS-графана-5-clients.png

Как вы можете видеть здесь, у нас сейчас 10 слушателей, каждый из которых получает сообщения с нашего сервера веб-сокетов. Каждый также обрабатывает сообщения с разной скоростью. Теперь давайте посмотрим, что делает серверная часть.

WS-графана-5-server.png

Здесь мы видим что-то очень интересное. Мы видим, что существует гораздо более тесная корреляция с количеством отправленных и обработанных сообщений (игнорируем счетчик по оси Y), чем мы видели ранее. Мы также видим, что в определенный момент спрос полностью останавливается на двух самых медленных подписчиков.

Итак, что мы можем сделать из всего этого. Основные моменты, по крайней мере для меня:

  • Вы можете использовать akka-streams не только внутри виртуальной машины, но и для противодействия TCP-вызовам.
  • Это работает с веб-сокетами, но также будет работать со стандартными вызовами HTTP.
  • Однако вам следует помнить, что на уровне ОС, как на стороне получателя, так и на стороне отправителя, вам приходится иметь дело с буферизацией TCP. В зависимости от вашей ОС, это может очень сильно повлиять на обратное давление.

И, в заключение, я просто должен сказать, что работа с реактивными потоками, а точнее с потоками akka, представляет собой очень большой шаг вперед в создании адаптивных, масштабируемых систем.