RabbitMQ является одним из популярных решений для брокеров сообщений и предоставляет клиентские библиотеки для использования на различных языках программирования, включая Java, Scala, .NET, Go, Python, Ruby, PHP и т. Д. В этом руководстве мы узнаем, как использовать брокер сообщений RabbitMQ. отправлять и получать сообщения из приложения SpringBoot. Мы также рассмотрим, как отправлять сообщения в виде полезных нагрузок JSON и как обрабатывать ошибки, используя Dead Letter Queues (DLQ).
Сначала установите сервер RabbitMQ на свой локальный компьютер, как описано здесь https://www.rabbitmq.com/download.html, или запустите его в качестве образа Docker со следующим docker-compose.yml .
|
1
2
3
4
5
6
7
8
9
|
version: '3'services: rabbitmq: container_name: rabbitmq image: 'rabbitmq:management' ports: - "5672:5672" - "15672:15672" |
Теперь вы можете запустить RabbitMQ с помощью docker-compose и запустить интерфейс администрирования по адресу http: // localhost: 15672 /.
Если вы знакомы с другими брокерами обмена сообщениями, такими как ActiveMQ, мы обычно используем очереди и темы для отправки одноранговой и паб-суб-модели общения. В RabbitMQ мы отправляем сообщения в Exchange, и в зависимости от ключа маршрутизации это сообщение будет перенаправлено в очередь (и). Вы можете прочитать больше о концепциях RabbitMQ здесь https://www.rabbitmq.com/tutorials/amqp-concepts.html .
Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo
Приложение SpringBoot с RabbitMQ
Теперь давайте создадим приложение SpringBoot из http://start.spring.io/, выбрав стартеры Web , Thymeleaf и RabbitMQ .
pom.xml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sivalabs</groupId> <artifactId>springboot-rabbitmq-demo</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RC1</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> </dependencies> </project> |
Давайте начнем с конфигурации RabbitMQ. Создайте класс конфигурации RabbitConfig и определите компоненты Queue , Exchange и Binding следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; @Configurationpublic class RabbitConfig { public static final String QUEUE_ORDERS = "orders-queue"; public static final String EXCHANGE_ORDERS = "orders-exchange"; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS).build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } @Bean Exchange ordersExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build(); } @Bean Binding binding(Queue ordersQueue, TopicExchange ordersExchange) { return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS); }} |
Здесь мы объявляем Очередь с именами -очередями и Exchange с именами -обменами .
Мы также определили привязку между очередями заказов и обменом заказов, чтобы любое сообщение, отправленное на обмен заказами с ключом маршрутизации в виде «очереди заказов» , было отправлено в очередь заказов.
Мы можем настроить детали сервера RabbitMQ в application.properties следующим образом:
|
1
2
3
4
|
spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest |
Давайте создадим Spring Bean OrderMessageSender, чтобы отправить сообщение на orders-exchange.
Spring Boot автоматически настраивает компоненты инфраструктуры, необходимые для отправки / получения сообщений в / из брокера RabbitMQ. Мы можем просто автоматически соединить RabbitTemplate и отправить сообщение, вызвав метод rabbitTemplate.convertAndSend («routingKey», Object) .
|
1
2
3
4
5
6
7
|
public class Order implements Serializable { private String orderNumber; private String productId; private double amount; //setters & getters} |
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service; @Servicepublic class OrderMessageSender { private final RabbitTemplate rabbitTemplate; @Autowired public OrderMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendOrder(Order order) { this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order); }} |
По умолчанию Spring Boot использует org.springframework.amqp.support.converter.SimpleMessageConverter и сериализует объект в byte [] .
Теперь, когда эта конфигурация установлена, мы можем отправить сообщение в очередь заказов RabbitMQ, вызвав метод OrderMessageSender.sendOrder (Order) .
После отправки сообщения вы можете просмотреть сообщение из приложения пользовательского интерфейса администрирования, войдя в систему с учетными данными гостя / гостя . Вы можете щелкнуть вкладки Обмены / Очереди, чтобы увидеть, как были созданы ордера обмена и ордера . Вы также можете проверить привязки для обмена заказов, который выглядит следующим образом:
Теперь перейдите на вкладку Очереди и нажмите на очередь заказов. Прокрутите вниз до раздела « Получить сообщения » и, нажав на кнопку « Получить сообщение» , вы сможете просмотреть содержимое сообщения.
Теперь давайте создадим слушатель очереди заказов с помощью @RabbitListener .
Создайте Spring Bean OrderMessageListener следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component; @Componentpublic class OrderMessageListener { static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class); @RabbitListener(queues = RabbitConfig.QUEUE_ORDERS) public void processOrder(Order order) { logger.info("Order Received: "+order); }} |
Это оно!! Просто добавив @RabbitListener и определив, какую очередь слушать, мы можем создать Listener.
Теперь, если вы отправляете сообщение в очередь заказов, которое должно быть использовано методом OrderMessageListener.processOrder (), и вы должны увидеть запись в журнале «Заказ получен:«.
Отправка и получение сообщений как полезных нагрузок JSON
Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [], используя SimpleMessageConverter, и на принимающей стороне он десериализует byte [] в тип Object (в нашем случае Order), используя GenericMessageConverter .
Чтобы изменить это поведение, нам нужно настроить автоматически настраиваемые bean-компоненты Spring Boot RabbitMQ.
Отправить сообщение как JSON
Одним из быстрых способов отправки сообщения в виде полезной нагрузки JSON является использование ObjectMapper. Мы можем преобразовать объект Order в JSON и отправить его.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Autowiredprivate ObjectMapper objectMapper; public void sendOrder(Order order) { try { String orderJson = objectMapper.writeValueAsString(order); Message message = MessageBuilder .withBody(orderJson.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, message); } catch (JsonProcessingException e) { e.printStackTrace(); }} |
Но преобразование объектов в JSON, как это, является своего рода образцом. Вместо этого мы можем следовать нижеприведенному подходу.
Мы можем настроить bean- компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter для использования RabbitTemplate так, чтобы сообщение было сериализовано как JSON, а не byte [].
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@Configurationpublic class RabbitConfig { ... ... @Bean public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(producerJackson2MessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); }} |
Теперь, когда вы отправляете сообщение, оно будет преобразовано в JSON и отправлено в очередь.
Получить сообщение как JSON
Чтобы обработать полезную нагрузку сообщения как JSON, мы должны настроить конфигурацию RabbitMQ, реализовав RabbitListenerConfigurer .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Configurationpublic class RabbitConfig implements RabbitListenerConfigurer { ... ... @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); }} |
Обработка ошибок и недействительных сообщений с помощью DeadLetterQueues (DLQ)
Мы можем захотеть отправить недопустимые сообщения в отдельную очередь, чтобы мы могли проверить и обработать их позже. Мы можем использовать концепцию DLQ, чтобы автоматически сделать это, вместо того, чтобы вручную писать код для обработки таких сценариев.
Мы можем объявить dead-letter-exchange , dead-letter-routing-key для Queue при определении bean-компонента Queue следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Configurationpublic class RabbitConfig implements RabbitListenerConfigurer { public static final String QUEUE_ORDERS = "orders-queue"; public static final String EXCHANGE_ORDERS = "orders-exchange"; public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue"; @Bean Queue ordersQueue() { return QueueBuilder.durable(QUEUE_ORDERS) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS) .withArgument("x-message-ttl", 15000) //if message is not consumed in 15 seconds send to DLQ .build(); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build(); } ... ...} |
Теперь попробуйте отправить недопустимое сообщение JSON в очередь заказов, оно будет отправлено в очередь мертвых заказов.
Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo
| Опубликовано на Java Code Geeks с разрешения Сивы Редди, партнера нашей программы JCG . Смотрите оригинальную статью здесь: SpringBoot Messaging с RabbitMQ
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |
