Статьи

Обеспечение долговременной реактивной очереди с сохранением Akka

Некоторое время назад я писал, как реализовать реактивную очередь сообщений с помощью Akka Streams . Очередь поддерживает потоковые операции отправки и получения с обратным давлением, но имеет один недостаток: все сообщения хранятся в памяти и, следовательно, в случае перезапуска теряются.

Но это можно легко решить с помощью экспериментального akka-persistenceмодуля, который только что получил обновление в Akka 2.3.4 .

Очередь актера-переподготовки

Чтобы сделать очередь долговечной, нам нужно только изменить действующего субъекта; реактивные / потоковые части остаются нетронутыми. Как напоминание, реактивная очередь состоит из:

  • один субъект очереди , который содержит внутреннюю очередь сообщений, подлежащих доставке. Участник очереди принимает сообщения субъекта для отправки, получения и удаления сообщений очереди
  • брокер , который создает актер очереди, ожидает соединений от отправителей и получателей, и создает реактивные потоки , когда установлено соединение
  • отправителя , который отправляет сообщения в очередь (для тестирования, одно сообщение каждый второй). Несколько отправителей могут быть запущены. Сообщения отправляются только в том случае, если они могут быть приняты (обратное давление со стороны брокера)
  • приемник , который принимает сообщения из очереди, как они становятся доступными , и как они могут быть обработаны (обратное давление от приемника)

Действующие лица

Идти настойчиво (оставаясь реактивным)

Необходимые изменения весьма минимальны.

Прежде всего, QueueActorнеобходимо расширить PersistentActorи определить два метода:

  • receiveCommand, который определяет «нормальное» поведение при поступлении актер-сообщений (команд)
  • receiveRecover, который используется только во время восстановления, и куда отправляются события воспроизведения

Но для того, чтобы выздороветь, сначала нужно сохранить некоторые события! Это, конечно, должно быть сделано при обработке операций очереди сообщений.

Например, при отправке сообщения MessageAddedсобытие сохраняется с помощью persistAsync:

def handleQueueMsg: Receive = {
  case SendMessage(content) =>
    val msg = sendMessage(content)
    persistAsync(msg.toMessageAdded) { msgAdded =>
      sender() ! SentMessage(msgAdded.id)
      tryReply()
    }
 
   // ...
}

persistAsyncэто один из способов сохранения событий с использованием akka-persistence. Другой persist(который также используется по умолчанию) буферизует последующие команды (сообщения актера) до тех пор, пока событие не будет сохранено; это немного медленнее, но также легче рассуждать и оставаться последовательным. Однако в случае очереди сообщений такое поведение не является необходимым. Единственная гарантия, что нам нужно, это то, что отправка сообщения подтверждается только после того, как событие сохраняется; и именно поэтому ответ отправляется в обработчик событий after-persist. Вы можете прочитать больше о persistAsyncдокументах .

Аналогично, события сохраняются для других команд (см QueueActorReceive. Сообщения об актере ). И для удалений, и для получения, которые мы используем persistAsync, поскольку очередь стремится обеспечить как минимум однократную гарантию доставки.

Последний компонент — это обработчик восстановления, который определен в QueueActorRecover(и затем используется в QueueActor). Восстановление довольно просто: события соответствуют добавлению нового сообщения, обновлению временной метки «следующая доставка» или удалению.

Внутреннее представление использует как очередь с приоритетами, так и карту по идентификатору для эффективности, поэтому, когда события обрабатываются во время реконвертации, мы только строим карту, а также используем RecoveryCompletedспециальное событие для построения очереди. Специальное событие отправляется akka-persistence автоматически.

И это все! Если вы сейчас запустите посредника, отправите несколько сообщений, остановите посредника, запустите его снова, вы увидите, что сообщения восстановлены, и, действительно, они будут получены, если получатель запущен.

Конечно, код не готов к работе. Журнал событий будет постоянно расти, поэтому было бы целесообразно использовать моментальные снимки , а также удалять старые события / моментальные снимки, чтобы уменьшить размер хранилища и ускорить восстановление.

копирование

Теперь, когда очередь долговечна, мы также можем иметь реплицированную постоянную очередь почти бесплатно: нам просто нужно использовать другой плагин журнала ! По умолчанию используется LevelDB и записывает данные на локальный диск. Доступны другие реализации: для Cassandra , HBase и Mongo .

Делая простое переключение бэкэнда персистентности, мы можем реплицировать наши сообщения в кластере.

Резюме

С помощью двух экспериментальных модулей Akka, реактивных потоков и персистентности , мы смогли создать долговременную реактивную очередь с минимальным количеством кода. И это только начало, так как две технологии только начинают развиваться!

Если вы хотите изменить / разветвить код, он доступен на Github .