Статьи

Образец Spring Integration Java DSL — дальнейшее упрощение с помощью фабрики пространств имен JMS


В более 
ранней  записи в блоге я затронул фиктивный поток rube goldberg для заглавной строки в сложной последовательности шагов. В этой статье я хотел представить Spring DS Integration Java в качестве альтернативы определению потоков интеграции через файлы конфигурации xml.

После того, как я написал эту запись в блоге, я узнал несколько новых вещей благодаря 
Артему Билану  и хотел задокументировать эти уроки здесь:

Итак, сначала мой оригинальный образец, здесь у меня есть следующий поток (выделенный жирным шрифтом):

  1. Возьмите сообщение этого типа — «привет из весны Integ»
  2. Разделите его на отдельные слова (привет, от, весна, целое)
  3. Отправить каждое слово в очередь ActiveMQ
  4. Подберите фрагменты слова из очереди и используйте каждое слово с заглавной буквы
  5. Поместите ответ обратно в очередь ответов
  6. Возьмите сообщение, повторите последовательность на основе оригинальной последовательности слов
  7. Соберите обратно в предложение («ПРИВЕТ ОТ ВЕСНОЙ ИНТЕГ») и
  8. Верните предложение обратно вызывающему приложению.

EchoFlowOutbound.java:

@Bean
 public DirectChannel sequenceChannel() {
  return new DirectChannel();
 }

 @Bean
 public DirectChannel requestChannel() {
  return new DirectChannel();
 }

 @Bean
 public IntegrationFlow toOutboundQueueFlow() {
  return IntegrationFlows.from(requestChannel())
    .split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
    .handle(jmsOutboundGateway())
    .get();
 }

 @Bean
 public IntegrationFlow flowOnReturnOfMessage() {
  return IntegrationFlows.from(sequenceChannel())
    .resequence()
    .aggregate(aggregate ->
      aggregate.outputProcessor(g ->
        Joiner.on(" ").join(g.getMessages()
          .stream()
          .map(m -> (String) m.getPayload()).collect(toList())))
      , null)
    .get();
 }

@Bean
public JmsOutboundGateway jmsOutboundGateway() {
	JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
	jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
	jmsOutboundGateway.setRequestDestinationName("amq.outbound");
	jmsOutboundGateway.setReplyChannel(sequenceChannel());
	return jmsOutboundGateway;
}

На основании отзывов Артема Билана получается 
, что здесь можно оптимизировать несколько вещей. 

Сначала обратите внимание, как я явно определил два прямых канала: «requestChannel» для запуска потока, который принимает строковое сообщение, и «sequenceChannel» для обработки сообщения, как только оно вернется из очереди сообщений jms, они могут быть на самом деле полностью удалены и поток сделал немного более кратким этот путь:

@Bean
public IntegrationFlow toOutboundQueueFlow() {
	return IntegrationFlows.from("requestChannel")
			.split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
			.handle(jmsOutboundGateway())
			.resequence()
			.aggregate(aggregate ->
					aggregate.outputProcessor(g ->
							Joiner.on(" ").join(g.getMessages()
									.stream()
									.map(m -> (String) m.getPayload()).collect(toList())))
					, null)
			.get();
}

@Bean
public JmsOutboundGateway jmsOutboundGateway() {
	JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
	jmsOutboundGateway.setConnectionFactory(this.connectionFactory);
	jmsOutboundGateway.setRequestDestinationName("amq.outbound");
	return jmsOutboundGateway;
}

«requestChannel» теперь создается неявно, просто объявив для него имя. Последовательность канала более интересна, цитируя 
Артема Билана  —


не указывайте outputChannel для AbstractReplyProroductionMessageHandler и полагаться на DSL

это означает, что здесь jmsOutboundGateway является AbstractReplyProroductionMessageHandler и его канал ответа неявно получен DSL. Кроме того, два метода, которые ранее обрабатывали потоки для отправки сообщения в очередь и затем продолжения после возвращения сообщения, объединяются в один. И ИМХО это действительно читается немного лучше из-за этого изменения.

Вторым хорошим изменением и темой этой статьи является введение фабрик пространства имен Jms. Когда я писал предыдущую статью в блоге, DSL поддерживала определение входящего / исходящего адаптера / шлюза AMQ, теперь есть поддержка входящего на основе Jms. / адаптер адаптер / шлюзы также, это еще больше упрощает поток, поток теперь выглядит так:

@Bean
public IntegrationFlow toOutboundQueueFlow() {
	return IntegrationFlows.from("requestChannel")
			.split(s -> s.applySequence(true).get().getT2().setDelimiters("\\s"))
			.handle(Jms.outboundGateway(connectionFactory)
					.requestDestination("amq.outbound"))
			.resequence()
			.aggregate(aggregate ->
					aggregate.outputProcessor(g ->
							Joiner.on(" ").join(g.getMessages()
									.stream()
									.map(m -> (String) m.getPayload()).collect(toList())))
					, null)
			.get();
}

Входящая часть Jms-потока также упрощается до следующего:

@Bean
public IntegrationFlow inboundFlow() {
	return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
			.destination("amq.outbound"))
			.transform((String s) -> s.toUpperCase())
			.get();
}

Таким образом, в заключение следует отметить, что Spring Integration Java DSL — это захватывающий новый способ точной настройки потоков Spring Integration. Это уже впечатляет тем, как упрощает читаемость потоков, а введение фабрик пространства имен Jms делает его еще более привлекательным для потоков на основе JMS.