RabbitMQ является одним из популярных решений для брокеров сообщений и предоставляет клиентские библиотеки для использования на различных языках программирования, включая Java, Scala, .NET, Go, Python, Ruby, PHP и т. Д. В этом руководстве мы узнаем, как использовать брокер сообщений RabbitMQ. отправлять и получать сообщения из приложения SpringBoot. Мы также рассмотрим, как отправлять сообщения в виде полезных нагрузок JSON и как обрабатывать ошибки, используя Dead Letter Queues (DLQ).
Сначала установите сервер RabbitMQ на свой локальный компьютер, как описано здесь https://www.rabbitmq.com/download.html, или запустите его в качестве образа Docker со следующим docker-compose.yml .
1
2
3
4
5
6
7
8
9
|
version: '3' services: rabbitmq: container_name: rabbitmq image: 'rabbitmq:management' ports: - "5672:5672" - "15672:15672" |
Теперь вы можете запустить RabbitMQ с помощью docker-compose и запустить интерфейс администрирования по адресу http: // localhost: 15672 /.
Если вы знакомы с другими брокерами обмена сообщениями, такими как ActiveMQ, мы обычно используем очереди и темы для отправки одноранговой и паб-суб-модели общения. В RabbitMQ мы отправляем сообщения в Exchange, и в зависимости от ключа маршрутизации это сообщение будет перенаправлено в очередь (и). Вы можете прочитать больше о концепциях RabbitMQ здесь https://www.rabbitmq.com/tutorials/amqp-concepts.html .
Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo
Приложение SpringBoot с RabbitMQ
Теперь давайте создадим приложение SpringBoot из http://start.spring.io/, выбрав стартеры Web , Thymeleaf и RabbitMQ .
pom.xml
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<? xml version = "1.0" encoding = "UTF-8" ?> < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion >4.0.0</ modelVersion > < groupId >com.sivalabs</ groupId > < artifactId >springboot-rabbitmq-demo</ artifactId > < version >1.0-SNAPSHOT</ version > < parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >2.0.0.RC1</ version > < relativePath /> </ parent > < properties > < project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding > < project.reporting.outputEncoding >UTF-8</ project.reporting.outputEncoding > < java.version >1.8</ java.version > </ properties > < dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-amqp</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-web</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-thymeleaf</ artifactId > </ dependency > </ dependencies > </ project > |
Давайте начнем с конфигурации RabbitMQ. Создайте класс конфигурации RabbitConfig и определите компоненты Queue , Exchange и Binding следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String QUEUE_ORDERS = "orders-queue" ; public static final String EXCHANGE_ORDERS = "orders-exchange" ; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS).build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } @Bean Exchange ordersExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build(); } @Bean Binding binding(Queue ordersQueue, TopicExchange ordersExchange) { return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS); } } |
Здесь мы объявляем Очередь с именами -очередями и Exchange с именами -обменами .
Мы также определили привязку между очередями заказов и обменом заказов, чтобы любое сообщение, отправленное на обмен заказами с ключом маршрутизации в виде «очереди заказов» , было отправлено в очередь заказов.
Мы можем настроить детали сервера RabbitMQ в application.properties следующим образом:
1
2
3
4
|
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest |
Давайте создадим Spring Bean OrderMessageSender, чтобы отправить сообщение на orders-exchange.
Spring Boot автоматически настраивает компоненты инфраструктуры, необходимые для отправки / получения сообщений в / из брокера RabbitMQ. Мы можем просто автоматически соединить RabbitTemplate и отправить сообщение, вызвав метод rabbitTemplate.convertAndSend («routingKey», Object) .
1
2
3
4
5
6
7
|
public class Order implements Serializable { private String orderNumber; private String productId; private double amount; //setters & getters } |
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderMessageSender { private final RabbitTemplate rabbitTemplate; @Autowired public OrderMessageSender(RabbitTemplate rabbitTemplate) { this .rabbitTemplate = rabbitTemplate; } public void sendOrder(Order order) { this .rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order); } } |
По умолчанию Spring Boot использует org.springframework.amqp.support.converter.SimpleMessageConverter и сериализует объект в byte [] .
Теперь, когда эта конфигурация установлена, мы можем отправить сообщение в очередь заказов RabbitMQ, вызвав метод OrderMessageSender.sendOrder (Order) .
После отправки сообщения вы можете просмотреть сообщение из приложения пользовательского интерфейса администрирования, войдя в систему с учетными данными гостя / гостя . Вы можете щелкнуть вкладки Обмены / Очереди, чтобы увидеть, как были созданы ордера обмена и ордера . Вы также можете проверить привязки для обмена заказов, который выглядит следующим образом:
Теперь перейдите на вкладку Очереди и нажмите на очередь заказов. Прокрутите вниз до раздела « Получить сообщения » и, нажав на кнопку « Получить сообщение» , вы сможете просмотреть содержимое сообщения.
Теперь давайте создадим слушатель очереди заказов с помощью @RabbitListener .
Создайте Spring Bean OrderMessageListener следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class OrderMessageListener { static final Logger logger = LoggerFactory.getLogger(OrderMessageListener. class ); @RabbitListener (queues = RabbitConfig.QUEUE_ORDERS) public void processOrder(Order order) { logger.info( "Order Received: " +order); } } |
Это оно!! Просто добавив @RabbitListener и определив, какую очередь слушать, мы можем создать Listener.
Теперь, если вы отправляете сообщение в очередь заказов, которое должно быть использовано методом OrderMessageListener.processOrder (), и вы должны увидеть запись в журнале «Заказ получен:«.
Отправка и получение сообщений как полезных нагрузок JSON
Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [], используя SimpleMessageConverter, и на принимающей стороне он десериализует byte [] в тип Object (в нашем случае Order), используя GenericMessageConverter .
Чтобы изменить это поведение, нам нужно настроить автоматически настраиваемые bean-компоненты Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним из быстрых способов отправки сообщения в виде полезной нагрузки JSON является использование ObjectMapper. Мы можем преобразовать объект Order в JSON и отправить его.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Autowired private ObjectMapper objectMapper; public void sendOrder(Order order) { try { String orderJson = objectMapper.writeValueAsString(order); Message message = MessageBuilder .withBody(orderJson.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); this .rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, message); } catch (JsonProcessingException e) { e.printStackTrace(); } } |
Но преобразование объектов в JSON, как это, является своего рода образцом. Вместо этого мы можем следовать нижеприведенному подходу.
Мы можем настроить bean- компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter для использования RabbitTemplate так, чтобы сообщение было сериализовано как JSON, а не byte [].
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@Configuration public class RabbitConfig { ... ... @Bean public RabbitTemplate rabbitTemplate( final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(producerJackson2MessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } } |
Теперь, когда вы отправляете сообщение, оно будет преобразовано в JSON и отправлено в очередь.
Получить сообщение как JSON
Чтобы обработать полезную нагрузку сообщения как JSON, мы должны настроить конфигурацию RabbitMQ, реализовав RabbitListenerConfigurer .
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Configuration public class RabbitConfig implements RabbitListenerConfigurer { ... ... @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } } |
Обработка ошибок и недействительных сообщений с помощью DeadLetterQueues (DLQ)
Мы можем захотеть отправить недопустимые сообщения в отдельную очередь, чтобы мы могли проверить и обработать их позже. Мы можем использовать концепцию DLQ, чтобы автоматически сделать это, вместо того, чтобы вручную писать код для обработки таких сценариев.
Мы можем объявить dead-letter-exchange , dead-letter-routing-key для Queue при определении bean-компонента Queue следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Configuration public class RabbitConfig implements RabbitListenerConfigurer { public static final String QUEUE_ORDERS = "orders-queue" ; public static final String EXCHANGE_ORDERS = "orders-exchange" ; public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue" ; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS) .withArgument( "x-dead-letter-exchange" , "" ) .withArgument( "x-dead-letter-routing-key" , QUEUE_DEAD_ORDERS) .withArgument( "x-message-ttl" , 15000 ) //if message is not consumed in 15 seconds send to DLQ .build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } ... ... } |
Теперь попробуйте отправить недопустимое сообщение JSON в очередь заказов, оно будет отправлено в очередь мертвых заказов.
Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo
Опубликовано на Java Code Geeks с разрешения Сивы Редди, партнера нашей программы JCG . Смотрите оригинальную статью здесь: SpringBoot Messaging с RabbitMQ
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |