Статьи

Адаптеры Spring Integration MongoDB с Java DSL

1. Введение

В этом посте объясняется, как сохранять и извлекать объекты из базы данных MongoDB с помощью Spring Integration. Для этого мы настроим адаптеры входящих и исходящих каналов MongoDB с использованием расширения конфигурации Java DSL. В качестве примера мы собираемся создать приложение, которое позволит вам записывать заказы в хранилище MongoDB, а затем извлекать их для обработки.

Поток приложения можно разделить на две части:

  • Новые заказы отправляются в систему обмена сообщениями, где они будут преобразованы в реальные продукты и затем сохранены в MongoDB.
  • С другой стороны, другой компонент постоянно опрашивает базу данных и обрабатывает любой новый продукт, который он находит.

Исходный код можно найти в моем репозитории Spring Integration .

2 MessagingGateway — вход в систему обмена сообщениями

Наше приложение ничего не знает о системе обмена сообщениями. Фактически, он просто создаст новые заказы и отправит их в интерфейс (OrderService):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SpringBootApplication
@EnableIntegration
public class MongodbBasicApplication {
     
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args);
        new MongodbBasicApplication().start(context);
    }
     
    public void start(ConfigurableApplicationContext context) {
        resetDatabase(context);
         
        Order order1 = new Order("1", true);
        Order order2 = new Order("2", false);
        Order order3 = new Order("3", true);
         
        InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class);
         
        orderService.order(order1);
        orderService.order(order2);
        orderService.order(order3);
    }
     
    private void resetDatabase(ConfigurableApplicationContext context) {
        ProductRepository productRepository = context.getBean(ProductRepository.class);
        productRepository.deleteAll();
    }
}

При первоначальном рассмотрении конфигурации мы видим, что на самом деле OrderService является шлюзом обмена сообщениями.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
@Configuration
@ComponentScan("xpadro.spring.integration.endpoint")
@IntegrationComponentScan("xpadro.spring.integration.mongodb")
public class InfrastructureConfiguration {
 
    @MessagingGateway
    public interface OrderService {
 
        @Gateway(requestChannel = "sendOrder.input")
        void order(Order order);
    }
     
    ...
}

Любой заказ, отправленный методу заказа, будет представлен в системе обмена сообщениями в виде сообщения <Order> через прямой канал sendOrder.input.

3 Первая часть — обработка заказов

Первая часть потока сообщений Spring Integration состоит из следующих компонентов:

flow_firstpart

Мы используем лямбду для создания определения IntegrationFlow, которое регистрирует DirectChannel в качестве входного канала. Имя входного канала разрешается как «beanName + .input». Следовательно, имя является тем, которое мы указали в шлюзе: ‘sendOrder.input’

1
2
3
4
5
6
7
@Bean
@Autowired
public IntegrationFlow sendOrder(MongoDbFactory mongo) {
    return f -> f
        .transform(Transformers.converter(orderToProductConverter()))
        .handle(mongoOutboundAdapter(mongo));
}

Первое, что делает поток при получении нового заказа, — использует преобразователь, чтобы преобразовать заказ в продукт. Для регистрации трансформатора мы можем использовать фабрику Transformers, предоставляемую DSL API. Здесь у нас разные возможности. Тот, который я выбрал, использует PayloadTypeConvertingTransformer , который делегирует преобразователю преобразование полезной нагрузки в объект.

1
2
3
4
5
6
7
public class OrderToProductConverter implements Converter<Order, Product> {
 
    @Override
    public Product convert(Order order) {
        return new Product(order.getId(), order.isPremium());
    }
}

Следующим шагом в потоке заказов является сохранение вновь созданного продукта в базе данных. Здесь мы используем исходящий адаптер MongoDB:

1
2
3
4
5
6
7
@Bean
@Autowired
public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo);
    mongoHandler.setCollectionNameExpression(new LiteralExpression("product"));
    return mongoHandler;
}

Если вам интересно, что на самом деле делает обработчик сообщений, он использует mongoTemplate для сохранения сущности:

1
2
3
4
5
6
7
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Object payload = message.getPayload();
     
    this.mongoTemplate.save(payload, collectionName);
}

4 Вторая часть — переработка продуктов

Во второй части у нас есть еще один процесс интеграции продуктов обработки:

flow_secondpart

Чтобы получить ранее созданные продукты, мы определили адаптер входящего канала, который будет постоянно опрашивать базу данных MongoDB:

1
2
3
4
5
6
7
8
@Bean
@Autowired
public IntegrationFlow processProduct(MongoDbFactory mongo) {
    return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS)))
        .route(Product::isPremium, this::routeProducts)
        .handle(mongoOutboundAdapter(mongo))
        .get();
}

Адаптер входящего канала MongoDB отвечает за опрос продуктов из базы данных. Уточняем запрос в конструкторе. В этом случае мы опрашиваем один необработанный продукт каждый раз:

01
02
03
04
05
06
07
08
09
10
@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
    MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}"));
    messageSource.setExpectSingleResult(true);
    messageSource.setEntityClass(Product.class);
    messageSource.setCollectionNameExpression(new LiteralExpression("product"));
     
    return messageSource;
}

Определение маршрутизатора показывает, как продукт отправляется в другой метод активации службы в зависимости от поля «premium»:

1
2
3
4
5
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) {
    return mapping
        .subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess"))
        .subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));
}

Как активатор сервиса, у нас есть простой компонент, который регистрирует сообщение и устанавливает продукт как обработанный. Затем он вернет продукт, чтобы он мог быть обработан следующей конечной точкой в ​​потоке.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class ProductProcessor {
 
    public Product process(Product product) {
        return doProcess(product, String.format("Processing product %s", product.getId()));
    }
 
    public Product fastProcess(Product product) {
        return doProcess(product, String.format("Fast processing product %s", product.getId()));
    }
 
    private Product doProcess(Product product, String message) {
        System.out.println(message);
        product.setProcessed(true);
        return product;
    }
}

Причина установки продукта как обработанного заключается в том, что следующим шагом является обновление его статуса в базе данных, чтобы не опрашивать его снова. Мы сохраняем его, перенаправляя поток на адаптер исходящего канала mongoDb.

5. Заключение

Вы видели, какие конечные точки вы должны использовать для взаимодействия с базой данных MongoDB с помощью Spring Integration. Адаптер исходящего канала пассивно сохраняет продукты в базе данных, в то время как адаптер входящего канала активно опрашивает базу данных для получения новых продуктов.

Если вы нашли этот пост полезным, поделитесь им или пометьте мой репозиторий. Я ценю его 🙂

Я публикую свои новые сообщения в Google Plus и Twitter. Следуйте за мной, если вы хотите быть в курсе нового контента.