В более
ранней записи в блоге я затронул фиктивный поток rube goldberg для заглавной строки в сложной последовательности шагов. В этой статье я хотел представить Spring DS Integration Java в качестве альтернативы определению потоков интеграции через файлы конфигурации xml.
После того, как я написал эту запись в блоге, я узнал несколько новых вещей благодаря
Артему Билану и хотел задокументировать эти уроки здесь:
Итак, сначала мой оригинальный образец, здесь у меня есть следующий поток (выделенный жирным шрифтом):
- Возьмите сообщение этого типа — «привет из весны Integ»
- Разделите его на отдельные слова (привет, от, весна, целое)
- Отправить каждое слово в очередь ActiveMQ
- Подберите фрагменты слова из очереди и используйте каждое слово с заглавной буквы
- Поместите ответ обратно в очередь ответов
- Возьмите сообщение, повторите последовательность на основе оригинальной последовательности слов
- Соберите обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- Верните предложение обратно вызывающему приложению.
EchoFlowOutbound.java:
@Bean
public DirectChannel sequenceChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel requestChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow toOutboundQueueFlow() {
return IntegrationFlows.from(requestChannel())
.split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
.handle(jmsOutboundGateway())
.get();
}
@Bean
public IntegrationFlow flowOnReturnOfMessage() {
return IntegrationFlows.from(sequenceChannel())
.resequence()
.aggregate(aggregate ->
aggregate.outputProcessor(g ->
Joiner.on(" ").join(g.getMessages()
.stream()
.map(m -> (String) m.getPayload()).collect(toList())))
, null)
.get();
}
@Bean
public JmsOutboundGateway jmsOutboundGateway() {
JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
jmsOutboundGateway.setRequestDestinationName("amq.outbound");
jmsOutboundGateway.setReplyChannel(sequenceChannel());
return jmsOutboundGateway;
}
На основании отзывов Артема Билана получается
, что здесь можно оптимизировать несколько вещей.
Сначала обратите внимание, как я явно определил два прямых канала: «requestChannel» для запуска потока, который принимает строковое сообщение, и «sequenceChannel» для обработки сообщения, как только оно вернется из очереди сообщений jms, они могут быть на самом деле полностью удалены и поток сделал немного более кратким этот путь:
@Bean
public IntegrationFlow toOutboundQueueFlow() {
return IntegrationFlows.from("requestChannel")
.split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
.handle(jmsOutboundGateway())
.resequence()
.aggregate(aggregate ->
aggregate.outputProcessor(g ->
Joiner.on(" ").join(g.getMessages()
.stream()
.map(m -> (String) m.getPayload()).collect(toList())))
, null)
.get();
}
@Bean
public JmsOutboundGateway jmsOutboundGateway() {
JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
jmsOutboundGateway.setRequestDestinationName("amq.outbound");
return jmsOutboundGateway;
}
«requestChannel» теперь создается неявно, просто объявив для него имя. Последовательность канала более интересна, цитируя
Артема Билана —
не указывайте outputChannel для AbstractReplyProroductionMessageHandler и полагаться на DSL
это означает, что здесь jmsOutboundGateway является AbstractReplyProroductionMessageHandler и его канал ответа неявно получен DSL. Кроме того, два метода, которые ранее обрабатывали потоки для отправки сообщения в очередь и затем продолжения после возвращения сообщения, объединяются в один. И ИМХО это действительно читается немного лучше из-за этого изменения.
Вторым хорошим изменением и темой этой статьи является введение фабрик пространства имен Jms. Когда я писал предыдущую статью в блоге, DSL поддерживала определение входящего / исходящего адаптера / шлюза AMQ, теперь есть поддержка входящего на основе Jms. / адаптер адаптер / шлюзы также, это еще больше упрощает поток, поток теперь выглядит так:
@Bean
public IntegrationFlow toOutboundQueueFlow() {
return IntegrationFlows.from("requestChannel")
.split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
.handle(Jms.outboundGateway(connectionFactory)
.requestDestination("amq.outbound"))
.resequence()
.aggregate(aggregate ->
aggregate.outputProcessor(g ->
Joiner.on(" ").join(g.getMessages()
.stream()
.map(m -> (String) m.getPayload()).collect(toList())))
, null)
.get();
}
Входящая часть Jms-потока также упрощается до следующего:
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
.destination("amq.outbound"))
.transform((String s) -> s.toUpperCase())
.get();
}
Таким образом, в заключение следует отметить, что Spring Integration Java DSL — это захватывающий новый способ точной настройки потоков Spring Integration. Это уже впечатляет тем, как упрощает читаемость потоков, а введение фабрик пространства имен Jms делает его еще более привлекательным для потоков на основе JMS.