Теперь для Spring Integration введен новый DSL на основе Java, который позволяет определять потоки сообщений Spring Integration с использованием чисто Java-конфигурации вместо использования Spring-XML-конфигурации.
Я попробовал DSL для образца потока интеграции, который у меня есть — я называю его потоком Рубе Голдберга , поскольку он следует извилистым путем, пытаясь использовать заглавную строку, переданную в качестве входных данных. Поток выглядит следующим образом и выполняет некоторые безумные вещи, выполняя простую задачу:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
Чтобы начать с Spring Integration Java DSL, простая конфигурация на основе Xml для использования заглавных букв в строке должна выглядеть следующим образом:
|
1
2
3
4
5
|
<channel id="requestChannel"/><gateway id="echoGateway" service-interface="rube.simple.EchoGateway" default-request-channel="requestChannel" /><transformer input-channel="requestChannel" expression="payload.toUpperCase()" /> |
Здесь ничего особенного не происходит, шлюз обмена сообщениями принимает сообщение, переданное из приложения, преобразует его в преобразователь, и оно возвращается обратно приложению.
Выражая это в Spring Integration Java DSL:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
@Configuration@EnableIntegration@IntegrationComponentScan@ComponentScanpublic class EchoFlow { @Bean public IntegrationFlow simpleEchoFlow() { return IntegrationFlows.from("requestChannel") .transform((String s) -> s.toUpperCase()) .get(); }}@MessagingGatewaypublic interface EchoGateway { @Gateway(requestChannel = "requestChannel") String echo(String message);} |
Обратите внимание, что аннотация @MessagingGateway не является частью Spring Integration Java DSL, она является существующим компонентом в Spring Integration и служит той же цели, что и компонент шлюза в конфигурации на основе XML. Мне нравится тот факт, что преобразование может быть выражено с помощью типов безопасных лямбда-выражений Java 8, а не выражения Spring-EL. Обратите внимание, что выражение преобразования могло быть закодировано довольно несколькими альтернативными способами:
|
1
|
??.transform((String s) -> s.toUpperCase()) |
Или же:
|
1
|
??.<String, String>transform(s -> s.toUpperCase()) |
Или используя ссылки на метод:
|
1
|
??.<String, String>transform(String::toUpperCase) |
Переход к более сложному потоку Рубе Голдберга для выполнения той же задачи, снова начиная с конфигурации на основе XML. Есть две конфигурации, чтобы выразить этот поток:
rube-1.xml: эта конфигурация выполняет шаги 1, 2, 3, 6, 7, 8:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<channel id="requestChannel"/><!--Step 1, 8--><gateway id="echoGateway" service-interface="rube.complicated.EchoGateway" default-request-channel="requestChannel" default-reply-timeout="5000"/><channel id="toJmsOutbound"/><!--Step 2--><splitter input-channel="requestChannel" output-channel="toJmsOutbound" expression="payload.split('\s')" apply-sequence="true"/><channel id="sequenceChannel"/><!--Step 3--><int-jms:outbound-gateway request-channel="toJmsOutbound" reply-channel="sequenceChannel" request-destination="amq.outbound" extract-request-payload="true"/><!--On the way back from the queue--><channel id="aggregateChannel"/><!--Step 6--><resequencer input-channel="sequenceChannel" output-channel="aggregateChannel" release-partial-sequences="false"/><!--Step 7--><aggregator input-channel="aggregateChannel" expression="T(com.google.common.base.Joiner).on(' ').join(![payload])"/> |
и rube-2.xml для шагов 4, 5:
- Он принимает сообщение такого типа — «Привет от Spring Integ»
- разбивает его на отдельные слова (привет, от, весна, целое)
- отправляет каждое слово в очередь ActiveMQ
- из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
- помещение ответа обратно в очередь ответов
- Он подобран, переупорядочен на основе оригинальной последовательности слов
- объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- вернулся к заявке.
|
1
2
3
4
5
|
<channel id="enhanceMessageChannel"/><int-jms:inbound-gateway request-channel="enhanceMessageChannel" request-destination="amq.outbound"/><transformer input-channel="enhanceMessageChannel" expression="(payload + '').toUpperCase()"/> |
Теперь, выражая этот поток Rube Goldberg с помощью Spring Integration Java DSL, конфигурация выглядит следующим образом, опять же, в двух частях:
EchoFlowOutbound.java:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); } |
и EchoFlowInbound.java:
|
01
02
03
04
05
06
07
08
09
10
11
|
@Beanpublic JmsMessageDrivenEndpoint jmsInbound() { return new JmsMessageDrivenEndpoint(listenerContainer(), messageListener());}@Beanpublic IntegrationFlow inboundFlow() { return IntegrationFlows.from(enhanceMessageChannel()) .transform((String s) -> s.toUpperCase()) .get();} |
Опять же, здесь код полностью безопасен и проверяется на наличие ошибок во время разработки, а не во время выполнения, как в конфигурации на основе XML. Опять же мне нравится тот факт, что преобразования, операторы агрегации могут быть выражены кратко, используя лямда-выражения Java 8, в отличие от выражений Spring-EL.
Здесь я не отображал часть кода поддержки, для настройки тестовой инфраструктуры activemq , эта конфигурация продолжает оставаться в формате xml, и я включил этот код в пример проекта github.
В целом, я очень рад видеть этот новый способ выражения потока сообщений Spring Integration с использованием чистой Java, и я с нетерпением жду его непрерывного развития и, возможно, даже попытаюсь участвовать в его развитии небольшими способами.
Вот весь рабочий код репозитория github : https://github.com/bijukunjummen/rg-si
Ресурсы и благодарность:
- Вступительная статья Spring Integration Java DSL от Артема Билана : https://spring.io/blog/2014/05/08/spring-integration-java-dsl-milestone-1-released
- Веб-сайт и вики- страница Spring Integration Java DSL: https://github.com/spring-projects/spring-integration-extensions/wiki/Spring-Integration-Java-DSL-Reference. Множество кода было бесстыдно скопировано мной из этой вики! Кроме того, большое спасибо Артему за руководство по вопросу, который у меня был
- Вебинар Гэри Рассела по Spring Integration 4.0, на котором Spring Integration Java DSL освещается очень подробно.
| Ссылка: | Образец Spring Integration Java DSL от нашего партнера JCG Биджу Кунджуммена из блога all and sundry. |
