Теперь для Spring Integration введен новый DSL на основе Java, который позволяет определять потоки сообщений Spring Integration с использованием чисто Java-конфигурации вместо использования Spring-XML-конфигурации.
Я попробовал DSL для образца потока интеграции, который у меня есть — я называю его потоком Рубе Голдберга , поскольку он следует извилистым путем, пытаясь использовать заглавную строку, переданную в качестве входных данных. Поток выглядит следующим образом и выполняет некоторые безумные вещи, выполняя простую задачу:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
Чтобы начать с Spring Integration Java DSL, простая конфигурация на основе Xml для использования заглавных букв в строке должна выглядеть следующим образом:
1
2
3
4
5
|
< channel id = "requestChannel" /> < gateway id = "echoGateway" service-interface = "rube.simple.EchoGateway" default-request-channel = "requestChannel" /> < transformer input-channel = "requestChannel" expression = "payload.toUpperCase()" /> |
Здесь ничего особенного не происходит, шлюз обмена сообщениями принимает сообщение, переданное из приложения, преобразует его в преобразователь, и оно возвращается обратно приложению.
Выражая это в Spring Integration Java DSL:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
@Configuration @EnableIntegration @IntegrationComponentScan @ComponentScan public class EchoFlow { @Bean public IntegrationFlow simpleEchoFlow() { return IntegrationFlows.from( "requestChannel" ) .transform((String s) -> s.toUpperCase()) .get(); } } @MessagingGateway public interface EchoGateway { @Gateway (requestChannel = "requestChannel" ) String echo(String message); } |
Обратите внимание, что аннотация @MessagingGateway не является частью Spring Integration Java DSL, она является существующим компонентом в Spring Integration и служит той же цели, что и компонент шлюза в конфигурации на основе XML. Мне нравится тот факт, что преобразование может быть выражено с помощью типов безопасных лямбда-выражений Java 8, а не выражения Spring-EL. Обратите внимание, что выражение преобразования могло быть закодировано довольно несколькими альтернативными способами:
1
|
??.transform((String s) -> s.toUpperCase()) |
Или же:
1
|
??.<String, String>transform(s -> s.toUpperCase()) |
Или используя ссылки на метод:
1
|
??.<String, String>transform(String::toUpperCase) |
Переход к более сложному потоку Рубе Голдберга для выполнения той же задачи, снова начиная с конфигурации на основе XML. Есть две конфигурации, чтобы выразить этот поток:
rube-1.xml: эта конфигурация выполняет шаги 1, 2, 3, 6, 7, 8:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
< channel id = "requestChannel" /> <!--Step 1, 8--> < gateway id = "echoGateway" service-interface = "rube.complicated.EchoGateway" default-request-channel = "requestChannel" default-reply-timeout = "5000" /> < channel id = "toJmsOutbound" /> <!--Step 2--> < splitter input-channel = "requestChannel" output-channel = "toJmsOutbound" expression = "payload.split('\s')" apply-sequence = "true" /> < channel id = "sequenceChannel" /> <!--Step 3--> < int-jms:outbound-gateway request-channel = "toJmsOutbound" reply-channel = "sequenceChannel" request-destination = "amq.outbound" extract-request-payload = "true" /> <!--On the way back from the queue--> < channel id = "aggregateChannel" /> <!--Step 6--> < resequencer input-channel = "sequenceChannel" output-channel = "aggregateChannel" release-partial-sequences = "false" /> <!--Step 7--> < aggregator input-channel = "aggregateChannel" expression = "T(com.google.common.base.Joiner).on(' ').join(![payload])" /> |
и rube-2.xml для шагов 4, 5:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
1
2
3
4
5
|
< channel id = "enhanceMessageChannel" /> < int-jms:inbound-gateway request-channel = "enhanceMessageChannel" request-destination = "amq.outbound" /> < transformer input-channel = "enhanceMessageChannel" expression = "(payload + '').toUpperCase()" /> |
Теперь, выражая этот поток Rube Goldberg с помощью Spring Integration Java DSL, конфигурация выглядит следующим образом, опять же, в двух частях:
EchoFlowOutbound.java:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence( true ).get().getT2().setDelimiters( "\\s" )) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on( " " ).join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null ) .get(); } |
и EchoFlowInbound.java:
01
02
03
04
05
06
07
08
09
10
11
|
@Bean public JmsMessageDrivenEndpoint jmsInbound() { return new JmsMessageDrivenEndpoint(listenerContainer(), messageListener()); } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(enhanceMessageChannel()) .transform((String s) -> s.toUpperCase()) .get(); } |
Опять же, здесь код полностью безопасен и проверяется на наличие ошибок во время разработки, а не во время выполнения, как в конфигурации на основе XML. Опять же мне нравится тот факт, что преобразования, операторы агрегации могут быть выражены кратко, используя лямда-выражения Java 8, в отличие от выражений Spring-EL.
Здесь я не отображал часть кода поддержки, для настройки тестовой инфраструктуры activemq , эта конфигурация продолжает оставаться в формате xml, и я включил этот код в пример проекта github.
В целом, я очень рад видеть этот новый способ выражения потока сообщений Spring Integration с использованием чистой Java, и я с нетерпением жду его непрерывного развития и, возможно, даже попытаюсь участвовать в его развитии небольшими способами.
Вот весь рабочий код репозитория github : https://github.com/bijukunjummen/rg-si
Ресурсы и благодарность:
- Вступительная статья Spring Integration Java DSL от Артема Билана : https://spring.io/blog/2014/05/08/spring-integration-java-dsl-milestone-1-released
- Веб-сайт и вики- страница Spring Integration Java DSL: https://github.com/spring-projects/spring-integration-extensions/wiki/Spring-Integration-Java-DSL-Reference. Множество кода было бесстыдно скопировано мной из этой вики! Кроме того, большое спасибо Артему за руководство по вопросу, который у меня был
- Вебинар Гэри Рассела по Spring Integration 4.0, на котором Spring Integration Java DSL освещается очень подробно.
Ссылка: | Образец Spring Integration Java DSL от нашего партнера JCG Биджу Кунджуммена из блога all and sundry. |