Статьи

Реактивная очередь с реактивными потоками Akka

Реактивные потоки — это недавно объявленная инициатива по созданию стандарта для асинхронной обработки потоков со встроенным противодавлением в JVM. Рабочая группа состоит из таких компаний, как Typesafe, Red Hat, Oracle, Netflix и других.

Одна из ранних экспериментальных реализаций основана на Akka . Предварительная версия 0.3 включает производителей и потребителей актеров, что открывает новые возможности для интеграции.

iStock_000040449260Large-300x198

Чтобы протестировать новую технологию, я реализовал очень простую Реактивную очередь сообщений . Код находится на стадии PoC, не обрабатывает ошибки и тому подобное, но при правильном использовании — работает!

Очередь является реагирующей, что означает, что сообщения будут доставляться заинтересованным сторонам в любое время без запроса. Обратное давление применяется как при отправке сообщений (чтобы отправители не подавляли посредника), так и при получении сообщений (чтобы посредник отправлял только столько сообщений, сколько могут получить получатели).

Посмотрим, как это работает!

Очередь

Во-первых, сама очередь является субъектом и ничего не знает о (реактивных) потоках. Код находится в пакете com.reactmq.queue . Актер принимает следующие актер-сообщения (здесь термин «сообщение» перегружен, поэтому я буду использовать обычное «сообщение», чтобы обозначать сообщения, которые мы отправляем и получаем из очереди, а «актер-сообщения» — Scala). экземпляры классов, отправленные актерам):

  • SendMessage(content) — отправляет сообщение с указанным содержимым String . Ответ ( SentMessage(id) ) отправляется обратно отправителю с идентификатором сообщения
  • ReceiveMessages(count) — сигнализирует о том, что отправитель ( ReceiveMessages(count) ) хотел бы получить до count сообщений. Количество суммируется с ранее заявленным спросом.
  • DeleteMessage(id) — неудивительно, удаляет сообщение

Реализация очереди — это упрощенная версия того, что есть в ElasticMQ . После получения сообщения, если оно не было удалено (подтверждено) в течение 10 секунд, оно снова становится доступным для приема.

Когда ReceiveMessages сигнализирует о требовании к сообщениям (отправляя ReceiveMessages очереди), он должен ожидать любое количество ответов на сообщения актера ReceivedMessages(msgs) , содержащих полученные данные.

Идя реактивный

Чтобы создать и протестировать нашу реактивную очередь, нам нужны три приложения:

Мы можем запустить любое количество Senders и Receivers , но, конечно, мы должны запустить только одного Broker .

Первое, что нам нужно сделать, это соединить Sender с Broker и Receiver с Broker по сети. Мы можем сделать это с помощью расширения Akka IO и реактивного расширения TCP. Используя пару connect & bind , мы получаем поток соединений на стороне привязки:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
// sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)
  
connectFuture.onSuccess {
  case binding: StreamTcp.OutgoingTcpConnection =>
    logger.info("Sender: connected to broker")
    // per-connection logic
}
  
// broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)
  
bindSendFuture.onSuccess {
  case serverBinding: StreamTcp.TcpServerBinding =>
    logger.info("Broker: send bound")
  
    Flow(serverBinding.connectionStream).foreach { conn =>
       // per-connection logic
    }.consume(materializer)
}

Существует другой адрес для отправки и получения сообщений.

Отправитель

Давайте сначала посмотрим на логику соединения для Sender .

1
2
3
4
5
6
7
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" })
  .map { msg =>
    logger.debug(s"Sender: sending $msg")
    createFrame(msg)
  }
  .toProducer(materializer)
  .produceTo(binding.outputStream)

Мы создаем тик-поток, который каждую секунду генерирует новое сообщение (очень удобно для тестирования). Используя преобразователь потока map , мы создаем байтовый кадр с сообщением (подробнее об этом позже). Но это только описание того, как должен выглядеть наш (очень простой) поток; его необходимо материализовать с toProducer метода toProducer , который обеспечит конкретные реализации узлов преобразования потока. В настоящее время есть только один FlowMaterializer , который — что неудивительно — использует актеров Akka под капотом, чтобы фактически создать поток и поток.

Наконец, мы подключаем только что созданного производителя к outputStream привязки TCP, который оказывается потребителем. И теперь у нас есть реактивный поток сообщений по сети, что означает, что сообщения будут отправляться только тогда, когда Broker сможет их принять. В противном случае обратное давление будет применяться вплоть до производителя тиков.

reactmq-actors1-248x300

Брокер: отправка сообщений

На другой стороне сети сидит Broker . Давайте посмотрим, что происходит, когда приходит сообщение.

01
02
03
04
05
06
07
08
09
10
11
12
Flow(serverBinding.connectionStream).foreach { conn =>
  logger.info(s"Broker: send client connected (${conn.remoteAddress})")
  
  val sendToQueueConsumer = ActorConsumer[String](
    system.actorOf(Props(new SendToQueueConsumer(queueActor))))
  
  // sending messages to the queue, receiving from the client
  val reconcileFrames = new ReconcileFrames()
  Flow(conn.inputStream)
    .mapConcat(reconcileFrames.apply)
    .produceTo(materializer, sendToQueueConsumer)
}.consume(materializer)

Сначала мы создаем Flow из входного потока соединения — это будет входящий поток байтов. Затем мы воссоздаем экземпляры String которые были отправлены с использованием нашего кадрирования, и, наконец, направляем этот поток потребителю отправки в очередь.

SendToQueueConsumer — это мост для каждого соединения с SendToQueueConsumer главной очереди. Он использует признак ActorConsumer из реализации ActorConsumer Reactive Streams для автоматического управления запросом, который должен быть передан в восходящем направлении. Используя эту черту, мы можем создать Consumer[_] с реактивным потоком, поддерживаемый актером — так что полностью настраиваемый приемник.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {
  
  private var inFlight = 0
  
  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally = inFlight
  }
  
  override def receive = {
    case OnNext(msg: String) =>
      queueActor ! SendMessage(msg)
      inFlight += 1
  
    case SentMessage(_) => inFlight -= 1
  }
}

То, что необходимо предоставить ActorConsumer , — это способ измерения количества потоковых элементов, которые в настоящее время обрабатываются. Здесь мы подсчитываем количество сообщений, которые были отправлены в очередь, но для которых мы еще не получили идентификатор (поэтому они обрабатываются в очереди).

Потребитель получает новые сообщения, завернутые в OnNext актера OnNext ; поэтому OnNext отправляется OnNext потоком, а SentMessage отправляется в ответ на SendMessage SentMessage очереди.

получающий

Приемная часть выполняется аналогичным образом, хотя требует некоторых дополнительных шагов. Во-первых, если вы посмотрите на Receiver , вы увидите, что мы читаем байты из входного потока, воссоздаем сообщения из фреймов и отправляем обратно идентификаторы, следовательно, подтверждаем сообщение. В действительности, мы могли бы запустить некоторую логику обработки сообщений между получением сообщения и отправкой идентификатора.

На стороне Broker мы создаем два потока для каждого соединения.

Один — это поток сообщений, отправленных получателям, другой — поток подтвержденных идентификаторов сообщений от получателей, которые просто преобразуются для отправки сообщений DeleteMessage очереди.

Подобно потребителю, нам нужен мост для каждого соединения от субъекта очереди к потоку. Это реализовано в ReceiveFromQueueProducer . Здесь мы расширяем черту ActorProducer , которая позволяет полностью контролировать процесс фактического создания сообщений, поступающих в поток.

В этом акторе поток-запросчик посылает сообщение- Request для запроса. Когда есть спрос, мы запрашиваем сообщения из очереди. В конечном итоге очередь ответит одним или несколькими сообщениями- ReceivedMessages (когда в очереди есть какие-либо сообщения); так как количество сообщений никогда не превысит сигнализируемый спрос, мы можем безопасно вызвать метод ActorProducer.onNext , который отправляет данные элементы в нисходящий ActorProducer.onNext .

обрамление

Одна небольшая деталь заключается в том, что нам нужен собственный протокол кадрирования (спасибо Роланду Куну за разъяснения ), поскольку поток TCP — это просто поток байтов, поэтому мы можем получить произвольные фрагменты данных, которые необходимо будет повторно объединить позже. К счастью, реализация такого кадрирования довольно проста — см. Класс Framing . Каждый кадр состоит из размера сообщения и самого сообщения.

Подводя итоги

Используя Reactive Streams и реализацию Akka , очень легко создавать реактивные приложения с сквозным противодавлением. Приведенная выше очередь, несмотря на отсутствие множества функций и проверок, не позволит перегружать Broker Senders , а с другой стороны — Broker будет перегружаться Receivers . И все это без необходимости писать какой-либо код обработки противодавления!