Статьи

Интеграция с RabbitMQ с использованием Spring Cloud Stream

В моем предыдущем посте я писал об очень простом сценарии интеграции между двумя системами — одна генерирует рабочий блок, а другая — этот рабочий блок, и как Spring Integration делает такую ​​интеграцию очень простой.

WorkUnitsFlow

Здесь я продемонстрирую, как этот сценарий интеграции может быть еще более упрощен с помощью Spring Cloud Stream.

У меня есть пример кода, доступный здесь — правильные maven-зависимости для Spring Cloud Stream доступны в pom.xml .

Режиссер

Итак, снова начнем с производителя, ответственного за создание рабочих единиц. Все, что нужно сделать с помощью кода, чтобы отправлять сообщения в RabbitMQ, это иметь конфигурацию Java по следующим направлениям:

1
2
3
4
@Configuration
@EnableBinding(WorkUnitsSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {}

Это выглядит обманчиво простым, но многое делает под прикрытием, из того, что я могу понять и почерпнуть из документации, вот что вызывает эта конфигурация:

1. Создаются каналы сообщений Spring Integration на основе классов, связанных с аннотацией @EnableBinding. Приведенный выше класс WorkUnitsSource является определением пользовательского канала с именем worksChannel и выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
 
public interface WorkUnitsSource {
 
    String CHANNEL_NAME = "worksChannel";
 
    @Output
    MessageChannel worksChannel();
 
}

2. Исходя из того, какая реализация «связующего» доступна во время выполнения (скажем, RabbitMQ, Kaffka, Redis, Gemfire), канал на предыдущем шаге будет связан с соответствующими структурами в системе — так, например, я хочу, чтобы мой « WorksChannel », чтобы в свою очередь отправлять сообщения в RabbitMQ, Spring Cloud Stream позаботится об автоматическом создании темы обмена в RabbitMQ.

Я хотел еще кое-что изменить с точки зрения того, как данные отправляются в RabbitMQ — в частности, я хотел, чтобы мои доменные объекты были сериализованы в json перед отправкой, и я хочу указать имя обмена RabbitMQ, на который отправляется полезная нагрузка, это управляется определенными конфигурациями, которые можно подключить к каналу следующим образом, используя файл yaml:

1
2
3
4
5
6
7
8
spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          contentType: application/json
          group: testgroup

И последняя деталь — способ взаимодействия остальной части приложения с Spring Cloud Stream. Это можно сделать непосредственно в Spring Integration путем определения шлюза сообщений:

01
02
03
04
05
06
07
08
09
10
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import works.service.domain.WorkUnit;
 
@MessagingGateway
public interface WorkUnitGateway {
 @Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)
 void generate(WorkUnit workUnit);
 
}

По сути, Spring Cloud Stream теперь подключает весь поток интеграции Spring, создает соответствующие структуры в RabbitMQ.

потребитель

По аналогии с источником, сначала я хочу определить канал с именем worksChannel, который будет обрабатывать входящее сообщение от RabbitMQ:

1
2
3
4
5
6
7
8
9
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
 
public interface WorkUnitsSink {
    String CHANNEL_NAME = "worksChannel";
 
    @Input
    SubscribableChannel worksChannel();
}

и пусть Spring Cloud Stream создаст каналы и привязки RabbitMQ на основе этого определения:

1
2
3
4
5
6
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;
 
@Configuration
@EnableBinding(WorkUnitsSink.class)
public class IntegrationConfiguration {}

Для обработки сообщений Spring Cloud Stream предоставляет прослушиватель, который можно создать следующим образом:

1
2
3
4
5
6
7
8
9
@Service
public class WorkHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);
 
    @StreamListener(WorkUnitsSink.CHANNEL_NAME)
    public void process(WorkUnit workUnit) {
        LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());
    }
}

И, наконец, конфигурация, которая соединяет этот канал с инфраструктурой RabbitMQ, выраженная в файле yaml:

1
2
3
4
5
6
7
spring:
  cloud:
    stream:
      bindings:
        worksChannel:
          destination: work.exchange
          group: testgroup

Теперь, если производитель и любое количество потребителей были запущены, сообщение, отправленное через производителя, будет отправлено в обмен темой Rabbit MQ в виде json, получено потребителем, десериализовано в объект и передано в рабочий процессор.

Значительное количество вспомогательного элемента, участвующего в создании инфраструктуры RabbitMQ, теперь обрабатывается исключительно конвенциями библиотек Spring Cloud Stream. Несмотря на то, что Spring Cloud Stream пытается предоставить представление о простой интеграции Spring, полезно иметь базовые знания по интеграции Spring для эффективного использования Spring Cloud Stream.

Пример, описанный здесь, доступен в моем репозитории github.

Ссылка: Интеграция с RabbitMQ с использованием Spring Cloud Stream от нашего партнера по JCG Бижу Кунджуммена из блога all and sundry.