Статьи

Spring DS Integration Java (до Java 8): построчное руководство


Первоначально написано 
Артемом Биланом   в блоге SpringSource.

Уважаемое весеннее сообщество!

Недавно мы опубликовали руководство по  интеграции Java Spring DSL: построчно , в котором широко используются Java 8 Lambdas. Мы получили некоторые отзывы о том, что это хорошее введение в DSL, но аналогичное руководство необходимо для тех пользователей, которые не могут перейти на Java 8 или еще не знакомы  Lambdas, но хотят воспользоваться

Итак, чтобы помочь пользователям Spring Integration, которые хотят перейти с конфигурации XML на конфигурацию Java и аннотации, мы предоставляем это,  line-by-line tutorial чтобы продемонстрировать, что даже без  Lambdasэтого мы многое извлекаем из использования Spring DS Integration Java DSL. Хотя большинство согласится с тем, что лямбда-синтаксис обеспечивает более краткое определение.

Здесь мы анализируем тот же   пример Cafe Demo , но используя для конфигурации вариант до Java 8. Многие параметры одинаковы, поэтому мы просто скопируем / вставим их описание здесь, чтобы получить полную картину. Поскольку эта конфигурация Java DSL Spring Integration сильно отличается от лямбда-стиля Java 8, всем пользователям будет полезно получить представление о том, как мы можем достичь того же результата с богатым набором опций, предоставляемых Java DSL Spring Integration.

Исходный код нашего приложения размещен в одном классе, который является  Boot приложением; значимые строки обозначены цифрами, соответствующими комментариям, которые следуют:

@SpringBootApplication // 1
@IntegrationComponentScan // 2
public class Application {




public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args); // 3




Cafe cafe = ctx.getBean(Cafe.class); // 4
for (int i = 1; i <= 100; i++) { // 5
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false);
order.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(order);
}




System.out.println("Hit 'Enter' to terminate"); // 6
System.in.read();
ctx.close();
}




@MessagingGateway // 7
public interface Cafe {




@Gateway(requestChannel = "orders.input") // 8
void placeOrder(Order order); // 9




}




private final AtomicInteger hotDrinkCounter = new AtomicInteger();




private final AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10




@Autowired
private CafeAggregator cafeAggregator; // 11




@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 12
return Pollers.fixedDelay(1000).get();
}




@Bean
@SuppressWarnings("unchecked")
public IntegrationFlow orders() { // 13
return IntegrationFlows.from("orders.input") // 14
.split("payload.items", (Consumer) null) // 15
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))// 16
.route("payload.iced", // 17
new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() { // 18




@Override
public void accept(RouterSpec<ExpressionEvaluatingRouter> spec) {
spec.channelMapping("true", "iced")
.channelMapping("false", "hot"); // 19
}




})
.get(); // 20
}




@Bean
public IntegrationFlow icedFlow() { // 21
return IntegrationFlows.from(MessageChannels.queue("iced", 10)) // 22
.handle(new GenericHandler<OrderItem>() { // 23




@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()
+ " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload; // 24
}




})
.channel("output") // 25
.get();
}




@Bean
public IntegrationFlow hotFlow() { // 26
return IntegrationFlows.from(MessageChannels.queue("hot", 10))
.handle(new GenericHandler<OrderItem>() {




@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); // 27
System.out.println(Thread.currentThread().getName()
+ " prepared hot drink #" + hotDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload;
}




})
.channel("output")
.get();
}




@Bean
public IntegrationFlow resultFlow() { // 28
return IntegrationFlows.from("output") // 29
.transform(new GenericTransformer<OrderItem, Drink>() { // 30




@Override
public Drink transform(OrderItem orderItem) {
return new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots()); // 31
}




})
.aggregate(new Consumer<AggregatorSpec>() { // 32




@Override
public void accept(AggregatorSpec aggregatorSpec) {
aggregatorSpec.processor(cafeAggregator, null); // 33
}




}, null)
.handle(CharacterStreamWritingMessageHandler.stdout()) // 34
.get();
}
@Component
public static class CafeAggregator { // 35




@Aggregator // 36
public Delivery output(List<Drink> drinks) {
return new Delivery(drinks);
}




@CorrelationStrategy // 37
public Integer correlation(Drink drink) {
return drink.getOrderNumber();
}




}




}

Изучение кода построчно …

1.

@SpringBootApplication

Эта новая мета-аннотация от Spring Boot 1.2. Включает в себя  @Configuration и @EnableAutoConfiguration. Поскольку мы находимся в приложении Spring Integration, и Spring Boot имеет автоматическую настройку для него,  @EnableIntegration он автоматически применяется для инициализации инфраструктуры Spring Integration, включая среду для Java DSL — DslIntegrationConfigurationInitializer, которую выбирает IntegrationConfigurationBeanFactoryPostProcessor from  /META-INF/spring.factories.

2.

@IntegrationComponentScan

Аналог Spring Integration  @ComponentScan для сканирования компонентов на основе интерфейсов (Spring Framework рассматривает  @ComponentScan только классы). Spring Integration поддерживает обнаружение интерфейсов, помеченных  @MessagingGateway (см. № 7 ниже).

3.

ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args);

main Метод нашего класса предназначен для запуска приложения Spring загрузки с использованием конфигурации из этого класса и начинает  ApplicationContext через Spring ботинок. Кроме того, он делегирует аргументы командной строки Spring Boot. Например, вы можете указать,  --debug чтобы видеть журналы для отчета автоконфигурации загрузки.

4.

Cafe cafe = ctx.getBean(Cafe.class);

Поскольку у нас уже есть,  ApplicationContext мы можем начать взаимодействовать с приложением. И Cafe это точка входа — в терминах EIP a  gateway. Шлюзы — это просто интерфейсы, и приложение не взаимодействует с Messaging API; это просто имеет дело с доменом (см. № 7 ниже).

5.

for (int i = 1; i <= 100; i++) {

Чтобы продемонстрировать работу кафе, мы инициируем 100 заказов с двумя напитками — горячим и холодным. И отправьте  Order к  Cafe шлюзу.

6.

System.out.println("Hit 'Enter' to terminate");

Обычно приложения Spring Integration являются асинхронными, поэтому, чтобы избежать раннего выхода из main потока, мы блокируем  main метод до некоторого взаимодействия с конечным пользователем через командную строку. Потоки, не являющиеся демонами, будут держать приложение открытым, но  System.read()предоставят нам механизм для его точного закрытия.

7.

@MessagingGateway

Аннотация для обозначения бизнес-интерфейса, указывающего, что он находится  gateway между конечным приложением и уровнем интеграции. Это аналог  <gateway /> компонента из конфигурации Spring Integration XML. Spring Integration создает  Proxy для этого интерфейса и заполняет его как компонент в контексте приложения. Цель этого  Proxy состоит в том, чтобы обернуть параметры в  Message<?> объекте и отправить его в  MessageChannel соответствии с предоставленными параметрами.

8.

@Gateway(requestChannel = "orders.input")

Аннотация на уровне методов к отдельной бизнес-логике как методами, так и целевыми потоками интеграции. В этом примере мы используем  requestChannel ссылку  orders.input, которая является  MessageChannel именем бина нашего  IntegrationFlow входного канала (см. Ниже # 14).

9.

void placeOrder(Order order);

Метод интерфейса является центральной точкой взаимодействия конечного приложения с уровнем интеграции. Этот метод имеет  void тип возвращаемого значения. Это означает, что наш поток интеграции есть,  one-wayи мы просто отправляем сообщения в поток интеграции, но не ждем ответа.

10.

private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();

Два счетчика для сбора информации о том, как работает наше кафе с напитками.

11.

@Autowired
private CafeAggregator cafeAggregator;

POJO для  Aggregator логики (см. № 33 и № 35 ниже). Поскольку это бин Spring, мы можем просто внедрить его даже в текущий  @Configuration и использовать в любом месте ниже, например, из  .aggregate() EIP-метода.

12.

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {

default poller Боб. Это аналог  <poller default="true"> компонента из конфигурации Spring Integration XML. Требуется для конечных точек, где  inputChannelесть  PollableChannel. В этом случае это необходимо для двух кафе  queues — горячего и холодного (см. Ниже № 18). Здесь мы используем  Pollers фабрику из проекта DSL и используем его свободно распространяемый API-метод для создания метаданных опроса. Обратите внимание, что  Pollers это можно использовать непосредственно из  IntegrationFlow определения, если poller для конечной точки требуется определенный  (а не поллер по умолчанию).

13.

@Bean
public IntegrationFlow orders() {

Определение  IntegrationFlow бина. Это центральный компонент Java DSL Spring Integration, хотя он не играет никакой роли во время выполнения, только на этапе регистрации компонента. Весь другие регистры ниже код компонента Spring Integration ( MessageChannel, MessageHandlerEventDrivenConsumerMessageProducer,  и MessageSource т.д.) в IntegrationFlow объекте, который анализируется самым  , IntegrationFlowBeanPostProcessor чтобы обработать эти компоненты и зарегистрировать их как бобы в контексте применения по мере необходимости (некоторые элементы, такие как каналы уже может существовать ).

14.

return IntegrationFlows.from("orders.input")

Это  IntegrationFlows основной  factory класс для начала  IntegrationFlow. Она обеспечивает ряд перегруженных  .from() методов , чтобы начать вытекать из SourcePollingChannelAdapter для А  MessageSource<?> реализаций, например JdbcPollingChannelAdapter, от  MessageProducer, например WebSocketInboundChannelAdapter; или просто  MessageChannel. Все параметры «.from ()» имеют несколько удобных вариантов для настройки соответствующего компонента для запуска IntegrationFlow. Здесь мы используем только имя канала, которое преобразуется в определение DirectChannel компонента во время фазы определения компонента при анализе IntegrationFlow. В варианте Java 8 мы использовали здесь  Lambda definition — и это MessageChannel было неявно создано с именем компонента на основе имени IntegrationFlow компонента.

15.

.split("payload.items", (Consumer) null)

Поскольку наш поток интеграции принимает сообщения через  orders.input канал, мы готовы их использовать и обрабатывать. Первый EIP-метод в нашем сценарии  .split(). Мы знаем, что сообщение  payload от  orders.input канала является  Order доменным объектом, поэтому мы можем просто использовать здесь выражение Spring (SpEL) для возврата  Collection<OrderItem>. Таким образом, выполняется  split шаблон EI, и мы отправляем каждую запись коллекции в виде отдельного сообщения на следующий канал. В фоновом режиме  .split() метод регистрирует ExpressionEvaluatingSplitter MessageHandler реализацию и EventDrivenConsumer для этого  MessageHandlerпроводку в  orders.input канале как  inputChannel.

Второй аргумент для  .split() EIP-метода для  endpointConfigurer настройки параметров, таких как  autoStartuprequiresReplyи  adviceChain т. Д. Мы используем здесь, null чтобы показать, что мы полагаемся на параметры по умолчанию для конечной точки. Многие из EIP-методов предоставляют перегруженные версии с и без  endpointConfigurer. В настоящее время .split(String expression) EIP-метод без  endpointConfigurer аргумента недоступен; это будет решено в будущем выпуске.

16.

.channel(MessageChannels.executor(Executors.newCachedThreadPool()))

.channel() EIP-метод позволяет определять конкретный  MessageChannels между конечными точками, как это делается с помощью  output-channel/ input-channel атрибуты паров с конфигурацией XML Spring Integration. По умолчанию конечные точки в определении потока интеграции DSL связаны с  DirectChannels, которые получают имена компонентов на основе имени IntegrationFlow компонента и  index в цепочке потоков. В этом случае мы выбираем конкретную MessageChannel реализацию из  Channels класса фабрики; здесь выбран канал  ExecutorChannel, чтобы разрешить распределение сообщений от  splitter отдельных к  Thread, чтобы обрабатывать их параллельно в нисходящем потоке.

17.

.route("payload.iced",

Следующий EIP-метод в нашем сценарии —  .route()отправка  hot/iced заказов на разные кухни кафе. Мы снова используем здесь выражение SpEL, чтобы получить  routingKey входящее сообщение. В варианте Java 8 мы использовали  method-reference лямбда-выражение, но для стиля, предшествующего Java 8, мы должны использовать SpEL или реализацию встроенного интерфейса. Многие анонимные классы в потоке могут затруднить чтение, поэтому в большинстве случаев мы предпочитаем SpEL.

18.

new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() {

Вторым аргументом  .route() EIP-метода является функциональный интерфейс  Consumer для задания  ExpressionEvaluatingRouter опций с помощью  RouterSpec Builder. Поскольку у нас нет никакого выбора в предварительной версии Java 8, мы просто предоставляем здесь встроенную реализацию для этого интерфейса.

19.

spec.channelMapping("true", "iced")
.channelMapping("false", "hot");

При  Consumer<RouterSpec<ExpressionEvaluatingRouter>>#accept()реализации мы можем предоставить желаемые  AbstractMappingMessageRouter варианты. Одним из них является то  channelMappings, когда мы определяем логику маршрутизации по результату расширения маршрутизатора и цели  MessageChannel для соответствующего результата. В этом случае  iced и hot являются  MessageChannel имена  IntegrationFlowс ниже.

20.

.get();

Это завершает поток. Любой  IntegrationFlows.from() метод возвращает IntegrationFlowBuilder экземпляр, и этот  get() метод извлекает  IntegrationFlowобъект из  IntegrationFlowBuilder конфигурации. Все, начиная с .from() и до метода до,  .get() является  IntegrationFlow определением. Все определенные компоненты хранятся в  IntegrationFlow и обрабатываются на IntegrationFlowBeanPostProcessor этапе создания компонента.

21.

@Bean
public IntegrationFlow icedFlow() {

Это второе  IntegrationFlow определение бобов — для  iced напитков. Здесь мы демонстрируем, что несколько  IntegrationFlows могут быть соединены вместе для создания одного сложного приложения. Примечание: не рекомендуется вводить одно  IntegrationFlow в другое; это может вызвать неожиданное поведение. Поскольку они предоставляют компоненты интеграции для регистрации bean-компонентов и являются  MessageChannelодним из них, лучший способ подключиться и внедрить это через MessageChannel или  @MessagingGateway интерфейсы.

22.

return IntegrationFlows.from(MessageChannels.queue("iced", 10))

iced IntegrationFlow Начинается с  , QueueChannel что имеет емкость  10сообщений; это зарегистрировано как боб с названием  iced. Как вы помните, мы используем это имя в качестве одного из отображений маршрута (см. Выше # 19).

В нашем примере мы используем ограничение,  QueueChannel чтобы отразить состояние кафе на кухне в реальной жизни. И здесь есть место, где нам нужно это  global poller для следующей конечной точки, которая прослушивает этот канал.

23.

.handle(new GenericHandler<OrderItem>() {

.handle() ОПЗ-метод  iced потока демонстрирует бетонное кафе кухни работы. Поскольку мы не можем минимизировать код с помощью чего-то вроде лямбда-выражения Java 8, мы предоставляем здесь встроенную реализацию для  GenericHandler функционального интерфейса с ожидаемым  payload типом в качестве универсального аргумента. В примере с Java 8 мы распределяем это .handle() между несколькими подпотоками подписчика для a  PublishSubscribeChannel. Однако в этом случае логика все реализована в одном методе.

24.

Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()
+ " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload;

Реализация бизнес-логики для текущего  .handle() EIP-компонента. С этим Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); мы просто блокируем ток  Thread на некоторое время, чтобы продемонстрировать, как быстро кухня Кафе готовит напиток. После этого мы просто сообщить  , STDOUT что напиток готов и вернуть ток OrderItem от  GenericHandler к следующей конечной точке в нашей  IntegrationFlow. В фоновом режиме платформа DSL регистрирует a  ServiceActivatingHandler для MethodInvokingMessageProcessor вызова  GenericHandler#handle во время выполнения. Кроме того, платформа регистрирует  PollingConsumer конечную точку для  QueueChannelвышеупомянутого. Эта конечная точка полагается на  default poller сообщения опроса из очереди. Конечно, мы всегда можем использовать конкретную  poller для любой конкретной конечной точки. В этом случае мы должны были бы предоставить второй  endpointConfigurer аргумент .handle() EIP-метод.

25.

.channel("output")

Поскольку это не конец нашего сценария Cafe, мы отправляем результат текущего потока на output канал, используя удобный EIP-метод  .channel() и имя MessageChannel компонента (см. Ниже # 29). Это логическое завершение текущего подпотока замороженного напитка, поэтому мы используем  .get() метод для возврата  IntegrationFlow. Потоки, которые заканчиваются обработчиком ответа, у которого нет финала  .channel() , вернут ответ в replyChannel заголовок сообщения  .

26.

@Bean
public IntegrationFlow hotFlow() {

IntegrationFlow Определение  hot напитков. Это похоже на предыдущий  iced поток напитков, но с определенной  hot бизнес-логикой. Это начинается с того,  hot QueueChannel который сопоставлен с маршрутизатором выше.

27.

Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

sleepUninterruptibly Для  hot напитков. Правильно, нам нужно больше времени кипятить воду!

28.

@Bean
public IntegrationFlow resultFlow() {

Еще одно  IntegrationFlow определение bean-компонента для подготовки  Delivery клиента Cafe на основе  Drinks.

29.

return IntegrationFlows.from("output")

В  resultFlow начинается с  DirectChannel, который создается во время фазы определения фасоли с этим именем при условии. Вы должны помнить, что мы используем  outputназвание канала из потоков Кафе-кухни в последних  .channel() определениях.

30.

.transform(new GenericTransformer<OrderItem, Drink>() {

The .transform() EIP-method is for the appropriate pattern implementation and expects some object to convert one payload to another. In our sample we use an inline implementation of the GenericTransformer functional interface to convert OrderItem to Drink and we specify that using generic arguments. In the background, the DSL framework registers aMessageTransformingHandler and an EventDrivenConsumer endpoint with default options to consume messages from the output MessageChannel.

31.

public Drink transform(OrderItem orderItem) {
return new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots());
}

The business-specific GenericTransformer#transform() implementation to demonstrate how we benefit from Java Generics to transform one payload to another. Note: Spring Integration uses ConversionService before any method invocation and if you provide some specific Converter implementation, some domain payload can be converted to another automatically, when the framework has an appropriate registered Converter.

32.

.aggregate(new Consumer<AggregatorSpec>() {

The .aggregate() EIP-method provides options to configure anAggregatingMessageHandler and its endpoint, similar to what we can do with the<aggregator> component when using Spring Integration XML configuration. Of course, with the Java DSL we have more power to configure the aggregator in place, without any other extra beans. However we demonstrate here an aggregator configuration with annotations (see below #35). From the Cafe business logic perspective we compose the Delivery for the initial Order, since we .split() the original order to the OrderItems near the beginning.

33.

public void accept(AggregatorSpec aggregatorSpec) {
aggregatorSpec.processor(cafeAggregator, null);
}

An inline implementation of the Consumer for the AggregatorSpec. Using theaggregatorSpec Builder we can provide desired options for the aggregator component, which will be registered as an AggregatingMessageHandler bean. Here we just provide theprocessor as a reference to the autowired (see #11 above) CafeAggregator component (see #35 below). The second argument of the .processor() option is methodName. Since we are relying on the aggregator annotation configuration for the POJO, we don’t need to provide the method here and the framework will determine the correct POJO methods in the background.

34.

.handle(CharacterStreamWritingMessageHandler.stdout())

It is the end of our flow — the Delivery is delivered to the client! We just print here the message payload to STDOUT using out-of-the-boxCharacterStreamWritingMessageHandler from Spring Integration Core. This is a case to show how existing components from Spring Integration Core (and its modules) can be used from the Java DSL.

35.

@Component
public static class CafeAggregator {

The bean to specify the business logic for the aggregator above. This bean is picked up by the @ComponentScan, which is a part of the @SpringBootApplication meta-annotation (see above #1). So, this component becomes a bean and we can automatically wire (@Autowired) it to other components in the application context (see #11 above).

36.

@Aggregator
public Delivery output(List<Drink> drinks) {
return new Delivery(drinks);
}

The POJO-specific MessageGroupProcessor to build the output payload based on the payloads from aggregated messages. Since we mark this method with the @Aggregatorannotation, the target AggregatingMessageHandler can extract this method for theMethodInvokingMessageGroupProcessor.

37.

@CorrelationStrategy
public Integer correlation(Drink drink) {
return drink.getOrderNumber();
}

The POJO-specific CorrelationStrategy to extract the custom correlationKey from each inbound aggregator message. Since we mark this method with @CorrelationStrategyannotation the target AggregatingMessageHandler can extract this method for theMethodInvokingCorrelationStrategy. There is a similar self-explained@ReleaseStrategy annotation, but we rely in our Cafe sample just on the defaultSequenceSizeReleaseStrategy, which is based on the sequenceDetails message header populated by the splitter from the beginning of our integration flow.

Well, we have finished describing the Cafe Demo sample based on the Spring Integration Java DSL when Java Lambda support is not available. Compare it with XML sample and also seeLambda support tutorial to get more information regarding Spring Integration.

As you can see, using the DSL without lambdas is a little more verbose because you need to provide boilerplate code for inline anonymous implementations of functional interfaces. However, we believe it is important to support the use of the DSL for users who can’t yet move to Java 8. Many of the DSL benefits (fluent API, compile-time validation etc) are available for all users.

The use of lambdas continues the Spring Framework tradition of reducing or eliminating boilerplate code, so we encourage users to try Java 8 and lambdas and to encourage their organizations to consider allowing the use of Java 8 for Spring Integration applications.

In addition see the Reference Manual for more information.

As always, we look forward to your comments and feedback (StackOverflow (spring-integration tag), Spring JIRAGitHub) and we very much welcome contributions!

Thank you for your time and patience to read this!