В более
ранней записи в блоге я затронул фиктивный поток rube goldberg для заглавной строки в сложной последовательности шагов. В этой статье я хотел представить Spring DS Integration Java в качестве альтернативы определению потоков интеграции через файлы конфигурации xml.
После того, как я написал эту запись в блоге, я узнал несколько новых вещей благодаря
Артему Билану и хотел задокументировать эти уроки здесь:
Итак, сначала мой оригинальный образец, здесь у меня есть следующий поток (выделенный жирным шрифтом):
- Возьмите сообщение этого типа — «привет из весны Integ»
- Разделите его на отдельные слова (привет, от, весна, целое)
- Отправить каждое слово в очередь ActiveMQ
- Подберите фрагменты слова из очереди и используйте каждое слово с заглавной буквы
- Поместите ответ обратно в очередь ответов
- Возьмите сообщение, повторите последовательность на основе оригинальной последовательности слов
- Соберите обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
- Верните предложение обратно вызывающему приложению.
EchoFlowOutbound.java:
@Bean public DirectChannel sequenceChannel() { return new DirectChannel(); } @Bean public DirectChannel requestChannel() { return new DirectChannel(); } @Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from(requestChannel()) .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(jmsOutboundGateway()) .get(); } @Bean public IntegrationFlow flowOnReturnOfMessage() { return IntegrationFlows.from(sequenceChannel()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory(this.connectionFactory); jmsOutboundGateway.setRequestDestinationName("amq.outbound"); jmsOutboundGateway.setReplyChannel(sequenceChannel()); return jmsOutboundGateway; }
На основании отзывов Артема Билана получается
, что здесь можно оптимизировать несколько вещей.
Сначала обратите внимание, как я явно определил два прямых канала: «requestChannel» для запуска потока, который принимает строковое сообщение, и «sequenceChannel» для обработки сообщения, как только оно вернется из очереди сообщений jms, они могут быть на самом деле полностью удалены и поток сделал немного более кратким этот путь:
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from("requestChannel") .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(jmsOutboundGateway()) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); } @Bean public JmsOutboundGateway jmsOutboundGateway() { JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway(); jmsOutboundGateway.setConnectionFactory(this.connectionFactory); jmsOutboundGateway.setRequestDestinationName("amq.outbound"); return jmsOutboundGateway; }
«requestChannel» теперь создается неявно, просто объявив для него имя. Последовательность канала более интересна, цитируя
Артема Билана —
не указывайте outputChannel для AbstractReplyProroductionMessageHandler и полагаться на DSL
это означает, что здесь jmsOutboundGateway является AbstractReplyProroductionMessageHandler и его канал ответа неявно получен DSL. Кроме того, два метода, которые ранее обрабатывали потоки для отправки сообщения в очередь и затем продолжения после возвращения сообщения, объединяются в один. И ИМХО это действительно читается немного лучше из-за этого изменения.
Вторым хорошим изменением и темой этой статьи является введение фабрик пространства имен Jms. Когда я писал предыдущую статью в блоге, DSL поддерживала определение входящего / исходящего адаптера / шлюза AMQ, теперь есть поддержка входящего на основе Jms. / адаптер адаптер / шлюзы также, это еще больше упрощает поток, поток теперь выглядит так:
@Bean public IntegrationFlow toOutboundQueueFlow() { return IntegrationFlows.from("requestChannel") .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s")) .handle(Jms.outboundGateway(connectionFactory) .requestDestination("amq.outbound")) .resequence() .aggregate(aggregate -> aggregate.outputProcessor(g -> Joiner.on(" ").join(g.getMessages() .stream() .map(m -> (String) m.getPayload()).collect(toList()))) , null) .get(); }
Входящая часть Jms-потока также упрощается до следующего:
@Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Jms.inboundGateway(connectionFactory) .destination("amq.outbound")) .transform((String s) -> s.toUpperCase()) .get(); }
Таким образом, в заключение следует отметить, что Spring Integration Java DSL — это захватывающий новый способ точной настройки потоков Spring Integration. Это уже впечатляет тем, как упрощает читаемость потоков, а введение фабрик пространства имен Jms делает его еще более привлекательным для потоков на основе JMS.