Статьи

RabbitMQ повторяет использование Spring Integration

Я недавно читал о подходе, чтобы повторить с RabbitMQ
здесь и хотел попробовать подобный подход с
Spring Integration , которая предоставляет потрясающий набор интеграционных абстракций.

TL; DR решаемая проблема состоит в том, чтобы повторить сообщение (в случае сбоев в обработке) несколько раз с большой задержкой между попытками (скажем, 10 минут +). Подход использует поддержку RabbitMQ для
Мертвая буква обменивается и выглядит примерно так

холст-2

Суть потока:

1. Диспетчер работ создает «рабочие единицы» и отправляет их в очередь RabbitMQ через обмен.

2. Очередь заданий устанавливается с
Обмен мертвыми письмами . Если обработка сообщения по какой-либо причине не удалась, «Рабочий блок» заканчивается Очередью мертвых писем рабочего модуля.

3. Очередь мертвых писем рабочих единиц в свою очередь устанавливается с обменом рабочими единицами как обмен мертвыми буквами, создавая таким образом цикл. Кроме того, срок действия сообщений в очереди недоставленных сообщений устанавливается равным 10 минутам. Таким образом, после истечения срока действия сообщения оно снова возвращается в очередь рабочих единиц.

4. Чтобы прервать цикл, код обработки должен прекратить обработку после превышения определенного порога счета.

Реализация с использованием Spring Integration

Я рассмотрел простой и удобный путь, используя Spring Integration и RabbitMQ
прежде , здесь я буду в основном строить на основе этого кода.

Хорошей частью настройки является конфигурация соответствующих обменов / очередей мертвых букв, и она выглядит так, когда выражается с помощью Spring Configuration Java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Configuration
public class RabbitConfig {
 
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;
 
    @Bean
    Exchange worksExchange() {
        return ExchangeBuilder.topicExchange("work.exchange")
                .durable()
                .build();
    }
 
 
    @Bean
    public Queue worksQueue() {
        return QueueBuilder.durable("work.queue")
                .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                .build();
    }
 
    @Bean
    Binding worksBinding() {
        return BindingBuilder
                .bind(worksQueue())
                .to(worksExchange()).with("#").noargs();
    }
     
    // Dead letter exchange for holding rejected work units..
    @Bean
    Exchange worksDlExchange() {
        return ExchangeBuilder
                .topicExchange("work.exchange.dl")
                .durable()
                .build();
    }
 
    //Queue to hold Deadletter messages from worksQueue
    @Bean
    public Queue worksDLQueue() {
        return QueueBuilder
                .durable("works.queue.dl")
                .withArgument("x-message-ttl", 20000)
                .withArgument("x-dead-letter-exchange", worksExchange().getName())
                .build();
    }
 
    @Bean
    Binding worksDlBinding() {
        return BindingBuilder
                .bind(worksDLQueue())
                .to(worksDlExchange()).with("#")
                .noargs();
    }
    ...
}

Обратите внимание, что здесь я установил TTL для очереди мертвых писем равной 20 секундам, это означает, что через 20 секунд сообщение с ошибкой вернется в очередь обработки. Как только эта установка будет создана и соответствующие структуры будут созданы в RabbitMQ, часть кода будет выглядеть следующим образом:
Spring Integration Java DSL :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@Configuration
public class WorkInbound {
 
    @Autowired
    private RabbitConfig rabbitConfig;
 
    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(
                Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                .transform(Transformers.fromJson(WorkUnit.class))
                .log()
                .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true", f -> f.discardChannel("nullChannel"))
                .handle("workHandler", "process")
                .get();
    }
 
}

Большая часть логики повторов здесь обрабатывается инфраструктурой RabbitMQ, единственное изменение здесь — прерывание цикла путем явного сбрасывания сообщения после определенных 2 повторных попыток. Этот разрыв выражается в виде фильтра выше, смотря на заголовок, называемый «x-death», который RabbitMQ добавляет к сообщению, как только оно отправляется на обмен Dead Letter. По общему признанию, фильтр немного уродлив — он может быть немного лучше выражен в коде Java.

Еще одна вещь, на которую следует обратить внимание, это то, что логика повторения могла быть выражена в процессе с использованием Spring Integration, однако я хотел исследовать поток, в котором время повторения может быть высоким (скажем, 15–20 минут), которое не будет хорошо работать в процессе. и также не является безопасным для кластеров, так как я хочу, чтобы любые экземпляры приложения потенциально обрабатывали повтор сообщения.

Если вы хотите исследовать дальше, попробуйте образец на
мой репозиторий github — https://github.com/bijukunjummen/si-dsl-rabbit-sample

Ссылка:

Повторите попытку с RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq

Ссылка: RabbitMQ повторяет попытку, используя Spring Integration от нашего партнера JCG Биджу Кунджуммена в блоге all and sundry.