Реактивные потоки — это недавно объявленная инициатива по созданию стандарта для асинхронной обработки потоков со встроенным противодавлением в JVM. Рабочая группа состоит из таких компаний, как Typesafe, Red Hat, Oracle, Netflix и других.
Одна из ранних экспериментальных реализаций основана на Akka . Предварительная версия 0.3 включает производителей и потребителей актеров, что открывает новые возможности для интеграции.
Чтобы протестировать новую технологию, я реализовал очень простую Реактивную очередь сообщений . Код находится на стадии PoC, не обрабатывает ошибки и тому подобное, но при правильном использовании — работает!
Очередь является реагирующей, что означает, что сообщения будут доставляться заинтересованным сторонам в любое время без запроса. Обратное давление применяется как при отправке сообщений (чтобы отправители не подавляли посредника), так и при получении сообщений (чтобы посредник отправлял только столько сообщений, сколько могут получить получатели).
Посмотрим, как это работает!
Очередь
Во-первых, сама очередь является субъектом и ничего не знает о (реактивных) потоках. Код находится в пакете com.reactmq.queue
. Актер принимает следующие актер-сообщения (здесь термин «сообщение» перегружен, поэтому я буду использовать обычное «сообщение», чтобы обозначать сообщения, которые мы отправляем и получаем из очереди, а «актер-сообщения» — Scala). экземпляры классов, отправленные актерам):
-
SendMessage(content)
— отправляет сообщение с указанным содержимымString
. Ответ (SentMessage(id)
) отправляется обратно отправителю с идентификатором сообщения -
ReceiveMessages(count)
— сигнализирует о том, что отправитель (ReceiveMessages(count)
) хотел бы получить доcount
сообщений. Количество суммируется с ранее заявленным спросом. -
DeleteMessage(id)
— неудивительно, удаляет сообщение
Реализация очереди — это упрощенная версия того, что есть в ElasticMQ . После получения сообщения, если оно не было удалено (подтверждено) в течение 10 секунд, оно снова становится доступным для приема.
Когда ReceiveMessages
сигнализирует о требовании к сообщениям (отправляя ReceiveMessages
очереди), он должен ожидать любое количество ответов на сообщения актера ReceivedMessages(msgs)
, содержащих полученные данные.
Идя реактивный
Чтобы создать и протестировать нашу реактивную очередь, нам нужны три приложения:
Мы можем запустить любое количество Senders
и Receivers
, но, конечно, мы должны запустить только одного Broker
.
Первое, что нам нужно сделать, это соединить Sender
с Broker
и Receiver
с Broker
по сети. Мы можем сделать это с помощью расширения Akka IO и реактивного расширения TCP. Используя пару connect
& bind
, мы получаем поток соединений на стороне привязки:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
// sender: val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress) connectFuture.onSuccess { case binding : StreamTcp.OutgoingTcpConnection = > logger.info( "Sender: connected to broker" ) // per-connection logic } // broker: val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress) bindSendFuture.onSuccess { case serverBinding : StreamTcp.TcpServerBinding = > logger.info( "Broker: send bound" ) Flow(serverBinding.connectionStream).foreach { conn = > // per-connection logic }.consume(materializer) } |
Существует другой адрес для отправки и получения сообщений.
Отправитель
Давайте сначала посмотрим на логику соединения для Sender
.
1
2
3
4
5
6
7
|
Flow( 1 .second, () = > { idx + = 1 ; s "Message $idx from $senderName" }) .map { msg = > logger.debug(s "Sender: sending $msg" ) createFrame(msg) } .toProducer(materializer) .produceTo(binding.outputStream) |
Мы создаем тик-поток, который каждую секунду генерирует новое сообщение (очень удобно для тестирования). Используя преобразователь потока map
, мы создаем байтовый кадр с сообщением (подробнее об этом позже). Но это только описание того, как должен выглядеть наш (очень простой) поток; его необходимо материализовать с toProducer
метода toProducer
, который обеспечит конкретные реализации узлов преобразования потока. В настоящее время есть только один FlowMaterializer
, который — что неудивительно — использует актеров Akka под капотом, чтобы фактически создать поток и поток.
Наконец, мы подключаем только что созданного производителя к outputStream
привязки TCP, который оказывается потребителем. И теперь у нас есть реактивный поток сообщений по сети, что означает, что сообщения будут отправляться только тогда, когда Broker
сможет их принять. В противном случае обратное давление будет применяться вплоть до производителя тиков.
Брокер: отправка сообщений
На другой стороне сети сидит Broker
. Давайте посмотрим, что происходит, когда приходит сообщение.
01
02
03
04
05
06
07
08
09
10
11
12
|
Flow(serverBinding.connectionStream).foreach { conn = > logger.info(s "Broker: send client connected (${conn.remoteAddress})" ) val sendToQueueConsumer = ActorConsumer[String]( system.actorOf(Props( new SendToQueueConsumer(queueActor)))) // sending messages to the queue, receiving from the client val reconcileFrames = new ReconcileFrames() Flow(conn.inputStream) .mapConcat(reconcileFrames.apply) .produceTo(materializer, sendToQueueConsumer) }.consume(materializer) |
Сначала мы создаем Flow
из входного потока соединения — это будет входящий поток байтов. Затем мы воссоздаем экземпляры String
которые были отправлены с использованием нашего кадрирования, и, наконец, направляем этот поток потребителю отправки в очередь.
SendToQueueConsumer
— это мост для каждого соединения с SendToQueueConsumer
главной очереди. Он использует признак ActorConsumer
из реализации ActorConsumer
Reactive Streams для автоматического управления запросом, который должен быть передан в восходящем направлении. Используя эту черту, мы можем создать Consumer[_]
с реактивным потоком, поддерживаемый актером — так что полностью настраиваемый приемник.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class SendToQueueConsumer(queueActor : ActorRef) extends ActorConsumer { private var inFlight = 0 override protected def requestStrategy = new MaxInFlightRequestStrategy( 10 ) { override def inFlightInternally = inFlight } override def receive = { case OnNext(msg : String) = > queueActor ! SendMessage(msg) inFlight + = 1 case SentMessage( _ ) = > inFlight - = 1 } } |
То, что необходимо предоставить ActorConsumer
, — это способ измерения количества потоковых элементов, которые в настоящее время обрабатываются. Здесь мы подсчитываем количество сообщений, которые были отправлены в очередь, но для которых мы еще не получили идентификатор (поэтому они обрабатываются в очереди).
Потребитель получает новые сообщения, завернутые в OnNext
актера OnNext
; поэтому OnNext
отправляется OnNext
потоком, а SentMessage
отправляется в ответ на SendMessage
SentMessage
очереди.
получающий
Приемная часть выполняется аналогичным образом, хотя требует некоторых дополнительных шагов. Во-первых, если вы посмотрите на Receiver
, вы увидите, что мы читаем байты из входного потока, воссоздаем сообщения из фреймов и отправляем обратно идентификаторы, следовательно, подтверждаем сообщение. В действительности, мы могли бы запустить некоторую логику обработки сообщений между получением сообщения и отправкой идентификатора.
На стороне Broker
мы создаем два потока для каждого соединения.
Один — это поток сообщений, отправленных получателям, другой — поток подтвержденных идентификаторов сообщений от получателей, которые просто преобразуются для отправки сообщений DeleteMessage
очереди.
Подобно потребителю, нам нужен мост для каждого соединения от субъекта очереди к потоку. Это реализовано в ReceiveFromQueueProducer
. Здесь мы расширяем черту ActorProducer
, которая позволяет полностью контролировать процесс фактического создания сообщений, поступающих в поток.
В этом акторе поток-запросчик посылает сообщение- Request
для запроса. Когда есть спрос, мы запрашиваем сообщения из очереди. В конечном итоге очередь ответит одним или несколькими сообщениями- ReceivedMessages
(когда в очереди есть какие-либо сообщения); так как количество сообщений никогда не превысит сигнализируемый спрос, мы можем безопасно вызвать метод ActorProducer.onNext
, который отправляет данные элементы в нисходящий ActorProducer.onNext
.
обрамление
Одна небольшая деталь заключается в том, что нам нужен собственный протокол кадрирования (спасибо Роланду Куну за разъяснения ), поскольку поток TCP — это просто поток байтов, поэтому мы можем получить произвольные фрагменты данных, которые необходимо будет повторно объединить позже. К счастью, реализация такого кадрирования довольно проста — см. Класс Framing
. Каждый кадр состоит из размера сообщения и самого сообщения.
Подводя итоги
Используя Reactive Streams и реализацию Akka , очень легко создавать реактивные приложения с сквозным противодавлением. Приведенная выше очередь, несмотря на отсутствие множества функций и проверок, не позволит перегружать Broker
Senders
, а с другой стороны — Broker
будет перегружаться Receivers
. И все это без необходимости писать какой-либо код обработки противодавления!
Ссылка: | Реактивная очередь с Akka Reactive Streams от нашего партнера JCG Адама Варски в блоге Блог Адама Варски . |