Статьи

Пример Spring DS интеграции Java DSL

Теперь для Spring Integration введен новый DSL на основе Java, который позволяет определять потоки сообщений Spring Integration с использованием чисто Java-конфигурации вместо использования Spring-XML-конфигурации.

Я попробовал DSL для образца потока интеграции, который у меня есть — я называю его потоком Рубе Голдберга , поскольку он следует извилистым путем, пытаясь использовать заглавную строку, переданную в качестве входных данных. Поток выглядит следующим образом и выполняет некоторые безумные вещи, выполняя простую задачу:

деревенщина

  1. Он принимает сообщение такого типа — «Привет от Spring Integ»
  2. разбивает его на отдельные слова (привет, от, весна, целое)
  3. отправляет каждое слово в очередь ActiveMQ
  4. из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
  5. помещение ответа обратно в очередь ответов
  6. Он подобран, переупорядочен на основе оригинальной последовательности слов
  7. объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
  8. вернулся к заявке.

Чтобы начать с Spring Integration Java DSL, простая конфигурация на основе Xml для использования заглавных букв в строке должна выглядеть следующим образом:

1
2
3
4
5
<channel id="requestChannel"/>
 
<gateway id="echoGateway" service-interface="rube.simple.EchoGateway" default-request-channel="requestChannel" />
 
<transformer input-channel="requestChannel" expression="payload.toUpperCase()" />

Здесь ничего особенного не происходит, шлюз обмена сообщениями принимает сообщение, переданное из приложения, преобразует его в преобразователь, и оно возвращается обратно приложению.

Выражая это в Spring Integration Java DSL:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class EchoFlow {
 
 @Bean
 public IntegrationFlow simpleEchoFlow() {
  return IntegrationFlows.from("requestChannel")
    .transform((String s) -> s.toUpperCase())
    .get();
 }
}
 
@MessagingGateway
public interface EchoGateway {
 @Gateway(requestChannel = "requestChannel")
 String echo(String message);
}

Обратите внимание, что аннотация @MessagingGateway не является частью Spring Integration Java DSL, она является существующим компонентом в Spring Integration и служит той же цели, что и компонент шлюза в конфигурации на основе XML. Мне нравится тот факт, что преобразование может быть выражено с помощью типов безопасных лямбда-выражений Java 8, а не выражения Spring-EL. Обратите внимание, что выражение преобразования могло быть закодировано довольно несколькими альтернативными способами:

1
??.transform((String s) -> s.toUpperCase())

Или же:

1
??.<String, String>transform(s -> s.toUpperCase())

Или используя ссылки на метод:

1
??.<String, String>transform(String::toUpperCase)

Переход к более сложному потоку Рубе Голдберга для выполнения той же задачи, снова начиная с конфигурации на основе XML. Есть две конфигурации, чтобы выразить этот поток:

rube-1.xml: эта конфигурация выполняет шаги 1, 2, 3, 6, 7, 8:

  1. Он принимает сообщение такого типа — «Привет от Spring Integ»
  2. разбивает его на отдельные слова (привет, от, весна, целое)
  3. отправляет каждое слово в очередь ActiveMQ
  4. из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
  5. помещение ответа обратно в очередь ответов
  6. Он подобран, переупорядочен на основе оригинальной последовательности слов
  7. объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
  8. вернулся к заявке.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<channel id="requestChannel"/>
 
<!--Step 1, 8-->
<gateway id="echoGateway" service-interface="rube.complicated.EchoGateway" default-request-channel="requestChannel"
   default-reply-timeout="5000"/>
 
<channel id="toJmsOutbound"/>
 
<!--Step 2-->
<splitter input-channel="requestChannel" output-channel="toJmsOutbound" expression="payload.split('\s')"
    apply-sequence="true"/>
 
<channel id="sequenceChannel"/>
 
<!--Step 3-->
<int-jms:outbound-gateway request-channel="toJmsOutbound" reply-channel="sequenceChannel"
        request-destination="amq.outbound" extract-request-payload="true"/>
 
 
<!--On the way back from the queue-->
<channel id="aggregateChannel"/>
 
<!--Step 6-->
<resequencer input-channel="sequenceChannel" output-channel="aggregateChannel" release-partial-sequences="false"/>
 
<!--Step 7-->
<aggregator input-channel="aggregateChannel"
   expression="T(com.google.common.base.Joiner).on(' ').join(![payload])"/>

и rube-2.xml для шагов 4, 5:

  1. Он принимает сообщение такого типа — «Привет от Spring Integ»
  2. разбивает его на отдельные слова (привет, от, весна, целое)
  3. отправляет каждое слово в очередь ActiveMQ
  4. из очереди фрагменты слова подбираются обогащателем, чтобы заглавные слова
  5. помещение ответа обратно в очередь ответов
  6. Он подобран, переупорядочен на основе оригинальной последовательности слов
  7. объединены обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
  8. вернулся к заявке.
1
2
3
4
5
<channel id="enhanceMessageChannel"/>
 
<int-jms:inbound-gateway request-channel="enhanceMessageChannel" request-destination="amq.outbound"/>
 
<transformer input-channel="enhanceMessageChannel" expression="(payload + '').toUpperCase()"/>

Теперь, выражая этот поток Rube Goldberg с помощью Spring Integration Java DSL, конфигурация выглядит следующим образом, опять же, в двух частях:

EchoFlowOutbound.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Bean
 public DirectChannel sequenceChannel() {
  return new DirectChannel();
 }
 
 @Bean
 public DirectChannel requestChannel() {
  return new DirectChannel();
 }
 
 @Bean
 public IntegrationFlow toOutboundQueueFlow() {
  return IntegrationFlows.from(requestChannel())
    .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
    .handle(jmsOutboundGateway())
    .get();
 }
 
 @Bean
 public IntegrationFlow flowOnReturnOfMessage() {
  return IntegrationFlows.from(sequenceChannel())
    .resequence()
    .aggregate(aggregate ->
      aggregate.outputProcessor(g ->
        Joiner.on(" ").join(g.getMessages()
          .stream()
          .map(m -> (String) m.getPayload()).collect(toList())))
      , null)
    .get();
 }

и EchoFlowInbound.java:

01
02
03
04
05
06
07
08
09
10
11
@Bean
public JmsMessageDrivenEndpoint jmsInbound() {
 return new JmsMessageDrivenEndpoint(listenerContainer(), messageListener());
}
 
@Bean
public IntegrationFlow inboundFlow() {
 return IntegrationFlows.from(enhanceMessageChannel())
   .transform((String s) -> s.toUpperCase())
   .get();
}

Опять же, здесь код полностью безопасен и проверяется на наличие ошибок во время разработки, а не во время выполнения, как в конфигурации на основе XML. Опять же мне нравится тот факт, что преобразования, операторы агрегации могут быть выражены кратко, используя лямда-выражения Java 8, в отличие от выражений Spring-EL.

Здесь я не отображал часть кода поддержки, для настройки тестовой инфраструктуры activemq , эта конфигурация продолжает оставаться в формате xml, и я включил этот код в пример проекта github.

В целом, я очень рад видеть этот новый способ выражения потока сообщений Spring Integration с использованием чистой Java, и я с нетерпением жду его непрерывного развития и, возможно, даже попытаюсь участвовать в его развитии небольшими способами.

Вот весь рабочий код репозитория github : https://github.com/bijukunjummen/rg-si

Ресурсы и благодарность:

Ссылка: Образец Spring Integration Java DSL от нашего партнера JCG Биджу Кунджуммена из блога all and sundry.