В более ранней записи в блоге я затронул фиктивный поток rube goldberg для заглавной строки в сложной последовательности шагов. В этой статье я хотел представить Spring DS Integration Java в качестве альтернативы определению потоков интеграции через файлы конфигурации xml.
Я написал несколько новых вещей после написания этой записи в блоге, благодаря Артему Билану, и хотел задокументировать эти уроки здесь:
Итак, сначала мой оригинальный образец, здесь у меня есть следующий поток (выделенный жирным шрифтом):
- Примите сообщение такого типа — «привет из пружины целых»
- Разделите его на отдельные слова (привет, от, весна, целое)
- Отправить каждое слово в очередь ActiveMQ
- Подберите фрагменты слова из очереди и используйте каждое слово с заглавной буквы.
- Поместите ответ обратно в очередь ответов
- Возьмите сообщение, повторите последовательность на основе оригинальной последовательности слов
- Соберите обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- Верните предложение обратно вызывающему приложению.
EchoFlowOutbound.java:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence( true ).get().getT2().setDelimiters( "\\s" )) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on( " " ).join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null ) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory( this .connectionFactory); jmsOutboundGateway.setRequestDestinationName( "amq.outbound" ); jmsOutboundGateway.setReplyChannel(sequenceChannel()); return jmsOutboundGateway; } |
На основании отзывов Артема Билана получается, что здесь можно оптимизировать несколько вещей.
Сначала обратите внимание, как я явно определил два прямых канала: «requestChannel» для запуска потока, который принимает строковое сообщение, и «sequenceChannel» для обработки сообщения, как только оно вернется из очереди сообщений jms, их можно полностью удалить и поток сделал немного более кратким этот путь:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from( "requestChannel" ) .split(s -> s.applySequence( true ).get().getT2().setDelimiters( "\\s" )) .handle(jmsOutboundGateway()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on( " " ).join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null ) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory( this .connectionFactory); jmsOutboundGateway.setRequestDestinationName( "amq.outbound" ); return jmsOutboundGateway; } |
«RequestChannel» теперь создается неявно, просто объявив для него имя. Последовательность канала более интересна, цитируя Артема Билана —
не указывайте outputChannel для AbstractReplyProroductionMessageHandler и полагаться на DSL
это означает, что здесь jmsOutboundGateway является AbstractReplyProductivityMessageHandler, а его канал ответа неявно получен DSL. Кроме того, два метода, которые ранее обрабатывали потоки для отправки сообщения в очередь и затем продолжения после возвращения сообщения, объединяются в один. И ИМХО это действительно читается немного лучше из-за этого изменения.
Вторым хорошим изменением и темой этой статьи является введение фабрик пространства имен Jms. Когда я писал предыдущую статью в блоге, DSL поддерживала определение входящего / исходящего адаптера / шлюза AMQ, теперь есть поддержка входящего на основе Jms. / адаптер адаптер / шлюзы также, это еще больше упрощает поток, поток теперь выглядит так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from( "requestChannel" ) .split(s -> s.applySequence( true ).get().getT2().setDelimiters( "\\s" )) .handle(Jms.outboundGateway(connectionFactory) .requestDestination( "amq.outbound" )) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on( " " ).join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null ) .get(); } |
Входящая часть Jms-потока также упрощается до следующего:
1
2
3
4
5
6
7
|
@Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Jms.inboundGateway(connectionFactory) .destination( "amq.outbound" )) .transform((String s) -> s.toUpperCase()) .get(); } |
Таким образом, в заключение следует отметить, что Spring Integration Java DSL — это захватывающий новый способ точной настройки потоков Spring Integration. Это уже впечатляет тем, как упрощает читаемость потоков, а введение фабрик пространства имен Jms делает его еще более привлекательным для потоков на основе JMS.
- Я обновил пример приложения с изменениями, перечисленными в этой статье — https://github.com/bijukunjummen/rg-si.
Ссылка: | Образец Spring Integration Java DSL — дальнейшее упрощение благодаря фабрикам пространств имен Jms от нашего партнера по JCG Биджу Кунджуммена из блога all and sundry. |