Итак, в предыдущей статье я показал, как вы можете создать сервер веб-сокетов, используя akka-streams . В этой последующей статье мы немного подробнее рассмотрим, как работает обратное давление с веб-сокетами (и, возможно, с любым протоколом на основе TCP поверх akka). Чтобы показать вам это, мы будем использовать ту же настройку, что и в статье о визуализации противодавления . Там мы использовали следующие инструменты:
- akka-mon : некоторые инструменты мониторинга для актеров для отправки событий о метриках.
- Statsd : Предоставляет UDP API, в который akka-mon может отправлять метрики. Statsd собирает эти метрики и может отправлять их в такие системы, как графит и effxdb.
- Influxdb : мы используем это для сбора различных метрик.
- Графана : Графана может визуализировать данные, хранящиеся в infxDB. Мы будем использовать это для создания линейных графиков, которые показывают обратное давление.
В этой статье мы не будем слишком углубляться в детали, для получения дополнительной информации посмотрите предыдущую статью (todo: link).
Где мы оставили
Давайте быстро рассмотрим (часть) сервер веб-сокетов, который мы создали в предыдущей статье:
01
02
03
04
05
06
07
08
09
10
|
val binding = Http().bindAndHandleSync({ case WSRequest(req @HttpRequest (GET, Uri.Path( "/simple" ), _, _, _)) => handleWith(req, Flows.reverseFlow) case WSRequest(req @HttpRequest (GET, Uri.Path( "/echo" ), _, _, _)) => handleWith(req, Flows.echoFlow) case WSRequest(req @HttpRequest (GET, Uri.Path( "/graph" ), _, _, _)) => handleWith(req, Flows.graphFlow) case WSRequest(req @HttpRequest (GET, Uri.Path( "/graphWithSource" ), _, _, _)) => handleWith(req, Flows.graphFlowWithExtraSource) case WSRequest(req @HttpRequest (GET, Uri.Path( "/stats" ), _, _, _)) => handleWith(req, Flows.graphFlowWithStats(router, req.getUri().parameter( "id" ))) case _: HttpRequest => HttpResponse( 400 , entity = "Invalid websocket request" ) }, interface = "localhost" , port = 9001 ) |
Для проверки противодавления мы используем маршрут / stats, который мы немного изменили для этого сценария. Этот маршрут предоставляет набор статистики по следующему маршруту:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
def graphFlowWithStats(router: ActorRef, id: Option[String]): Flow[Message, Message, Unit] = { Flow() { implicit b => import FlowGraph.Implicits._ id match { case Some(i) => println(s "Connection received for stats from id: $i" ) case _ => println(s "Connection received for stats no id" ) } // create an actor source val source = Source.actorPublisher[String](Props(classOf[VMStatsPublisher],router, id)) // Graph elements we'll use val merge = b.add(Merge[String]( 2 )) val filter = b.add(Flow[String].filter(_ => false )) // convert to int so we can connect to merge val mapMsgToString = b.add(Flow[Message].map[String] { msg => "" }) val mapStringToMsg = b.add(Flow[String].map[Message]( x => TextMessage.Strict(x))) val statsSource = b.add(source) // connect the graph mapMsgToString ~> filter ~> merge // this part of the merge will never provide msgs statsSource ~> merge ~> mapStringToMsg // expose ports (mapMsgToString.inlet, mapStringToMsg.outlet) } } |
Вы можете видеть, что на этот раз мы также передаем параметр запроса ‘id’. Мы делаем это так, чтобы нам было легче видеть, какой поток на сервере соответствует конкретному клиенту веб-сокета.
То, что мы хотели бы видеть, — то, что actorPublisher, который мы используем здесь, замедлит отправку сообщений, если клиент не может идти в ногу. Не вдаваясь в подробности actorPublisher, он использовал следующую функцию для доставки своих сообщений:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** * Deliver the message to the subscriber. In the case of websockets over TCP, note * that even if we have a slow consumer, we won't notice that immediately. First the * buffers will fill up before we get feedback. */ @tailrec final def deliver(): Unit = { if (totalDemand == 0 ) { id match { case Some(i) => println(s "No more demand for $i" ) case _ => println(s "No more demand for: $this" ) } } if (queue.size == 0 && totalDemand != 0 ) { // we can response to queueupdated msgs again, since // we can't do anything until our queue contains stuff again. queueUpdated = false } else if (totalDemand > 0 && queue.size > 0 ) { // also send a message to the counter exporter.processCounter(s "count.invocation-actorpublisher-${(id.get)}" ) onNext(queue.dequeue()) deliver() } } |
Обратите внимание, что он распечатывается, когда он больше не требует подключения к подключенному источнику (в нашем случае клиент websocket). Так что мы ожидаем, что в определенный момент мы увидим подобные сообщения, когда у нас медленный клиент. Чтобы показать, работает ли противодавление, рассмотрим два типа медленных клиентов. Сценарий один, который имеет быстрое соединение, но требует чрезмерного времени на обработку сообщения, и сценарий два, где сообщение обрабатывается немедленно, но использует очень медленное соединение. В качестве последнего сценария мы запустим первый сценарий, но в этот последний раз мы подключим дюжину клиентов.
Медленный клиент 1: клиенту требуется много времени для обработки сообщения
Для этого первого сценария мы используем простой клиент websocket на основе scala, созданный с использованием библиотеки Java Websocket (link-to-github), который выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import java.net.URI import akka.actor.Actor.Receive import org.akkamon.core.ActorStack import org.akkamon.core.exporters.StatsdExporter import org.java_websocket.client.WebSocketClient import org.java_websocket.drafts.{Draft_17} import org.java_websocket.handshake.ServerHandshake /** * A very simple websocket client, which we'll use to simulate a slow client to show backpressure * in action with websockets. */ object WSClient extends App { val NumberOfClients = 10 ; val RandomRange = 100 ; val Base = 50 ; // create and connect the client 1 to NumberOfClients foreach({ cnt => val client = new Client(cnt, Math.round(Math.random() * RandomRange + Base)) Thread.sleep( 10 ); client.connect(); } ) // Implement specific callbacks class Client(id: Int, delay: Long) extends WebSocketClient( new URI(s "ws://localhost:9001/stats?id=$id" ), new Draft_17) { var count = 0 val exporter = StatsdExporter override def onMessage(message: String): Unit = { Thread.sleep(delay); exporter.processCounter(s "count.invocation-websocketclient-$id" ) count+= 1 if (count % 100 == 0 ) println(f "$id%2d:onmessage:$count%5d" ) } override def onClose(code: Int, reason: String, remote: Boolean): Unit = println( "Websocket closed" ) override def onOpen(handshakedata: ServerHandshake): Unit = println(s "Websocket openend: delay = $delay" ) override def onError(ex: Exception): Unit = println( "Websocket error" + ex); } } |
Как вы можете видеть из этого кода, с помощью можно создать несколько клиентов одновременно (10 в этом примере), которые все подключаются к серверу веб-сокетов. Обратите внимание, что мы также передаем идентификатор клиента, чтобы легче соотносить события клиента с сервером, как только мы начнем создавать графики с помощью grafana и просматривать сообщения журнала сервера.
Для нашего первого сценария мы просто воспользуемся одним клиентом веб-сокетов и посмотрим, будет ли обратное давление на стороне сервера. Запуск этого с одним клиентом websocket, который потребляет 20 сообщений в секунду, и сервером, который отправляет 40 сообщений в секунду, приводит к следующему графику:
Как вы можете видеть на этом графике, мы обрабатываем клиентом 200 мс за 10 секунд, а сервер отправляет 400 мс за 10 секунд. Вы также можете увидеть, что в определенный момент сервер перестает отправлять сообщения. Это когда противодавление срабатывает. Мы также можем увидеть это в файле журнала сервера:
1
2
3
4
5
6
7
|
Adding to router: Actor[akka: //websockets/user/$a/flow-3718-7-publisherSource-stageFactory-stageFactory-bypassRouter-flexiRoute-stageFactory-stageFactory-Merge-actorPublisherSource#1915924352] No more demand for 1 No more demand for 1 No more demand for 1 No more demand for 1 No more demand for 1 No more demand for 1 |
Таким образом, даже если клиент сам не поддерживает все реактивные потоки, мы все равно можем получать прибыль от реактивных потоков. Это работает потому, что стек TCP, используемый Akka-streams, связывается с akka-streams, когда его буфер заполняется. Когда это происходит, стек TCP отправляет издателю сообщение о прекращении отправки сообщений. Как только буфер TCP снова пуст, новые запросы запрашиваются у издателя. Однако это не совсем точная наука, поскольку настройки ОС влияют на объем буферизуемой памяти. Например, для моего ноутбука он установлен так:
1
2
3
4
5
6
|
net.inet.tcp.doautorcvbuf: 1 net.inet.tcp.autorcvbufincshift: 3 net.inet.tcp.autorcvbufmax: 1048576 net.inet.tcp.doautosndbuf: 1 net.inet.tcp.autosndbufinc: 8192 net.inet.tcp.autosndbufmax: 1048576 |
Я не знаю подробностей о сетевом стеке BSD, но при условии, что буфер для этого соединения заполняется до максимума как в буфере приема, так и в буфере, он кеширует большое количество сообщений. Если мы берем 5 КБ на сообщение и у нас есть всего 2 МБ буферов для заполнения, может быть 400 сообщений, буферизованных до того, как обратное давление сработает. Это также то, что вы видите, когда оглядываетесь на предыдущее изображение. В начале вы видите публикацию сообщений без перерыва. Это когда буферы ОС заполняются.
Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений
В следующем сценарии давайте посмотрим, что происходит, когда у нас есть клиент с ограниченной пропускной способностью. Чтобы смоделировать это, мы будем использовать ip_relay ( http://www.stewart.com.au/ip_relay/ ), который немного устарел, но предоставляет отличный и простой способ для формирования трафика и изменения полосы пропускания при выполнении Примеры. Давайте начнем ip_relay:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
Joss-MacBook-Pro:ip_relay- 0.71 jos$ ./ip_relay.pl 9002 :localhost: 9001 Resolving address (localhost)..... .... determined as: 127.0 . 0.1 Useing command line parameters: local_port 9002 remote_addrs 127.0 . 0.1 remote_port 9001 bandwidth 0 forwarder 99 set. ip_relay.pl Version: 0.71 Copyright (C) 1999 , 2000 Gavin Stewart Passive socket setup on 0.0 . 0.0 : 9002 > |
Теперь у нас есть локальный порт 9002, который пересылается на другой локальный хост: 9001, где прослушивает наш сервер веб-сокетов. По умолчанию пропускная способность не регулируется:
1
2
3
|
> show bandwidth bandwidth 0 > |
Но мы можем установить его с помощью следующей команды:
1
2
|
> set bandwidth 1000 bandwidth 1000 |
Это означает, что теперь у нас есть дросселированное соединение от порта 9002 к порту 9001 с максимальной пропускной способностью 1 КБ. Теперь мы изменим ws-клиент для подключения к localhost: 9002. Мы также можем использовать ip_relay, чтобы проверить, работает ли он:
01
02
03
04
05
06
07
08
09
10
11
|
> sh stat Total connections: 1 Bandwidth set to: 1000 bytes / sec. Forwarding connections for : 127.0 . 0.1 : 56771 -> 127.0 . 0.1 : 9001 (CONN000001) Connection Up: 1 mins, 18 secs. Idle: 0 secs. Bytes transfered: 78000 in, 163 out. Data rate : 0.98 kB/s in, 0.00 kB/s out. ( 5 sec avg.): 0.92 kB/s in, 0.00 kB/s out. ( 1 min avg.): 0.71 kB/s in, 0.00 kB/s out. > |
На данный момент у нас есть потребитель с очень ограниченной пропускной способностью. Давайте посмотрим на графану и посмотрим, сколько сообщений в секунду она может обработать, и что делает наш отправитель:
Как и ожидалось, мы видим очень медленного потребителя и издателя, который после заполнения буфера прекращает отправку. Что происходит, когда мы отключаем ограничитель пропускной способности?
01
02
03
04
05
06
07
08
09
10
11
12
|
> set bandwidth 0 bandwidth 0 > sh stat Total connections: 1 Bandwidth is not set. Forwarding connections for : 127.0 . 0.1 : 56771 -> 127.0 . 0.1 : 9001 (CONN000001) Connection Up: 5 mins, 38 secs. Idle: 0 secs. Bytes transfered: 520000 in, 163 out. Data rate : 1.50 kB/s in, 0.00 kB/s out. ( 5 sec avg.): 23.08 kB/s in, 0.00 kB/s out. ( 1 min avg.): 3.31 kB/s in, 0.00 kB/s out. |
Результирующий график выглядит так:
Здесь вы можете увидеть, что клиент начинает обрабатывать много сообщений одновременно. Это исходящее буферизованное сообщение с сервера. Как только они обработаны, ставки издателя и клиента совпадают. И когда мы снова включаем ограничитель, скажем, 3000 байтов в секунду:
01
02
03
04
05
06
07
08
09
10
11
12
|
> set bandwidth 3000 bandwidth 3000 > sh stat Total connections: 1 Bandwidth set to: 3000 bytes / sec. Forwarding connections for : 127.0 . 0.1 : 56771 -> 127.0 . 0.1 : 9001 (CONN000001) Connection Up: 10 mins, 18 secs. Idle: 0 secs. Bytes transfered: 3914379 in, 163 out. Data rate : 6.19 kB/s in, 0.00 kB/s out. ( 5 sec avg.): 7.89 kB/s in, 0.00 kB/s out. ( 1 min avg.): 10.61 kB/s in, 0.00 kB/s out. |
Противодавление снова срабатывает (после заполнения буфера):
Очень приятно то, что нам не нужно беспокоиться о медленных клиентах и медленных соединениях, которые перегружают ресурсы. Все сообщения отправляются неблокирующими и асинхронными, поэтому мы должны иметь возможность обслуживать очень большое количество клиентов с ограниченными ресурсами ЦП и памяти.
В качестве последнего сценария, давайте снова запустим сценарий 1, но на этот раз с несколькими клиентами, каждый из которых имеет собственную случайную задержку.
Медленный клиент 2: медленное сетевое соединение, прямая обработка сообщений
Давайте запустим 10 клиентов веб-сокетов и посмотрим, как сервер отвечает. Мы надеемся увидеть, что один медленный клиент не влияет на скорость, с которой быстрый клиент может обрабатывать сообщения.
Мы настраиваем клиента так:
1
2
3
|
val NumberOfClients = 10 ; val RandomRange = 100 ; val Base = 50 ; |
Это означает, что мы запускаем 10 клиентов с базовой задержкой 50 мс и добавлением к ней случайной задержки от 1 до 100. Поскольку у нас много линий и точек данных, давайте сначала покажем скорости обработки сообщений клиентов websocket.
Как вы можете видеть здесь, у нас сейчас 10 слушателей, каждый из которых получает сообщения с нашего сервера веб-сокетов. Каждый также обрабатывает сообщения с разной скоростью. Теперь давайте посмотрим, что делает серверная часть.
Здесь мы видим что-то очень интересное. Мы видим, что существует гораздо более тесная корреляция с количеством отправленных и обработанных сообщений (игнорируем счетчик по оси Y), чем мы видели ранее. Мы также видим, что в определенный момент спрос полностью останавливается на двух самых медленных подписчиков.
Итак, что мы можем сделать из всего этого. Основные моменты, по крайней мере для меня:
- Вы можете использовать akka-streams не только внутри виртуальной машины, но и для противодействия TCP-вызовам.
- Это работает с веб-сокетами, но также будет работать со стандартными вызовами HTTP.
- Однако вы должны помнить, что на уровне ОС, как на стороне получателя, так и на стороне отправителя, вам приходится иметь дело с буферизацией TCP. В зависимости от вашей ОС, это может очень сильно повлиять на обратное давление.
И, в заключение, я просто должен сказать, что работа с реактивными потоками, а точнее с потоками akka, представляет собой очень большой шаг вперед в создании адаптивных, масштабируемых систем.
Ссылка: | Противодействие в действии с веб-сокетами и акка-потоками от нашего партнера JCG Йоса Дирксена в блоге Smart Java . |