1. Введение
В этом посте объясняется, как сохранять и извлекать объекты из базы данных MongoDB с помощью Spring Integration. Для этого мы настроим адаптеры входящих и исходящих каналов MongoDB с использованием расширения конфигурации Java DSL. В качестве примера мы собираемся создать приложение, которое позволит вам записывать заказы в хранилище MongoDB, а затем извлекать их для обработки.
Поток приложения можно разделить на две части:
- Новые заказы отправляются в систему обмена сообщениями, где они будут преобразованы в реальные продукты и затем сохранены в MongoDB.
- С другой стороны, другой компонент постоянно опрашивает базу данных и обрабатывает любой новый продукт, который он находит.
Исходный код можно найти в моем репозитории Spring Integration .
2 MessagingGateway — вход в систему обмена сообщениями
Наше приложение ничего не знает о системе обмена сообщениями. Фактически, он просто создаст новые заказы и отправит их в интерфейс (OrderService):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@SpringBootApplication @EnableIntegration public class MongodbBasicApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(MongodbBasicApplication. class , args); new MongodbBasicApplication().start(context); } public void start(ConfigurableApplicationContext context) { resetDatabase(context); Order order1 = new Order( "1" , true ); Order order2 = new Order( "2" , false ); Order order3 = new Order( "3" , true ); InfrastructureConfiguration.OrderService orderService = context.getBean(InfrastructureConfiguration.OrderService. class ); orderService.order(order1); orderService.order(order2); orderService.order(order3); } private void resetDatabase(ConfigurableApplicationContext context) { ProductRepository productRepository = context.getBean(ProductRepository. class ); productRepository.deleteAll(); } } |
При первоначальном рассмотрении конфигурации мы видим, что на самом деле OrderService является шлюзом обмена сообщениями.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
@Configuration @ComponentScan ( "xpadro.spring.integration.endpoint" ) @IntegrationComponentScan ( "xpadro.spring.integration.mongodb" ) public class InfrastructureConfiguration { @MessagingGateway public interface OrderService { @Gateway (requestChannel = "sendOrder.input" ) void order(Order order); } ... } |
Любой заказ, отправленный методу заказа, будет представлен в системе обмена сообщениями в виде сообщения <Order> через прямой канал sendOrder.input.
3 Первая часть — обработка заказов
Первая часть потока сообщений Spring Integration состоит из следующих компонентов:
Мы используем лямбду для создания определения IntegrationFlow, которое регистрирует DirectChannel в качестве входного канала. Имя входного канала разрешается как «beanName + .input». Следовательно, имя является тем, которое мы указали в шлюзе: ‘sendOrder.input’
1
2
3
4
5
6
7
|
@Bean @Autowired public IntegrationFlow sendOrder(MongoDbFactory mongo) { return f -> f .transform(Transformers.converter(orderToProductConverter())) .handle(mongoOutboundAdapter(mongo)); } |
Первое, что делает поток при получении нового заказа, — использует преобразователь, чтобы преобразовать заказ в продукт. Для регистрации трансформатора мы можем использовать фабрику Transformers, предоставляемую DSL API. Здесь у нас разные возможности. Тот, который я выбрал, использует PayloadTypeConvertingTransformer , который делегирует преобразователю преобразование полезной нагрузки в объект.
1
2
3
4
5
6
7
|
public class OrderToProductConverter implements Converter<Order, Product> { @Override public Product convert(Order order) { return new Product(order.getId(), order.isPremium()); } } |
Следующим шагом в потоке заказов является сохранение вновь созданного продукта в базе данных. Здесь мы используем исходящий адаптер MongoDB:
1
2
3
4
5
6
7
|
@Bean @Autowired public MessageHandler mongoOutboundAdapter(MongoDbFactory mongo) { MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongo); mongoHandler.setCollectionNameExpression( new LiteralExpression( "product" )); return mongoHandler; } |
Если вам интересно, что на самом деле делает обработчик сообщений, он использует mongoTemplate для сохранения сущности:
1
2
3
4
5
6
7
|
@Override protected void handleMessageInternal(Message<?> message) throws Exception { String collectionName = this .collectionNameExpression.getValue( this .evaluationContext, message, String. class ); Object payload = message.getPayload(); this .mongoTemplate.save(payload, collectionName); } |
4 Вторая часть — переработка продуктов
Во второй части у нас есть еще один процесс интеграции продуктов обработки:
Чтобы получить ранее созданные продукты, мы определили адаптер входящего канала, который будет постоянно опрашивать базу данных MongoDB:
1
2
3
4
5
6
7
8
|
@Bean @Autowired public IntegrationFlow processProduct(MongoDbFactory mongo) { return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(Pollers.fixedDelay( 3 , TimeUnit.SECONDS))) .route(Product::isPremium, this ::routeProducts) .handle(mongoOutboundAdapter(mongo)) .get(); } |
Адаптер входящего канала MongoDB отвечает за опрос продуктов из базы данных. Уточняем запрос в конструкторе. В этом случае мы опрашиваем один необработанный продукт каждый раз:
01
02
03
04
05
06
07
08
09
10
|
@Bean @Autowired public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) { MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression( "{'processed' : false}" )); messageSource.setExpectSingleResult( true ); messageSource.setEntityClass(Product. class ); messageSource.setCollectionNameExpression( new LiteralExpression( "product" )); return messageSource; } |
Определение маршрутизатора показывает, как продукт отправляется в другой метод активации службы в зависимости от поля «premium»:
1
2
3
4
5
|
private RouterSpec<Boolean, MethodInvokingRouter> routeProducts(RouterSpec<Boolean, MethodInvokingRouter> mapping) { return mapping .subFlowMapping( true , sf -> sf.handle(productProcessor(), "fastProcess" )) .subFlowMapping( false , sf -> sf.handle(productProcessor(), "process" )); } |
Как активатор сервиса, у нас есть простой компонент, который регистрирует сообщение и устанавливает продукт как обработанный. Затем он вернет продукт, чтобы он мог быть обработан следующей конечной точкой в потоке.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class ProductProcessor { public Product process(Product product) { return doProcess(product, String.format( "Processing product %s" , product.getId())); } public Product fastProcess(Product product) { return doProcess(product, String.format( "Fast processing product %s" , product.getId())); } private Product doProcess(Product product, String message) { System.out.println(message); product.setProcessed( true ); return product; } } |
Причина установки продукта как обработанного заключается в том, что следующим шагом является обновление его статуса в базе данных, чтобы не опрашивать его снова. Мы сохраняем его, перенаправляя поток на адаптер исходящего канала mongoDb.
5. Заключение
Вы видели, какие конечные точки вы должны использовать для взаимодействия с базой данных MongoDB с помощью Spring Integration. Адаптер исходящего канала пассивно сохраняет продукты в базе данных, в то время как адаптер входящего канала активно опрашивает базу данных для получения новых продуктов.
Если вы нашли этот пост полезным, поделитесь им или пометьте мой репозиторий. Я ценю его 🙂
Я публикую свои новые сообщения в Google Plus и Twitter. Следуйте за мной, если вы хотите быть в курсе нового контента.
Ссылка: | Spring Integration Переходники MongoDB с Java DSL от нашего партнера по |