1. Введение
В этом посте объясняется, как сохранять и извлекать объекты из базы данных MongoDB с помощью Spring Integration. Для этого мы настроим адаптеры входящих и исходящих каналов MongoDB с использованием расширения конфигурации Java DSL. В качестве примера мы собираемся создать приложение, которое позволит вам записывать заказы в хранилище MongoDB, а затем извлекать их для обработки.
Поток приложения можно разделить на две части:
- Новые заказы отправляются в систему обмена сообщениями, где они будут преобразованы в реальные продукты и затем сохранены в MongoDB.
- С другой стороны, другой компонент постоянно опрашивает базу данных и обрабатывает любой новый продукт, который он находит.
Исходный код можно найти в моем репозитории Spring Integration .
2 MessagingGateway — вход в систему обмена сообщениями
Наше приложение ничего не знает о системе обмена сообщениями. Фактически, он просто создаст новые заказы и отправит их в интерфейс (OrderService):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@SpringBootApplication@EnableIntegrationpublic class MongodbBasicApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication.class, args); new MongodbBasicApplication().start(context); } public void start(ConfigurableApplicationContext context) { resetDatabase(context); Order order1 = new Order("1", true); Order order2 = new Order("2", false); Order order3 = new Order("3", true); InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService.class); orderService.order(order1); orderService.order(order2); orderService.order(order3); } private void resetDatabase(ConfigurableApplicationContext context) { ProductRepository productRepository = context.getBean(ProductRepository.class); productRepository.deleteAll(); }} |
При первоначальном рассмотрении конфигурации мы видим, что на самом деле OrderService является шлюзом обмена сообщениями.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
@Configuration@ComponentScan("xpadro.spring.integration.endpoint")@IntegrationComponentScan("xpadro.spring.integration.mongodb")public class InfrastructureConfiguration { @MessagingGateway public interface OrderService { @Gateway(requestChannel = "sendOrder.input") void order(Order order); } ...} |
Любой заказ, отправленный методу заказа, будет представлен в системе обмена сообщениями в виде сообщения <Order> через прямой канал sendOrder.input.
3 Первая часть — обработка заказов
Первая часть потока сообщений Spring Integration состоит из следующих компонентов:
Мы используем лямбду для создания определения IntegrationFlow, которое регистрирует DirectChannel в качестве входного канала. Имя входного канала разрешается как «beanName + .input». Следовательно, имя является тем, которое мы указали в шлюзе: ‘sendOrder.input’
|
1
2
3
4
5
6
7
|
@Bean@Autowiredpublic IntegrationFlow sendOrder(MongoDbFactory mongo) { return f -> f .transform(Transformers.converter(orderToProductConverter())) .handle(mongoOutboundAdapter(mongo));} |
Первое, что делает поток при получении нового заказа, — использует преобразователь, чтобы преобразовать заказ в продукт. Для регистрации трансформатора мы можем использовать фабрику Transformers, предоставляемую DSL API. Здесь у нас разные возможности. Тот, который я выбрал, использует PayloadTypeConvertingTransformer , который делегирует преобразователю преобразование полезной нагрузки в объект.
|
1
2
3
4
5
6
7
|
public class OrderToProductConverter implements Converter<Order, Product> { @Override public Product convert(Order order) { return new Product(order.getId(), order.isPremium()); }} |
Следующим шагом в потоке заказов является сохранение вновь созданного продукта в базе данных. Здесь мы используем исходящий адаптер MongoDB:
|
1
2
3
4
5
6
7
|
@Bean@Autowiredpublic MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) { MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo); mongoHandler.setCollectionNameExpression(new LiteralExpression("product")); return mongoHandler;} |
Если вам интересно, что на самом деле делает обработчик сообщений, он использует mongoTemplate для сохранения сущности:
|
1
2
3
4
5
6
7
|
@Overrideprotected void handleMessageInternal(Message<?> message) throws Exception { String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class); Object payload = message.getPayload(); this.mongoTemplate.save(payload, collectionName);} |
4 Вторая часть — переработка продуктов
Во второй части у нас есть еще один процесс интеграции продуктов обработки:
Чтобы получить ранее созданные продукты, мы определили адаптер входящего канала, который будет постоянно опрашивать базу данных MongoDB:
|
1
2
3
4
5
6
7
8
|
@Bean@Autowiredpublic IntegrationFlow processProduct(MongoDbFactory mongo) { return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay(3, TimeUnit.SECONDS))) .route(Product::isPremium, this::routeProducts) .handle(mongoOutboundAdapter(mongo)) .get();} |
Адаптер входящего канала MongoDB отвечает за опрос продуктов из базы данных. Уточняем запрос в конструкторе. В этом случае мы опрашиваем один необработанный продукт каждый раз:
|
01
02
03
04
05
06
07
08
09
10
|
@Bean@Autowiredpublic MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) { MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{'processed' : false}")); messageSource.setExpectSingleResult(true); messageSource.setEntityClass(Product.class); messageSource.setCollectionNameExpression(new LiteralExpression("product")); return messageSource;} |
Определение маршрутизатора показывает, как продукт отправляется в другой метод активации службы в зависимости от поля «premium»:
|
1
2
3
4
5
|
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) { return mapping .subFlowMapping(true, sf -> sf.handle(productProcessor(), "fastProcess")) .subFlowMapping(false, sf -> sf.handle(productProcessor(), "process"));} |
Как активатор сервиса, у нас есть простой компонент, который регистрирует сообщение и устанавливает продукт как обработанный. Затем он вернет продукт, чтобы он мог быть обработан следующей конечной точкой в потоке.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class ProductProcessor { public Product process(Product product) { return doProcess(product, String.format("Processing product %s", product.getId())); } public Product fastProcess(Product product) { return doProcess(product, String.format("Fast processing product %s", product.getId())); } private Product doProcess(Product product, String message) { System.out.println(message); product.setProcessed(true); return product; }} |
Причина установки продукта как обработанного заключается в том, что следующим шагом является обновление его статуса в базе данных, чтобы не опрашивать его снова. Мы сохраняем его, перенаправляя поток на адаптер исходящего канала mongoDb.
5. Заключение
Вы видели, какие конечные точки вы должны использовать для взаимодействия с базой данных MongoDB с помощью Spring Integration. Адаптер исходящего канала пассивно сохраняет продукты в базе данных, в то время как адаптер входящего канала активно опрашивает базу данных для получения новых продуктов.
Если вы нашли этот пост полезным, поделитесь им или пометьте мой репозиторий. Я ценю его 🙂
Я публикую свои новые сообщения в Google Plus и Twitter. Следуйте за мной, если вы хотите быть в курсе нового контента.
| Ссылка: | Spring Integration Переходники MongoDB с Java DSL от нашего партнера по |

