Если вам когда-нибудь понадобится обрабатывать сообщения последовательно с RabbitMQ с кластером слушателей, обрабатывающих сообщения, лучший способ, который я видел, — это использовать флаг «эксклюзивного потребителя» на слушателе с 1 потоком на каждом слушателе, обрабатывающем сообщения.
Флаг исключительного потребителя гарантирует, что только 1 потребитель может читать сообщения из определенной очереди, а 1 поток этого потребителя гарантирует, что сообщения обрабатываются последовательно. Однако есть одна проблема, я расскажу об этом позже.
Позвольте мне продемонстрировать это поведение с потребителем сообщений RabbitMQ на основе Spring Boot и Spring Integration.
Во-первых, это конфигурация для настройки очереди с использованием Java-конфигурации Spring, обратите внимание, что, поскольку это приложение Spring Boot, оно автоматически создает фабрику соединений RabbitMQ, когда библиотека Spring-amqp добавляется в список зависимостей:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
@Configuration @Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean public Queue sampleQueue() { return new Queue( "sample.queue" , true , false , false ); } } |
Учитывая этот пример очереди, слушатель, который получает сообщения из этой очереди и обрабатывает их, выглядит следующим образом, поток записывается с использованием превосходной библиотеки Java DSL интеграции Spring :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
@Configuration public class RabbitInboundFlow { private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow. class ); @Autowired private RabbitConfig rabbitConfig; @Autowired private ConnectionFactory connectionFactory; @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory( this .connectionFactory); listenerContainer.setQueues( this .rabbitConfig.sampleQueue()); listenerContainer.setConcurrentConsumers( 1 ); listenerContainer.setExclusive( true ); return listenerContainer; } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())) .transform(Transformers.objectToString()) .handle((m) -> { logger.info( "Processed {}" , m.getPayload()); }) .get(); } } |
Поток очень кратко выражен в методе inboundFlow, полезная нагрузка сообщения из RabbitMQ преобразуется из байтового массива в String и, наконец, обрабатывается путем простой записи сообщения в журналы.
Важной частью потока является конфигурация слушателя, обратите внимание на флаг, который устанавливает потребителя в качестве исключительного потребителя, и в этом потребителе число потоков обрабатывается равным 1. Учитывая это, даже если запускается только несколько экземпляров приложения 1 из слушателей сможет подключаться и обрабатывать сообщения.
Теперь для улова рассмотрим случай, когда обработка сообщений занимает некоторое время и откатывается во время обработки сообщения. Если экземпляр приложения, обрабатывающего сообщение, должен был быть остановлен в середине обработки такого сообщения, то поведение другого экземпляра начнет обрабатывать сообщения в очереди, когда остановленный экземпляр откатывает сообщение, откат сообщение затем доставляется новому эксклюзивному потребителю, таким образом получая сообщение не в порядке.
- Если вы заинтересованы в дальнейшем изучении этого, вот проект github, чтобы поиграть с этой функцией: https://github.com/bijukunjummen/test-rabbit-exclusive.
Ссылка: | RabbitMQ — Обработка сообщений поочередно с помощью Spring DS Java DSL от нашего партнера по JCG Биджу Кунджуммена из блога all and sundry. |