Статьи

RabbitMQ: запланированная доставка сообщений

Ранее в этом месяце я выступил с презентацией на ComoRichWeb на RabbitMQ, и один из участников спросил: «Можно ли опубликовать сообщение для последующего использования?» Я ответил, что, насколько мне известно, это было невозможно, но для этого может потребоваться взлом. Что ж, сегодня вечером, пытаясь выяснить, как использовать модель push-in-polling для временных уведомлений, я обнаружил хитрый взлом, используя временные очереди, x-message-ttl и обмен мертвыми буквами.

Основная идея заключается в использовании новой функции, доступной в 2.8.0, обмене недоставленными буквами . Это расширение AMQP позволяет вам указать обмен в очереди, в который сообщения должны публиковаться, когда сообщение истекает или отклоняется с требованием, установленным в false.

Имея это в виду, мы можем просто создать очередь для сообщений, которые мы хотим доставить позже, с x-message-ttl, установленным на время, которое мы хотим ждать, прежде чем оно будет доставлено. И чтобы обеспечить передачу сообщения в другую очередь, мы просто определяем обмен x-dead-letter-exchange для обмена, который мы создали (в этом случае я назову его немедленным), и привязываем к нему очередь («right.now. очередь»).

В coffeescript с node-amqp это выглядит так:

01
02
03
04
05
06
07
08
09
10
11
amqp   = require 'amqp'
conn   = amqp.createConnection()
   
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->'
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    }
  }

Далее я определяю немедленный обмен, привязываю к нему очередь и подписываюсь.

1
2
3
4
5
6
7
conn.exchange 'immediate'
 
conn.queue 'right.now.queue', {autoDelete: false, durable: true}, (q) ->
  q.bind('immediate', 'right.now.queue')
  q.subscribe (msg, headers, deliveryInfo) ->
    console.log msg
    console.log headers

Наконец, после определения очереди, которую я создал ранее, мы хотим опубликовать в ней сообщение. Поэтому, чтобы вернуться к более раннему определению очереди, мы добавляем вызов публикации для публикации непосредственно в очередь (используя обмен по умолчанию).

1
2
3
4
5
6
7
8
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}

В результате мы увидим 5-секундное ожидание, а затем содержимое сообщения и заголовки будут выгружены на консоль. Поскольку в этом сценарии очередь используется только временно, я также установил срок действия атрибута x-expires очереди через разумное время после истечения срока действия сообщения. Это гарантирует, что мы не получим массу неиспользованных очередей, просто сидящих без дела.

Вот результат этого упражнения в полном объеме.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
amqp   = require 'amqp'
events = require 'events'
em     = new events.EventEmitter()
conn   = amqp.createConnection()
   
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    , "x-expires": 6000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}
   
  conn.exchange 'immediate'
 
  conn.queue 'right.now.queue', {
      autoDelete: false
    , durable: true
  }, (q) ->
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers

Вы можете получить это упражнение полностью на github .

Это довольно интересно, и я планирую поэкспериментировать с использованием этого в одном из моих производственных приложений node.js, которые используют опрос на основе интервалов для запуска запланированных событий.

Ссылка: запланированная доставка сообщений с RabbitMQ от нашего партнера JCG Джеймса Карра в блоге Rants and Musings of Agile Developer .