Статьи

Противодавление в действии с веб-розетками и акка-потоками

Итак, в предыдущей статье я показал, как вы можете создать сервер веб-сокетов, используя akka-streams . В этой последующей статье мы немного подробнее рассмотрим, как работает обратное давление с веб-сокетами (и, возможно, с любым протоколом на основе TCP поверх akka). Чтобы показать вам это, мы будем использовать ту же настройку, что и в статье о визуализации противодавления . Там мы использовали следующие инструменты:

  • akka-mon : некоторые инструменты мониторинга для актеров для отправки событий о метриках.
  • Statsd : Предоставляет UDP API, в который akka-mon может отправлять метрики. Statsd собирает эти метрики и может отправлять их в такие системы, как графит и effxdb.
  • Influxdb : мы используем это для сбора различных метрик.
  • Графана : Графана может визуализировать данные, хранящиеся в infxDB. Мы будем использовать это для создания линейных графиков, которые показывают обратное давление.

В этой статье мы не будем слишком углубляться в детали, для получения дополнительной информации посмотрите предыдущую статью (todo: link).

Где мы оставили

Давайте быстро рассмотрим (часть) сервер веб-сокетов, который мы создали в предыдущей статье:

01
02
03
04
05
06
07
08
09
10
val binding = Http().bindAndHandleSync({
 
  case WSRequest(req@HttpRequest(GET, Uri.Path("/simple"), _, _, _)) => handleWith(req, Flows.reverseFlow)
  case WSRequest(req@HttpRequest(GET, Uri.Path("/echo"), _, _, _)) => handleWith(req, Flows.echoFlow)
  case WSRequest(req@HttpRequest(GET, Uri.Path("/graph"), _, _, _)) => handleWith(req, Flows.graphFlow)
  case WSRequest(req@HttpRequest(GET, Uri.Path("/graphWithSource"), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource)
  case WSRequest(req@HttpRequest(GET, Uri.Path("/stats"), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router, req.getUri().parameter("id")))
  case _: HttpRequest => HttpResponse(400, entity = "Invalid websocket request")
 
}, interface = "localhost", port = 9001)

Для проверки противодавления мы используем маршрут / stats, который мы немного изменили для этого сценария. Этот маршрут предоставляет набор статистики по следующему маршруту:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def graphFlowWithStats(router: ActorRef, id: Option[String]): Flow[Message, Message, Unit] = {
   Flow() { implicit b =>
     import FlowGraph.Implicits._
 
     id match {
       case Some(i) => println(s"Connection received for stats from id: $i")
       case _ => println(s"Connection received for stats no id")
     }
 
     // create an actor source
     val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router, id))
 
     // Graph elements we'll use
     val merge = b.add(Merge[String](2))
     val filter = b.add(Flow[String].filter(_ => false))
 
     // convert to int so we can connect to merge
     val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" })
     val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x)))
 
     val statsSource = b.add(source)
 
     // connect the graph
     mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs
                  statsSource ~> merge ~> mapStringToMsg
 
     // expose ports
     (mapMsgToString.inlet, mapStringToMsg.outlet)
   }
 }

Вы можете видеть, что на этот раз мы также передаем параметр запроса ‘id’. Мы делаем это так, чтобы нам было легче видеть, какой поток на сервере соответствует конкретному клиенту веб-сокета.

То, что мы хотели бы видеть, — то, что actorPublisher, который мы используем здесь, замедлит отправку сообщений, если клиент не может идти в ногу. Не вдаваясь в подробности actorPublisher, он использовал следующую функцию для доставки своих сообщений:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Deliver the message to the subscriber. In the case of websockets over TCP, note
 * that even if we have a slow consumer, we won't notice that immediately. First the
 * buffers will fill up before we get feedback.
 */
@tailrec final def deliver(): Unit = {
  if (totalDemand == 0) {
    id match {
      case Some(i) => println(s"No more demand for $i")
      case _ => println(s"No more demand for: $this")
    }
 
  }
 
  if (queue.size == 0 && totalDemand != 0) {
    // we can response to queueupdated msgs again, since
    // we can't do anything until our queue contains stuff again.
    queueUpdated = false
  } else if (totalDemand > 0 && queue.size > 0) {
    // also send a message to the counter
    exporter.processCounter(s"count.invocation-actorpublisher-${(id.get)}")
    onNext(queue.dequeue())
    deliver()
  }
}

Обратите внимание, что он распечатывается, когда он больше не требует подключения к подключенному источнику (в нашем случае клиент websocket). Так что мы ожидаем, что в определенный момент мы увидим подобные сообщения, когда у нас медленный клиент. Чтобы показать, работает ли противодавление, рассмотрим два типа медленных клиентов. Сценарий один, который имеет быстрое соединение, но требует чрезмерного времени на обработку сообщения, и сценарий два, где сообщение обрабатывается немедленно, но использует очень медленное соединение. В качестве последнего сценария мы запустим первый сценарий, но в этот последний раз мы подключим дюжину клиентов.

Медленный клиент 1: клиенту требуется много времени для обработки сообщения

Для этого первого сценария мы используем простой клиент websocket на основе scala, созданный с использованием библиотеки Java Websocket (link-to-github), который выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.net.URI
  
import akka.actor.Actor.Receive
import org.akkamon.core.ActorStack
import org.akkamon.core.exporters.StatsdExporter
import org.java_websocket.client.WebSocketClient
import org.java_websocket.drafts.{Draft_17}
import org.java_websocket.handshake.ServerHandshake
  
/**
 * A very simple websocket client, which we'll use to simulate a slow client to show backpressure
 * in action with websockets.
 */
object WSClient extends App {
  
  val NumberOfClients = 10;
  val RandomRange = 100;
  val Base = 50;
  
  // create and connect the client
  1 to NumberOfClients foreach({ cnt =>
    val client = new Client(cnt, Math.round(Math.random() * RandomRange + Base))
    Thread.sleep(10);
    client.connect();
    }
  )
  
  // Implement specific callbacks
  class Client(id: Int, delay: Long) extends WebSocketClient(new URI(s"ws://localhost:9001/stats?id=$id"), new Draft_17) {
  
    var count = 0
    val exporter = StatsdExporter
  
    override def onMessage(message: String): Unit = {
      Thread.sleep(delay);
      exporter.processCounter(s"count.invocation-websocketclient-$id")
      count+=1
      if (count % 100 == 0) println(f"$id%2d:onmessage:$count%5d")
    }
  
    override def onClose(code: Int, reason: String, remote: Boolean): Unit = println("Websocket closed")
    override def onOpen(handshakedata: ServerHandshake): Unit = println(s"Websocket openend: delay = $delay")
    override def onError(ex: Exception): Unit = println("Websocket error" + ex);
  }
}

Как вы можете видеть из этого кода, с помощью можно создать несколько клиентов одновременно (10 в этом примере), которые все подключаются к серверу веб-сокетов. Обратите внимание, что мы также передаем идентификатор клиента, чтобы легче соотносить события клиента с сервером, как только мы начнем создавать графики с помощью grafana и просматривать сообщения журнала сервера.

Для нашего первого сценария мы просто воспользуемся одним клиентом веб-сокетов и посмотрим, будет ли обратное давление на стороне сервера. Запуск этого с одним клиентом websocket, который потребляет 20 сообщений в секунду, и сервером, который отправляет 40 сообщений в секунду, приводит к следующему графику:

WS-графана-1

Как вы можете видеть на этом графике, мы обрабатываем клиентом 200 мс за 10 секунд, а сервер отправляет 400 мс за 10 секунд. Вы также можете увидеть, что в определенный момент сервер перестает отправлять сообщения. Это когда противодавление срабатывает. Мы также можем увидеть это в файле журнала сервера:

1
2
3
4
5
6
7
Adding to router: Actor[akka://websockets/user/$a/flow-3718-7-publisherSource-stageFactory-stageFactory-bypassRouter-flexiRoute-stageFactory-stageFactory-Merge-actorPublisherSource#1915924352]
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1
No more demand for 1

Таким образом, даже если клиент сам не поддерживает все реактивные потоки, мы все равно можем получать прибыль от реактивных потоков. Это работает потому, что стек TCP, используемый Akka-streams, связывается с akka-streams, когда его буфер заполняется. Когда это происходит, стек TCP отправляет издателю сообщение о прекращении отправки сообщений. Как только буфер TCP снова пуст, новые запросы запрашиваются у издателя. Однако это не совсем точная наука, поскольку настройки ОС влияют на объем буферизуемой памяти. Например, для моего ноутбука он установлен так:

1
2
3
4
5
6
net.inet.tcp.doautorcvbuf: 1
net.inet.tcp.autorcvbufincshift: 3
net.inet.tcp.autorcvbufmax: 1048576
net.inet.tcp.doautosndbuf: 1
net.inet.tcp.autosndbufinc: 8192
net.inet.tcp.autosndbufmax: 1048576

Я не знаю подробностей о сетевом стеке BSD, но при условии, что буфер для этого соединения заполняется до максимума как в буфере приема, так и в буфере, он кеширует большое количество сообщений. Если мы берем 5 КБ на сообщение и у нас есть всего 2 МБ буферов для заполнения, может быть 400 сообщений, буферизованных до того, как обратное давление сработает. Это также то, что вы видите, когда оглядываетесь на предыдущее изображение. В начале вы видите публикацию сообщений без перерыва. Это когда буферы ОС заполняются.

Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений

В следующем сценарии давайте посмотрим, что происходит, когда у нас есть клиент с ограниченной пропускной способностью. Чтобы смоделировать это, мы будем использовать ip_relay ( http://www.stewart.com.au/ip_relay/ ), который немного устарел, но предоставляет отличный и простой способ для формирования трафика и изменения полосы пропускания при выполнении Примеры. Давайте начнем ip_relay:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
Joss-MacBook-Pro:ip_relay-0.71 jos$ ./ip_relay.pl 9002:localhost:9001
  Resolving address (localhost).....
  .... determined as: 127.0.0.1
Useing command line parameters:
  local_port    9002
  remote_addrs  127.0.0.1
  remote_port   9001
  bandwidth 0
  forwarder 99 set.
  
  
ip_relay.pl Version: 0.71
Copyright (C) 1999,2000 Gavin Stewart
  
Passive socket setup on 0.0.0.0:9002
>

Теперь у нас есть локальный порт 9002, который пересылается на другой локальный хост: 9001, где прослушивает наш сервер веб-сокетов. По умолчанию пропускная способность не регулируется:

1
2
3
> show bandwidth
bandwidth   0
>

Но мы можем установить его с помощью следующей команды:

1
2
> set bandwidth 1000
bandwidth   1000

Это означает, что теперь у нас есть дросселированное соединение от порта 9002 к порту 9001 с максимальной пропускной способностью 1 КБ. Теперь мы изменим ws-клиент для подключения к localhost: 9002. Мы также можем использовать ip_relay, чтобы проверить, работает ли он:

01
02
03
04
05
06
07
08
09
10
11
> sh stat
  Total connections: 1
  Bandwidth set to: 1000 bytes / sec.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 1 mins, 18 secs. Idle: 0 secs.
        Bytes transfered: 78000 in, 163 out.
        Data rate       : 0.98 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 0.92 kB/s in, 0.00 kB/s out.
            (1 min avg.): 0.71 kB/s in, 0.00 kB/s out.
>

На данный момент у нас есть потребитель с очень ограниченной пропускной способностью. Давайте посмотрим на графану и посмотрим, сколько сообщений в секунду она может обработать, и что делает наш отправитель:

WS-графана-2

Как и ожидалось, мы видим очень медленного потребителя и издателя, который после заполнения буфера прекращает отправку. Что происходит, когда мы отключаем ограничитель пропускной способности?

01
02
03
04
05
06
07
08
09
10
11
12
> set bandwidth 0
bandwidth   0
> sh stat
  Total connections: 1
  Bandwidth is not set.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 5 mins, 38 secs. Idle: 0 secs.
        Bytes transfered: 520000 in, 163 out.
        Data rate       : 1.50 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 23.08 kB/s in, 0.00 kB/s out.
            (1 min avg.): 3.31 kB/s in, 0.00 kB/s out.

Результирующий график выглядит так:

WS-графана-3

Здесь вы можете увидеть, что клиент начинает обрабатывать много сообщений одновременно. Это исходящее буферизованное сообщение с сервера. Как только они обработаны, ставки издателя и клиента совпадают. И когда мы снова включаем ограничитель, скажем, 3000 байтов в секунду:

01
02
03
04
05
06
07
08
09
10
11
12
> set bandwidth 3000
bandwidth   3000
> sh stat
  Total connections: 1
  Bandwidth set to: 3000 bytes / sec.
  Forwarding connections for:
    127.0.0.1:56771 -> 127.0.0.1:9001 (CONN000001)
        Connection Up: 10 mins, 18 secs. Idle: 0 secs.
        Bytes transfered: 3914379 in, 163 out.
        Data rate       : 6.19 kB/s in, 0.00 kB/s out.
            (5 sec avg.): 7.89 kB/s in, 0.00 kB/s out.
            (1 min avg.): 10.61 kB/s in, 0.00 kB/s out.

Противодавление снова срабатывает (после заполнения буфера):

WS-графана-4

Очень приятно то, что нам не нужно беспокоиться о медленных клиентах и ​​медленных соединениях, которые перегружают ресурсы. Все сообщения отправляются неблокирующими и асинхронными, поэтому мы должны иметь возможность обслуживать очень большое количество клиентов с ограниченными ресурсами ЦП и памяти.

В качестве последнего сценария, давайте снова запустим сценарий 1, но на этот раз с несколькими клиентами, каждый из которых имеет собственную случайную задержку.

Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений

Давайте запустим 10 клиентов веб-сокетов и посмотрим, как сервер отвечает. Мы надеемся увидеть, что один медленный клиент не влияет на скорость, с которой быстрый клиент может обрабатывать сообщения.

Мы настраиваем клиента так:

1
2
3
val NumberOfClients = 10;
val RandomRange = 100;
val Base = 50;

Это означает, что мы запускаем 10 клиентов с базовой задержкой 50 мс и добавлением к ней случайной задержки от 1 до 100. Поскольку у нас много линий и точек данных, давайте сначала покажем скорости обработки сообщений клиентов websocket.

WS-графана-5-клиенты

Как вы можете видеть здесь, у нас сейчас 10 слушателей, каждый из которых получает сообщения с нашего сервера веб-сокетов. Каждый также обрабатывает сообщения с разной скоростью. Теперь давайте посмотрим, что делает серверная часть.

WS-графана-5-сервер

Здесь мы видим что-то очень интересное. Мы видим, что существует гораздо более тесная корреляция с количеством отправленных и обработанных сообщений (игнорируем счетчик по оси Y), чем мы видели ранее. Мы также видим, что в определенный момент спрос полностью останавливается на двух самых медленных подписчиков.

Итак, что мы можем сделать из всего этого. Основные моменты, по крайней мере для меня:

  • Вы можете использовать akka-streams не только внутри виртуальной машины, но и для противодействия TCP-вызовам.
  • Это работает с веб-сокетами, но также будет работать со стандартными вызовами HTTP.
  • Однако вы должны помнить, что на уровне ОС, как на стороне получателя, так и на стороне отправителя, вам приходится иметь дело с буферизацией TCP. В зависимости от вашей ОС, это может очень сильно повлиять на обратное давление.

И, в заключение, я просто должен сказать, что работа с реактивными потоками, а точнее с потоками akka, представляет собой очень большой шаг вперед в создании адаптивных, масштабируемых систем.