Основная идея заключается в использовании новой функции, доступной в 2.8.0, обмене недоставленными буквами . Это расширение AMQP позволяет вам указать обмен в очереди, в который сообщения должны публиковаться, когда сообщение истекает или отклоняется с требованием, установленным в false.
Имея это в виду, мы можем просто создать очередь для сообщений, которые мы хотим доставить позже, с x-message-ttl, установленным на время, которое мы хотим ждать, прежде чем оно будет доставлено. И чтобы обеспечить передачу сообщения в другую очередь, мы просто определяем обмен x-dead-letter-exchange для обмена, который мы создали (в этом случае я назову его немедленным), и привязываем к нему очередь («right.now. очередь»).
В coffeescript с node-amqp это выглядит так:
01
02
03
04
05
06
07
08
09
10
11
|
amqp = require 'amqp' conn = amqp.createConnection() key = "send.later.#{new Date().getTime()}" conn.on 'ready' , ->' conn.queue key, { arguments:{ "x-dead-letter-exchange" : "immediate" , "x-message-ttl" : 5000 } } |
Далее я определяю немедленный обмен, привязываю к нему очередь и подписываюсь.
1
2
3
4
5
6
7
|
conn.exchange 'immediate' conn.queue 'right.now.queue' , {autoDelete: false , durable: true }, (q) -> q.bind( 'immediate' , 'right.now.queue' ) q.subscribe (msg, headers, deliveryInfo) -> console.log msg console.log headers |
Наконец, после определения очереди, которую я создал ранее, мы хотим опубликовать в ней сообщение. Поэтому, чтобы вернуться к более раннему определению очереди, мы добавляем вызов публикации для публикации непосредственно в очередь (используя обмен по умолчанию).
1
2
3
4
5
6
7
8
|
conn.on 'ready' , -> conn.queue key, { arguments:{ "x-dead-letter-exchange" : "immediate" , "x-message-ttl" : 5000 } }, -> conn.publish key, { v :1}, {contentType: 'application/json' } |
В результате мы увидим 5-секундное ожидание, а затем содержимое сообщения и заголовки будут выгружены на консоль. Поскольку в этом сценарии очередь используется только временно, я также установил срок действия атрибута x-expires очереди через разумное время после истечения срока действия сообщения. Это гарантирует, что мы не получим массу неиспользованных очередей, просто сидящих без дела.
Вот результат этого упражнения в полном объеме.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
amqp = require 'amqp' events = require 'events' em = new events.EventEmitter() conn = amqp.createConnection() key = "send.later.#{new Date().getTime()}" conn.on 'ready' , -> conn.queue key, { arguments:{ "x-dead-letter-exchange" : "immediate" , "x-message-ttl" : 5000 , "x-expires" : 6000 } }, -> conn.publish key, { v :1}, {contentType: 'application/json' } conn.exchange 'immediate' conn.queue 'right.now.queue' , { autoDelete: false , durable: true }, (q) -> q.bind( 'immediate' , 'right.now.queue' ) q.subscribe (msg, headers, deliveryInfo) -> console.log msg console.log headers |
Вы можете получить это упражнение полностью на github .
Это довольно интересно, и я планирую поэкспериментировать с использованием этого в одном из моих производственных приложений node.js, которые используют опрос на основе интервалов для запуска запланированных событий.
Ссылка: запланированная доставка сообщений с RabbitMQ от нашего партнера JCG Джеймса Карра в блоге Rants and Musings of Agile Developer .