Я недавно читал о подходе, чтобы повторить с RabbitMQ
здесь и хотел попробовать подобный подход с
Spring Integration , которая предоставляет потрясающий набор интеграционных абстракций.
TL; DR решаемая проблема состоит в том, чтобы повторить сообщение (в случае сбоев в обработке) несколько раз с большой задержкой между попытками (скажем, 10 минут +). Подход использует поддержку RabbitMQ для
Мертвая буква обменивается и выглядит примерно так
Суть потока:
1. Диспетчер работ создает «рабочие единицы» и отправляет их в очередь RabbitMQ через обмен.
2. Очередь заданий устанавливается с
Обмен мертвыми письмами . Если обработка сообщения по какой-либо причине не удалась, «Рабочий блок» заканчивается Очередью мертвых писем рабочего модуля.
3. Очередь мертвых писем рабочих единиц в свою очередь устанавливается с обменом рабочими единицами как обмен мертвыми буквами, создавая таким образом цикл. Кроме того, срок действия сообщений в очереди недоставленных сообщений устанавливается равным 10 минутам. Таким образом, после истечения срока действия сообщения оно снова возвращается в очередь рабочих единиц.
4. Чтобы прервать цикл, код обработки должен прекратить обработку после превышения определенного порога счета.
Реализация с использованием Spring Integration
Я рассмотрел простой и удобный путь, используя Spring Integration и RabbitMQ
прежде , здесь я буду в основном строить на основе этого кода.
Хорошей частью настройки является конфигурация соответствующих обменов / очередей мертвых букв, и она выглядит так, когда выражается с помощью Spring Configuration Java:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Configurationpublic class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean Exchange worksExchange() { return ExchangeBuilder.topicExchange("work.exchange") .durable() .build(); } @Bean public Queue worksQueue() { return QueueBuilder.durable("work.queue") .withArgument("x-dead-letter-exchange", worksDlExchange().getName()) .build(); } @Bean Binding worksBinding() { return BindingBuilder .bind(worksQueue()) .to(worksExchange()).with("#").noargs(); } // Dead letter exchange for holding rejected work units.. @Bean Exchange worksDlExchange() { return ExchangeBuilder .topicExchange("work.exchange.dl") .durable() .build(); } //Queue to hold Deadletter messages from worksQueue @Bean public Queue worksDLQueue() { return QueueBuilder .durable("works.queue.dl") .withArgument("x-message-ttl", 20000) .withArgument("x-dead-letter-exchange", worksExchange().getName()) .build(); } @Bean Binding worksDlBinding() { return BindingBuilder .bind(worksDLQueue()) .to(worksDlExchange()).with("#") .noargs(); } ...} |
Обратите внимание, что здесь я установил TTL для очереди мертвых писем равной 20 секундам, это означает, что через 20 секунд сообщение с ошибкой вернется в очередь обработки. Как только эта установка будет создана и соответствующие структуры будут созданы в RabbitMQ, часть кода будет выглядеть следующим образом:
Spring Integration Java DSL :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@Configurationpublic class WorkInbound { @Autowired private RabbitConfig rabbitConfig; @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from( Amqp.inboundAdapter(rabbitConfig.workListenerContainer())) .transform(Transformers.fromJson(WorkUnit.class)) .log() .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel")) .handle("workHandler", "process") .get(); }} |
Большая часть логики повторов здесь обрабатывается инфраструктурой RabbitMQ, единственное изменение здесь — прерывание цикла путем явного сбрасывания сообщения после определенных 2 повторных попыток. Этот разрыв выражается в виде фильтра выше, смотря на заголовок, называемый «x-death», который RabbitMQ добавляет к сообщению, как только оно отправляется на обмен Dead Letter. По общему признанию, фильтр немного уродлив — он может быть немного лучше выражен в коде Java.
Еще одна вещь, на которую следует обратить внимание, это то, что логика повторения могла быть выражена в процессе с использованием Spring Integration, однако я хотел исследовать поток, в котором время повторения может быть высоким (скажем, 15–20 минут), которое не будет хорошо работать в процессе. и также не является безопасным для кластеров, так как я хочу, чтобы любые экземпляры приложения потенциально обрабатывали повтор сообщения.
Если вы хотите исследовать дальше, попробуйте образец на
мой репозиторий github — https://github.com/bijukunjummen/si-dsl-rabbit-sample
Ссылка:
Повторите попытку с RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq
| Ссылка: | RabbitMQ повторяет попытку, используя Spring Integration от нашего партнера JCG Биджу Кунджуммена в блоге all and sundry. |
