Статьи

ElasticMQ 0.7.0: длинный опрос, неблокирующая реализация с использованием Akka и Spray

ElasticMQ 0.7.0 , система очереди сообщений с актерским интерфейсом, совместимым с Scala и Amazon SQS , только что была выпущена.

Это основная перезапись с использованием акторов Akka в ядре и Spray для уровня REST. До настоящего времени были переписаны только модули ядра и SQS; ведение журнала, SQL-сервер и репликация еще не завершены.

Основные улучшения на стороне клиента:

  • поддержка длинных опросов, которая была добавлена ​​в SQS некоторое время назад
  • более простой автономный сервер — всего одна банка для загрузки

При длительном опросе при получении сообщения вы можете указать дополнительный атрибут MessageWaitTime . Если в очереди нет сообщений, вместо того, чтобы завершить запрос пустым ответом, ElasticMQ будет ждать до секунд MessageWaitTime пока сообщения не поступят. Это помогает как уменьшить используемую полосу пропускания (нет необходимости в очень частых запросах), улучшить общую производительность системы (сообщения принимаются сразу после отправки), так и снизить затраты на SQS.

Автономный сервер теперь представляет собой одну банку. Чтобы запустить локальную реализацию SQS в памяти (например, для тестирования приложения, использующего SQS), все, что вам нужно сделать, это загрузить файл jar и запустить:

1
java -jar elasticmq-server-0.7.0.jar

Это запустит сервер на http://localhost:9324 . Конечно, интерфейс и порт настраиваются, подробности смотрите в README . Как и раньше, вы также можете запустить встроенный сервер с любого языка на основе JVM.

Замечания по реализации

Для любопытных, вот краткое описание того, как реализован ElasticMQ, включая базовую систему, уровень REST, использование потока данных Akka и реализацию длительного опроса. Весь код доступен на GitHub .

Как уже упоминалось, ElasticMQ теперь реализован с использованием Akka и Spray и не содержит блокирующих вызовов . Все асинхронно.

ядро

Основная система основана на актерах. Есть один главный субъект ( QueueManagerActor ), который знает, какие очереди в настоящее время создаются в системе, и дает возможность создавать и удалять очереди.

Для общения с актерами используется типизированный шаблон Ask. Например, для поиска очереди (очередь также является актером) определяется сообщение:

1
case class LookupQueue(queueName: String) extends Replyable[Option[ActorRef]]

Использование выглядит так:

1
2
import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")

Как уже упоминалось, каждая очередь является субъектом и инкапсулирует состояние очереди. Мы можем использовать простые изменяемые структуры данных без необходимости синхронизации потоков, поскольку модель акторов позаботится об этом за нас. Существует ряд сообщений, которые могут быть отправлены субъекту очереди, например:

1
2
3
4
case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int,
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]

Слой отдыха

Уровень запросов / REST SQS реализован с использованием Spray , облегченного инструментария REST / HTTP, основанного на Akka.

Помимо неблокирующей реализации ввода-вывода на основе акторов, Spray также предлагает мощную библиотеку маршрутизации Spray spray-routing . Он содержит ряд встроенных директив для сопоставления по методу запроса (get / post и т. Д.), Извлечения запроса параметров формы или сопоставления по пути запроса. Но это также позволяет вам определять свои собственные директивы, используя простую композицию директив. Типичный маршрут ElasticMQ выглядит следующим образом:

1
2
3
4
5
6
7
8
val listQueuesDirective =
  action("ListQueues") {
    rootPath {
      anyParam("QueueNamePrefix"?) { prefixOption =>
        // logic
      }
    }
  }

Если action соответствует имени действия, указанному в URL-адресе "Action" параметра body, и принимает / отклоняет запрос, rootPath соответствует пустому пути и т. Д. Спрей имеет хороший учебник , поэтому я советую вам посмотреть там, если вы заинтересованы.

Как использовать акторы очереди из маршрутов для выполнения HTTP-запросов?

Хорошая вещь в Spray — это то, что он просто передает экземпляр RequestContext вашим маршрутам, ничего не ожидая взамен. Это зависит от маршрута, чтобы полностью отменить запрос или дополнить его значением. Запрос также может быть выполнен в другом потоке — или, например, когда будет завершено какое-то будущее. Именно это и делает ElasticMQ. Здесь очень удобны map , flatMap и for-comprehensions flatMap (которые являются более приятным синтаксисом для map / flatMap ), например (упрощенно):

1
2
3
4
5
6
7
8
// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}

Иногда, когда поток усложняется, ElasticMQ использует Akka Dataflow , что требует включения плагина продолжения. Есть также похожий проект, использующий макросы, Scala Async , но он находится на ранней стадии разработки.

Используя Akka Dataflow, вы можете написать код, который использует Future как если бы это был обычный последовательный код. Плагин CPS преобразует его для использования обратных вызовов, где это необходимо. Пример, взятый из CreateQueueDirectives :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}

Важными частями здесь являются блок flow , который ограничивает область трансформации, и вызовы apply() объектов Future которые извлекают содержимое будущего. Это выглядит как совершенно нормальный последовательный код, но при выполнении, так как первое использование в будущем будет выполняться асинхронно.

Длинный опрос

Поскольку весь код был асинхронным и неблокирующим, реализовать длительный опрос было довольно просто. Обратите внимание, что при получении сообщений из очереди мы получаем Future[List[MessageData]] . В ответ на завершение этого будущего HTTP-запрос также завершается с соответствующим ответом. Однако это будущее может быть завершено почти сразу (как обычно), или, например, через 10 секунд — в коде не требуется никаких изменений для поддержки этого. Таким образом, единственное, что нужно сделать, — это отложить завершение будущего до тех пор, пока не пройдет указанное количество времени или не появятся новые сообщения.

Реализация находится в QueueActorWaitForMessagesOps . Когда приходит запрос на получение сообщений, и в очереди ничего нет, вместо немедленного ответа (то есть отправки пустого списка субъекту-отправителю) мы сохраняем ссылку на исходный запрос и субъект-отправителя на карте. Используя планировщик Akka, мы также планируем отправку обратно пустого списка и удаление записи после указанного времени ожидания.

Когда приходят новые сообщения, мы просто берем запрос с карты и пытаемся его выполнить. Опять же, все проблемы синхронизации и параллелизма обрабатываются Akka и актерской моделью.