Статьи

Обмен сообщениями SpringBoot с RabbitMQ

RabbitMQ является одним из популярных решений для брокеров сообщений и предоставляет клиентские библиотеки для использования на различных языках программирования, включая Java, Scala, .NET, Go, Python, Ruby, PHP и т. Д. В этом руководстве мы узнаем, как использовать брокер сообщений RabbitMQ. отправлять и получать сообщения из приложения SpringBoot. Мы также рассмотрим, как отправлять сообщения в виде полезных нагрузок JSON и как обрабатывать ошибки, используя Dead Letter Queues (DLQ).

Сначала установите сервер RabbitMQ на свой локальный компьютер, как описано здесь https://www.rabbitmq.com/download.html, или запустите его в качестве образа Docker со следующим docker-compose.yml .

1
2
3
4
5
6
7
8
9
version: '3'
services:
  
  rabbitmq:
    container_name: rabbitmq
    image: 'rabbitmq:management'
    ports:
      - "5672:5672"
      - "15672:15672"

Теперь вы можете запустить RabbitMQ с помощью docker-compose и запустить интерфейс администрирования по адресу http: // localhost: 15672 /.

Если вы знакомы с другими брокерами обмена сообщениями, такими как ActiveMQ, мы обычно используем очереди и темы для отправки одноранговой и паб-суб-модели общения. В RabbitMQ мы отправляем сообщения в Exchange, и в зависимости от ключа маршрутизации это сообщение будет перенаправлено в очередь (и). Вы можете прочитать больше о концепциях RabbitMQ здесь https://www.rabbitmq.com/tutorials/amqp-concepts.html .

Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo

Приложение SpringBoot с RabbitMQ

Теперь давайте создадим приложение SpringBoot из http://start.spring.io/, выбрав стартеры Web , Thymeleaf и RabbitMQ .

pom.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
  
    <groupId>com.sivalabs</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
  
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RC1</version>
        <relativePath/>
    </parent>
  
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
  
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
    </dependencies>
      
</project>

Давайте начнем с конфигурации RabbitMQ. Создайте класс конфигурации RabbitConfig и определите компоненты Queue , Exchange и Binding следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
  
@Configuration
public class RabbitConfig 
{
    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";
  
    @Bean
    Queue ordersQueue() {
        return QueueBuilder.durable(QUEUE_ORDERS).build();
    }
  
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }
  
    @Bean
    Exchange ordersExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
    }
  
    @Bean
    Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
    }
}

Здесь мы объявляем Очередь с именами -очередями и Exchange с именами -обменами .
Мы также определили привязку между очередями заказов и обменом заказов, чтобы любое сообщение, отправленное на обмен заказами с ключом маршрутизации в виде «очереди заказов» , было отправлено в очередь заказов.

Мы можем настроить детали сервера RabbitMQ в application.properties следующим образом:

1
2
3
4
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Давайте создадим Spring Bean OrderMessageSender, чтобы отправить сообщение на orders-exchange.

Spring Boot автоматически настраивает компоненты инфраструктуры, необходимые для отправки / получения сообщений в / из брокера RabbitMQ. Мы можем просто автоматически соединить RabbitTemplate и отправить сообщение, вызвав метод rabbitTemplate.convertAndSend («routingKey», Object) .

1
2
3
4
5
6
7
public class Order implements Serializable {
    private String orderNumber;
    private String productId;
    private double amount;
  
    //setters & getters
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
  
@Service
public class OrderMessageSender {
    private final RabbitTemplate rabbitTemplate;
  
    @Autowired
    public OrderMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
  
    public void sendOrder(Order order) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);
    }
}

По умолчанию Spring Boot использует org.springframework.amqp.support.converter.SimpleMessageConverter и сериализует объект в byte [] .

Теперь, когда эта конфигурация установлена, мы можем отправить сообщение в очередь заказов RabbitMQ, вызвав метод OrderMessageSender.sendOrder (Order) .

После отправки сообщения вы можете просмотреть сообщение из приложения пользовательского интерфейса администрирования, войдя в систему с учетными данными гостя / гостя . Вы можете щелкнуть вкладки Обмены / Очереди, чтобы увидеть, как были созданы ордера обмена и ордера . Вы также можете проверить привязки для обмена заказов, который выглядит следующим образом:

Теперь перейдите на вкладку Очереди и нажмите на очередь заказов. Прокрутите вниз до раздела « Получить сообщения » и, нажав на кнопку « Получить сообщение» , вы сможете просмотреть содержимое сообщения.

Теперь давайте создадим слушатель очереди заказов с помощью @RabbitListener .

Создайте Spring Bean OrderMessageListener следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
  
@Component
public class OrderMessageListener {
  
    static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);
  
    @RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)
    public void processOrder(Order order) {
        logger.info("Order Received: "+order);
    }
}

Это оно!! Просто добавив @RabbitListener и определив, какую очередь слушать, мы можем создать Listener.

Теперь, если вы отправляете сообщение в очередь заказов, которое должно быть использовано методом OrderMessageListener.processOrder (), и вы должны увидеть запись в журнале «Заказ получен:«.

Отправка и получение сообщений как полезных нагрузок JSON

Как мы видели, механизм сериализации по умолчанию преобразует объект сообщения в byte [], используя SimpleMessageConverter, и на принимающей стороне он десериализует byte [] в тип Object (в нашем случае Order), используя GenericMessageConverter .

Чтобы изменить это поведение, нам нужно настроить автоматически настраиваемые bean-компоненты Spring Boot RabbitMQ.

Отправить сообщение как JSON

Одним из быстрых способов отправки сообщения в виде полезной нагрузки JSON является использование ObjectMapper. Мы можем преобразовать объект Order в JSON и отправить его.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Autowired
private ObjectMapper objectMapper;
  
public void sendOrder(Order order) {
    try {
        String orderJson = objectMapper.writeValueAsString(order);
        Message message = MessageBuilder
                            .withBody(orderJson.getBytes())
                            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                            .build();
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, message);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}

Но преобразование объектов в JSON, как это, является своего рода образцом. Вместо этого мы можем следовать нижеприведенному подходу.

Мы можем настроить bean- компонент org.springframework.amqp.support.converter.Jackson2JsonMessageConverter для использования RabbitTemplate так, чтобы сообщение было сериализовано как JSON, а не byte [].

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitConfig
{
    ...
    ...
  
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }
  
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Теперь, когда вы отправляете сообщение, оно будет преобразовано в JSON и отправлено в очередь.

Получить сообщение как JSON

Чтобы обработать полезную нагрузку сообщения как JSON, мы должны настроить конфигурацию RabbitMQ, реализовав RabbitListenerConfigurer .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
    ...
    ...
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
  
    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }
  
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
}

Обработка ошибок и недействительных сообщений с помощью DeadLetterQueues (DLQ)

Мы можем захотеть отправить недопустимые сообщения в отдельную очередь, чтобы мы могли проверить и обработать их позже. Мы можем использовать концепцию DLQ, чтобы автоматически сделать это, вместо того, чтобы вручную писать код для обработки таких сценариев.

Мы можем объявить dead-letter-exchange , dead-letter-routing-key для Queue при определении bean-компонента Queue следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
  
    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";
    public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue";
      
    @Bean
    Queue ordersQueue() {
  
        return QueueBuilder.durable(QUEUE_ORDERS)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS)
                .withArgument("x-message-ttl", 15000) //if message is not consumed in 15 seconds send to DLQ
                .build();
    }
  
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }
  
    ...
    ...
}

Теперь попробуйте отправить недопустимое сообщение JSON в очередь заказов, оно будет отправлено в очередь мертвых заказов.

Вы можете найти исходный код этой статьи по адресу https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo

Опубликовано на Java Code Geeks с разрешения Сивы Редди, партнера нашей программы JCG . Смотрите оригинальную статью здесь: SpringBoot Messaging с RabbitMQ

Мнения, высказанные участниками Java Code Geeks, являются их собственными.