Реактивные потоки — это недавно объявленная инициатива по созданию стандарта для асинхронной обработки потоков со встроенным противодавлением в JVM. Рабочая группа состоит из таких компаний, как Typesafe, Red Hat, Oracle, Netflix и других.
Одна из ранних экспериментальных реализаций основана на Akka . Предварительная версия 0.3 включает производителей и потребителей актеров, что открывает новые возможности для интеграции.
Чтобы протестировать новую технологию, я реализовал очень простую Реактивную очередь сообщений . Код находится на стадии PoC, не обрабатывает ошибки и тому подобное, но при правильном использовании — работает!
Очередь является реагирующей, что означает, что сообщения будут доставляться заинтересованным сторонам в любое время без запроса. Обратное давление применяется как при отправке сообщений (чтобы отправители не подавляли посредника), так и при получении сообщений (чтобы посредник отправлял только столько сообщений, сколько могут получить получатели).
Посмотрим, как это работает!
Очередь
Во-первых, сама очередь является субъектом и ничего не знает о (реактивных) потоках. Код находится в пакете com.reactmq.queue . Актер принимает следующие актер-сообщения (здесь термин «сообщение» перегружен, поэтому я буду использовать обычное «сообщение», чтобы обозначать сообщения, которые мы отправляем и получаем из очереди, а «актер-сообщения» — Scala). экземпляры классов, отправленные актерам):
-
SendMessage(content)— отправляет сообщение с указанным содержимымString. Ответ (SentMessage(id)) отправляется обратно отправителю с идентификатором сообщения -
ReceiveMessages(count)— сигнализирует о том, что отправитель (ReceiveMessages(count)) хотел бы получить доcountсообщений. Количество суммируется с ранее заявленным спросом. -
DeleteMessage(id)— неудивительно, удаляет сообщение
Реализация очереди — это упрощенная версия того, что есть в ElasticMQ . После получения сообщения, если оно не было удалено (подтверждено) в течение 10 секунд, оно снова становится доступным для приема.
Когда ReceiveMessages сигнализирует о требовании к сообщениям (отправляя ReceiveMessages очереди), он должен ожидать любое количество ответов на сообщения актера ReceivedMessages(msgs) , содержащих полученные данные.
Идя реактивный
Чтобы создать и протестировать нашу реактивную очередь, нам нужны три приложения:
Мы можем запустить любое количество Senders и Receivers , но, конечно, мы должны запустить только одного Broker .
Первое, что нам нужно сделать, это соединить Sender с Broker и Receiver с Broker по сети. Мы можем сделать это с помощью расширения Akka IO и реактивного расширения TCP. Используя пару connect & bind , мы получаем поток соединений на стороне привязки:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
// sender:val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress) connectFuture.onSuccess { case binding: StreamTcp.OutgoingTcpConnection => logger.info("Sender: connected to broker") // per-connection logic} // broker:val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress) bindSendFuture.onSuccess { case serverBinding: StreamTcp.TcpServerBinding => logger.info("Broker: send bound") Flow(serverBinding.connectionStream).foreach { conn => // per-connection logic }.consume(materializer)} |
Существует другой адрес для отправки и получения сообщений.
Отправитель
Давайте сначала посмотрим на логику соединения для Sender .
|
1
2
3
4
5
6
7
|
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" }) .map { msg => logger.debug(s"Sender: sending $msg") createFrame(msg) } .toProducer(materializer) .produceTo(binding.outputStream) |
Мы создаем тик-поток, который каждую секунду генерирует новое сообщение (очень удобно для тестирования). Используя преобразователь потока map , мы создаем байтовый кадр с сообщением (подробнее об этом позже). Но это только описание того, как должен выглядеть наш (очень простой) поток; его необходимо материализовать с toProducer метода toProducer , который обеспечит конкретные реализации узлов преобразования потока. В настоящее время есть только один FlowMaterializer , который — что неудивительно — использует актеров Akka под капотом, чтобы фактически создать поток и поток.
Наконец, мы подключаем только что созданного производителя к outputStream привязки TCP, который оказывается потребителем. И теперь у нас есть реактивный поток сообщений по сети, что означает, что сообщения будут отправляться только тогда, когда Broker сможет их принять. В противном случае обратное давление будет применяться вплоть до производителя тиков.
Брокер: отправка сообщений
На другой стороне сети сидит Broker . Давайте посмотрим, что происходит, когда приходит сообщение.
|
01
02
03
04
05
06
07
08
09
10
11
12
|
Flow(serverBinding.connectionStream).foreach { conn => logger.info(s"Broker: send client connected (${conn.remoteAddress})") val sendToQueueConsumer = ActorConsumer[String]( system.actorOf(Props(new SendToQueueConsumer(queueActor)))) // sending messages to the queue, receiving from the client val reconcileFrames = new ReconcileFrames() Flow(conn.inputStream) .mapConcat(reconcileFrames.apply) .produceTo(materializer, sendToQueueConsumer)}.consume(materializer) |
Сначала мы создаем Flow из входного потока соединения — это будет входящий поток байтов. Затем мы воссоздаем экземпляры String которые были отправлены с использованием нашего кадрирования, и, наконец, направляем этот поток потребителю отправки в очередь.
SendToQueueConsumer — это мост для каждого соединения с SendToQueueConsumer главной очереди. Он использует признак ActorConsumer из реализации ActorConsumer Reactive Streams для автоматического управления запросом, который должен быть передан в восходящем направлении. Используя эту черту, мы можем создать Consumer[_] с реактивным потоком, поддерживаемый актером — так что полностью настраиваемый приемник.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer { private var inFlight = 0 override protected def requestStrategy = new MaxInFlightRequestStrategy(10) { override def inFlightInternally = inFlight } override def receive = { case OnNext(msg: String) => queueActor ! SendMessage(msg) inFlight += 1 case SentMessage(_) => inFlight -= 1 }} |
То, что необходимо предоставить ActorConsumer , — это способ измерения количества потоковых элементов, которые в настоящее время обрабатываются. Здесь мы подсчитываем количество сообщений, которые были отправлены в очередь, но для которых мы еще не получили идентификатор (поэтому они обрабатываются в очереди).
Потребитель получает новые сообщения, завернутые в OnNext актера OnNext ; поэтому OnNext отправляется OnNext потоком, а SentMessage отправляется в ответ на SendMessage SentMessage очереди.
получающий
Приемная часть выполняется аналогичным образом, хотя требует некоторых дополнительных шагов. Во-первых, если вы посмотрите на Receiver , вы увидите, что мы читаем байты из входного потока, воссоздаем сообщения из фреймов и отправляем обратно идентификаторы, следовательно, подтверждаем сообщение. В действительности, мы могли бы запустить некоторую логику обработки сообщений между получением сообщения и отправкой идентификатора.
На стороне Broker мы создаем два потока для каждого соединения.
Один — это поток сообщений, отправленных получателям, другой — поток подтвержденных идентификаторов сообщений от получателей, которые просто преобразуются для отправки сообщений DeleteMessage очереди.
Подобно потребителю, нам нужен мост для каждого соединения от субъекта очереди к потоку. Это реализовано в ReceiveFromQueueProducer . Здесь мы расширяем черту ActorProducer , которая позволяет полностью контролировать процесс фактического создания сообщений, поступающих в поток.
В этом акторе поток-запросчик посылает сообщение- Request для запроса. Когда есть спрос, мы запрашиваем сообщения из очереди. В конечном итоге очередь ответит одним или несколькими сообщениями- ReceivedMessages (когда в очереди есть какие-либо сообщения); так как количество сообщений никогда не превысит сигнализируемый спрос, мы можем безопасно вызвать метод ActorProducer.onNext , который отправляет данные элементы в нисходящий ActorProducer.onNext .
обрамление
Одна небольшая деталь заключается в том, что нам нужен собственный протокол кадрирования (спасибо Роланду Куну за разъяснения ), поскольку поток TCP — это просто поток байтов, поэтому мы можем получить произвольные фрагменты данных, которые необходимо будет повторно объединить позже. К счастью, реализация такого кадрирования довольно проста — см. Класс Framing . Каждый кадр состоит из размера сообщения и самого сообщения.
Подводя итоги
Используя Reactive Streams и реализацию Akka , очень легко создавать реактивные приложения с сквозным противодавлением. Приведенная выше очередь, несмотря на отсутствие множества функций и проверок, не позволит перегружать Broker Senders , а с другой стороны — Broker будет перегружаться Receivers . И все это без необходимости писать какой-либо код обработки противодавления!
| Ссылка: | Реактивная очередь с Akka Reactive Streams от нашего партнера JCG Адама Варски в блоге Блог Адама Варски . |

