Статьи

Образец Spring Integration Java DSL — дальнейшее упрощение с фабриками пространства имен Jms

В более ранней записи в блоге я затронул фиктивный поток rube goldberg для заглавной строки в сложной последовательности шагов. В этой статье я хотел представить Spring DS Integration Java в качестве альтернативы определению потоков интеграции через файлы конфигурации xml.

Я написал несколько новых вещей после написания этой записи в блоге, благодаря Артему Билану, и хотел задокументировать эти уроки здесь:

Итак, сначала мой оригинальный образец, здесь у меня есть следующий поток (выделенный жирным шрифтом):

  1. Примите сообщение такого типа — «привет из пружины целых»
  2. Разделите его на отдельные слова (привет, от, весна, целое)
  3. Отправить каждое слово в очередь ActiveMQ
  4. Подберите фрагменты слова из очереди и используйте каждое слово с заглавной буквы.
  5. Поместите ответ обратно в очередь ответов
  6. Возьмите сообщение, повторите последовательность на основе оригинальной последовательности слов
  7. Соберите обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
  8. Верните предложение обратно вызывающему приложению.

EchoFlowOutbound.java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Bean
 public DirectChannel sequenceChannel() {
  return new DirectChannel();
 }
 
 @Bean
 public DirectChannel requestChannel() {
  return new DirectChannel();
 }
 
 @Bean
 public IntegrationFlow toOutboundQueueFlow() {
  return IntegrationFlows.from(requestChannel())
    .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
    .handle(jmsOutboundGateway())
    .get();
 }
 
 @Bean
 public IntegrationFlow flowOnReturnOfMessage() {
  return IntegrationFlows.from(sequenceChannel())
    .resequence()
    .aggregate(aggregate ->
      aggregate.outputProcessor(g ->
        Joiner.on(" ").join(g.getMessages()
          .stream()
          .map(m -> (String) m.getPayload()).collect(toList())))
      , null)
    .get();
 }
 
@Bean
public JmsOutboundGateway jmsOutboundGateway() {
 JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
 jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
 jmsOutboundGateway.setRequestDestinationName("amq.outbound");
 jmsOutboundGateway.setReplyChannel(sequenceChannel());
 return jmsOutboundGateway;
}

На основании отзывов Артема Билана получается, что здесь можно оптимизировать несколько вещей.

Сначала обратите внимание, как я явно определил два прямых канала: «requestChannel» для запуска потока, который принимает строковое сообщение, и «sequenceChannel» для обработки сообщения, как только оно вернется из очереди сообщений jms, их можно полностью удалить и поток сделал немного более кратким этот путь:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
@Bean
public IntegrationFlow toOutboundQueueFlow() {
 return IntegrationFlows.from("requestChannel")
   .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
   .handle(jmsOutboundGateway())
   .resequence()
   .aggregate(aggregate ->
     aggregate.outputProcessor(g ->
       Joiner.on(" ").join(g.getMessages()
         .stream()
         .map(m -> (String) m.getPayload()).collect(toList())))
     , null)
   .get();
}
 
@Bean
public JmsOutboundGateway jmsOutboundGateway() {
 JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
 jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
 jmsOutboundGateway.setRequestDestinationName("amq.outbound");
 return jmsOutboundGateway;
}

«RequestChannel» теперь создается неявно, просто объявив для него имя. Последовательность канала более интересна, цитируя Артема Билана

не указывайте outputChannel для AbstractReplyProroductionMessageHandler и полагаться на DSL

это означает, что здесь jmsOutboundGateway является AbstractReplyProductivityMessageHandler, а его канал ответа неявно получен DSL. Кроме того, два метода, которые ранее обрабатывали потоки для отправки сообщения в очередь и затем продолжения после возвращения сообщения, объединяются в один. И ИМХО это действительно читается немного лучше из-за этого изменения.

Вторым хорошим изменением и темой этой статьи является введение фабрик пространства имен Jms. Когда я писал предыдущую статью в блоге, DSL поддерживала определение входящего / исходящего адаптера / шлюза AMQ, теперь есть поддержка входящего на основе Jms. / адаптер адаптер / шлюзы также, это еще больше упрощает поток, поток теперь выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
@Bean
public IntegrationFlow toOutboundQueueFlow() {
 return IntegrationFlows.from("requestChannel")
   .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
   .handle(Jms.outboundGateway(connectionFactory)
     .requestDestination("amq.outbound"))
   .resequence()
   .aggregate(aggregate ->
     aggregate.outputProcessor(g ->
       Joiner.on(" ").join(g.getMessages()
         .stream()
         .map(m -> (String) m.getPayload()).collect(toList())))
     , null)
   .get();
}

Входящая часть Jms-потока также упрощается до следующего:

1
2
3
4
5
6
7
@Bean
public IntegrationFlow inboundFlow() {
 return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
   .destination("amq.outbound"))
   .transform((String s) -> s.toUpperCase())
   .get();
}

Таким образом, в заключение следует отметить, что Spring Integration Java DSL — это захватывающий новый способ точной настройки потоков Spring Integration. Это уже впечатляет тем, как упрощает читаемость потоков, а введение фабрик пространства имен Jms делает его еще более привлекательным для потоков на основе JMS.

  • Я обновил пример приложения с изменениями, перечисленными в этой статье — https://github.com/bijukunjummen/rg-si.