Статьи

RabbitMQ — последовательная обработка сообщений с использованием Spring DS Java DSL

Если вам когда-нибудь понадобится обрабатывать сообщения последовательно с RabbitMQ с кластером слушателей, обрабатывающих сообщения, лучший способ, который я видел, — это использовать флаг «эксклюзивного потребителя» на слушателе с 1 потоком на каждом слушателе, обрабатывающем сообщения.

Флаг исключительного потребителя гарантирует, что только 1 потребитель может читать сообщения из определенной очереди, а 1 поток этого потребителя гарантирует, что сообщения обрабатываются последовательно. Однако есть одна проблема, я расскажу об этом позже.

Позвольте мне продемонстрировать это поведение с потребителем сообщений RabbitMQ на основе Spring Boot и Spring Integration.

Во-первых, это конфигурация для настройки очереди с использованием Java-конфигурации Spring, обратите внимание, что, поскольку это приложение Spring Boot, оно автоматически создает фабрику соединений RabbitMQ, когда библиотека Spring-amqp добавляется в список зависимостей:

01
02
03
04
05
06
07
08
09
10
11
12
13
@Configuration
@Configuration
public class RabbitConfig {
 
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;
 
    @Bean
    public Queue sampleQueue() {
        return new Queue("sample.queue", true, false, false);
    }
 
}

Учитывая этот пример очереди, слушатель, который получает сообщения из этой очереди и обрабатывает их, выглядит следующим образом, поток записывается с использованием превосходной библиотеки Java DSL интеграции Spring :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class RabbitInboundFlow {
    private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);
 
    @Autowired
    private RabbitConfig rabbitConfig;
 
    @Autowired
    private ConnectionFactory connectionFactory;
 
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(this.connectionFactory);
        listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setExclusive(true);
        return listenerContainer;
    }
 
    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
                .transform(Transformers.objectToString())
                .handle((m) -> {
                    logger.info("Processed  {}", m.getPayload());
                })
                .get();
    }
 
}

Поток очень кратко выражен в методе inboundFlow, полезная нагрузка сообщения из RabbitMQ преобразуется из байтового массива в String и, наконец, обрабатывается путем простой записи сообщения в журналы.

Важной частью потока является конфигурация слушателя, обратите внимание на флаг, который устанавливает потребителя в качестве исключительного потребителя, и в этом потребителе число потоков обрабатывается равным 1. Учитывая это, даже если запускается только несколько экземпляров приложения 1 из слушателей сможет подключаться и обрабатывать сообщения.

Теперь для улова рассмотрим случай, когда обработка сообщений занимает некоторое время и откатывается во время обработки сообщения. Если экземпляр приложения, обрабатывающего сообщение, должен был быть остановлен в середине обработки такого сообщения, то поведение другого экземпляра начнет обрабатывать сообщения в очереди, когда остановленный экземпляр откатывает сообщение, откат сообщение затем доставляется новому эксклюзивному потребителю, таким образом получая сообщение не в порядке.

  • Если вы заинтересованы в дальнейшем изучении этого, вот проект github, чтобы поиграть с этой функцией: https://github.com/bijukunjummen/test-rabbit-exclusive.
Ссылка: RabbitMQ — Обработка сообщений поочередно с помощью Spring DS Java DSL от нашего партнера по JCG Биджу Кунджуммена из блога all and sundry.