Я недавно читал о подходе, чтобы повторить с RabbitMQ
здесь и хотел попробовать подобный подход с
Spring Integration , которая предоставляет потрясающий набор интеграционных абстракций.
TL; DR решаемая проблема состоит в том, чтобы повторить сообщение (в случае сбоев в обработке) несколько раз с большой задержкой между попытками (скажем, 10 минут +). Подход использует поддержку RabbitMQ для
Мертвая буква обменивается и выглядит примерно так
Суть потока:
1. Диспетчер работ создает «рабочие единицы» и отправляет их в очередь RabbitMQ через обмен.
2. Очередь заданий устанавливается с
Обмен мертвыми письмами . Если обработка сообщения по какой-либо причине не удалась, «Рабочий блок» заканчивается Очередью мертвых писем рабочего модуля.
3. Очередь мертвых писем рабочих единиц в свою очередь устанавливается с обменом рабочими единицами как обмен мертвыми буквами, создавая таким образом цикл. Кроме того, срок действия сообщений в очереди недоставленных сообщений устанавливается равным 10 минутам. Таким образом, после истечения срока действия сообщения оно снова возвращается в очередь рабочих единиц.
4. Чтобы прервать цикл, код обработки должен прекратить обработку после превышения определенного порога счета.
Реализация с использованием Spring Integration
Я рассмотрел простой и удобный путь, используя Spring Integration и RabbitMQ
прежде , здесь я буду в основном строить на основе этого кода.
Хорошей частью настройки является конфигурация соответствующих обменов / очередей мертвых букв, и она выглядит так, когда выражается с помощью Spring Configuration Java:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean Exchange worksExchange() { return ExchangeBuilder.topicExchange( "work.exchange" ) .durable() .build(); } @Bean public Queue worksQueue() { return QueueBuilder.durable( "work.queue" ) .withArgument( "x-dead-letter-exchange" , worksDlExchange().getName()) .build(); } @Bean Binding worksBinding() { return BindingBuilder .bind(worksQueue()) .to(worksExchange()).with( "#" ).noargs(); } // Dead letter exchange for holding rejected work units.. @Bean Exchange worksDlExchange() { return ExchangeBuilder .topicExchange( "work.exchange.dl" ) .durable() .build(); } //Queue to hold Deadletter messages from worksQueue @Bean public Queue worksDLQueue() { return QueueBuilder .durable( "works.queue.dl" ) .withArgument( "x-message-ttl" , 20000 ) .withArgument( "x-dead-letter-exchange" , worksExchange().getName()) .build(); } @Bean Binding worksDlBinding() { return BindingBuilder .bind(worksDLQueue()) .to(worksDlExchange()).with( "#" ) .noargs(); } ... } |
Обратите внимание, что здесь я установил TTL для очереди мертвых писем равной 20 секундам, это означает, что через 20 секунд сообщение с ошибкой вернется в очередь обработки. Как только эта установка будет создана и соответствующие структуры будут созданы в RabbitMQ, часть кода будет выглядеть следующим образом:
Spring Integration Java DSL :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@Configuration public class WorkInbound { @Autowired private RabbitConfig rabbitConfig; @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from( Amqp.inboundAdapter(rabbitConfig.workListenerContainer())) .transform(Transformers.fromJson(WorkUnit. class )) .log() .filter( "(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true" , f -> f.discardChannel( "nullChannel" )) .handle( "workHandler" , "process" ) .get(); } } |
Большая часть логики повторов здесь обрабатывается инфраструктурой RabbitMQ, единственное изменение здесь — прерывание цикла путем явного сбрасывания сообщения после определенных 2 повторных попыток. Этот разрыв выражается в виде фильтра выше, смотря на заголовок, называемый «x-death», который RabbitMQ добавляет к сообщению, как только оно отправляется на обмен Dead Letter. По общему признанию, фильтр немного уродлив — он может быть немного лучше выражен в коде Java.
Еще одна вещь, на которую следует обратить внимание, это то, что логика повторения могла быть выражена в процессе с использованием Spring Integration, однако я хотел исследовать поток, в котором время повторения может быть высоким (скажем, 15–20 минут), которое не будет хорошо работать в процессе. и также не является безопасным для кластеров, так как я хочу, чтобы любые экземпляры приложения потенциально обрабатывали повтор сообщения.
Если вы хотите исследовать дальше, попробуйте образец на
мой репозиторий github — https://github.com/bijukunjummen/si-dsl-rabbit-sample
Ссылка:
Повторите попытку с RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq
Ссылка: | RabbitMQ повторяет попытку, используя Spring Integration от нашего партнера JCG Биджу Кунджуммена в блоге all and sundry. |