Статьи

Spring DS Integration Java: руководство по каждой строке


Первоначально автор 
Артем Билан   в блоге SpringSource

Уважаемое весеннее сообщество!

Сразу после   объявления о выпуске Spring Integration Java DSL 1.0 GA я хочу представить вам Spring Integration Java DSL в виде построчного руководства на основе классического   примера интеграции Cafe Demo . Мы описываем здесь   поддержку Spring Boot ,  конфигурацию Spring Framework  Java и аннотаций , эту  IntegrationFlow функцию и отдаем дань поддержке Java 8  Lambda, которая послужила вдохновением для стиля DSL. Конечно, все это поддерживается   проектом Spring Integration Core .

Но, прежде чем мы начнем описание демонстрационного приложения Cafe, вот короткий пример, чтобы начать …

@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext ctx = SpringApplication.run(Start.class, args);

        List<String> strings = Arrays.asList("foo", "bar");
        System.out.println(ctx.getBean(Upcase.class).upcase(strings));

        ctx.close();
    }

    @MessagingGateway
    public interface Upcase {

        @Gateway(requestChannel = "upcase.input")
        Collection<String> upcase(Collection<String> strings);

    }

    @Bean
    public IntegrationFlow upcase() {
        return f -> f
                .split()                                         // 1
                .<String, String>transform(String::toUpperCase)  // 2
                .aggregate();                                    // 3
    }

}

Мы оставим описание инфраструктуры (аннотации и т. Д.) Основному описанию потока кафе. Здесь мы хотим, чтобы вы сосредоточились на последнем  @BeanIntegrationFlow а также на методе шлюза, который отправляет сообщения этому потоку.

В этом  main методе мы отправляем коллекцию строк в шлюз и печатаем результаты в STDOUT. Сначала поток разбивает коллекцию на отдельные  Strings (1); каждая строка затем преобразуется в верхний регистр (2), и, наконец, мы реагрегируем их обратно в коллекцию (3). Поскольку это конец потока, среда возвращает результат агрегирования обратно на шлюз, и новая полезная нагрузка становится возвращаемое значение из метода шлюза.

Эквивалентная конфигурация XML может быть …

<int:gateway service interface="foo.Upcase" default-request-channel="upcase.input">

<int:splitter input-channel="upcase.input" output-channel="transform"/>

<int:transformer expression="payload.toUpperCase()"
	input-channel="transform"
	output-channel="aggregate" />

<int:aggregator input-channle="aggregate" />

или…

<int:gateway service interface="foo.Upcase" default-request-channel="upcase.input">

<int:chain input-channel="upcase.input">
	<int:splitter />
	<int:transformer expression="payload.toUpperCase()" />
	<int:aggregator />
</int:chain>

1.

@SpringBootApplication

Эта новая мета-аннотация от Spring Boot 1.2. Включает в себя  @Configuration и @EnableAutoConfiguration. Поскольку мы находимся в приложении Spring Integration, и Spring Boot имеет автоматическую настройку для него,  @EnableIntegration он автоматически применяется для инициализации инфраструктуры Spring Integration, включая среду для Java DSL — DslIntegrationConfigurationInitializer, которую выбирает IntegrationConfigurationBeanFactoryPostProcessor from  /META-INF/spring.factories.

2.

@IntegrationComponentScan

Аналог Spring Integration  @ComponentScan для сканирования компонентов на основе интерфейсов (Spring Framework рассматривает  @ComponentScan только классы). Spring Integration поддерживает обнаружение интерфейсов, помеченных  @MessagingGateway (см. № 7 ниже).

3.

ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);

main Метод нашего класса предназначен для запуска приложения Spring загрузки с использованием конфигурации из этого класса и начинает  ApplicationContext через Spring ботинок. Кроме того, он делегирует аргументы командной строки Spring Boot. Например, вы можете указать,  --debug чтобы видеть журналы для отчета автоконфигурации загрузки.

4.

Cafe cafe = ctx.getBean(Cafe.class);

Поскольку у нас уже есть,  ApplicationContext мы можем начать взаимодействовать с приложением. И Cafe это точка входа — в терминах EIP a  gateway. Шлюзы — это просто интерфейсы, и приложение не взаимодействует с Messaging API; это просто имеет дело с доменом (см. № 7 ниже).

5.

for (int i = 1; i <= 100; i++) {

Чтобы продемонстрировать работу кафе, мы инициируем 100 заказов с двумя напитками — горячим и холодным. И отправьте  Order к  Cafe шлюзу.

6.

System.out.println("Hit 'Enter' to terminate");

Обычно приложения Spring Integration являются асинхронными, поэтому, чтобы избежать раннего выхода из main потока, мы блокируем  main метод до некоторого взаимодействия с конечным пользователем через командную строку. Потоки, не являющиеся демонами, будут держать приложение открытым, но  System.read()предоставят нам механизм для его точного закрытия.

7.

@MessagingGateway

Аннотация для обозначения бизнес-интерфейса, указывающего, что он находится  gateway между конечным приложением и уровнем интеграции. Это аналог  <gateway /> компонента из конфигурации Spring Integration XML. Spring Integration создает  Proxy для этого интерфейса и заполняет его как компонент в контексте приложения. Цель этого  Proxy состоит в том, чтобы обернуть параметры в  Message<?> объекте и отправить его в  MessageChannel соответствии с предоставленными параметрами.

8.

@Gateway(requestChannel = "orders.input")

Аннотация на уровне методов к отдельной бизнес-логике как методами, так и целевыми потоками интеграции. В этом примере мы используем  requestChannel ссылку  orders.input, которая является  MessageChannel именем бина нашего  IntegrationFlow входного канала (см. Ниже # 13).

9.

void placeOrder(Order order);

The interface method is a central point to interact from end-application with the integration layer. This method has a void return type. It means that our integration flow is one-wayand we just send messages to the integration flow, but don’t wait for a reply.

10.

private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();

Two counters to gather the information how our cafe works with drinks.

11.

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {

The default poller bean. It is a analogue of <poller default="true"> component from Spring Integration XML configuration. Required for endpoints where the inputChannelis a PollableChannel. In this case, it is necessary for the two Cafe queues — hot and iced (see below #18). Here we use the Pollers factory from the DSL project and use its method-chain fluent API to build the poller metadata. Note that Pollers can be used directly from an IntegrationFlow definition, if a specific poller (rather than the default poller) is needed for an endpoint.

12.

@Bean
public IntegrationFlow orders() {

The IntegrationFlow bean definition. It is the central component of the Spring Integration Java DSL, although it does not play any role at runtime, just during the bean registration phase. All other code below registers Spring Integration components (MessageChannel,MessageHandlerEventDrivenConsumerMessageProducerMessageSource etc.) in theIntegrationFlow object, which is parsed by the IntegrationFlowBeanPostProcessor to process those components and register them as beans in the application context as necessary (some elements, such as channels may already exist).

13.

return f -> f

The IntegrationFlow is a Consumer functional interface, so we can minimize our code and concentrate just only on the integration scenario requirements. Its Lambda acceptsIntegrationFlowDefinition as an argument. This class offers a comprehensive set of methods which can be composed to the chain. We call these EIP-methods, because they provide implementations for EI patterns and populate components from Spring Integration Core. During the bean registration phase, the IntegrationFlowBeanPostProcessor converts this inline (Lambda) IntegrationFlow to a StandardIntegrationFlow and processes its components. The same we can achieve using IntegrationFlows factory (e.g.IntegrationFlow.from("channelX"). ... .get()), but we find the Lambda definition more elegant. An IntegrationFlow definition using a Lambda populates DirectChannel as an inputChannel of the flow and it is registered in the application context as a bean with the name orders.input in this our sample (flow bean name + ".input"). That’s why we use that name for the Cafe gateway.

14.

.split(Order.class, Order::getItems)

Since our integration flow accepts message through the orders.input channel, we are ready to consume and process them. The first EIP-method in our scenario is .split(). We know that the message payload from orders.input channel is an Order domain object, so we can simply use its type here and use the Java 8 method-reference feature. The first parameter is a type of message payload we expect, and the second is a method reference to the getItems() method, which returns Collection<OrderItem>. So, this performs thesplit EI pattern, when we send each collection entry as a separate message to the next channel. In the background, the .split() method registers a MethodInvokingSplitterMessageHandler implementation and the EventDrivenConsumer for thatMessageHandler, and wiring in the orders.input channel as the inputChannel.

15.

.channel(c -> c.executor(Executors.newCachedThreadPool()))

The .channel() EIP-method allows the specification of concrete MessageChannels between endpoints, as it is done via output-channel/input-channel attributes pair with Spring Integration XML configuration. By default, endpoints in the DSL integration flow definition are wired with DirectChannels, which get the bean names based on theIntegrationFlow bean name and index in the flow chain. In this case we use anotherLambda expression, which selects a specific MessageChannel implementation from itsChannels factory and configures it with the fluent API. The current channel here is anExecutorChannel, to allow to distribute messages from the splitter to separateThreads, to process them in parallel in the downstream flow.

16.

.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping

The next EIP-method in our scenario is .route(), to send hot/iced order items to different Cafe kitchens. We again use here a method reference (isIced()) to get theroutingKey from the incoming message. The second Lambda parameter represents arouter mapping — something similar to <mapping> sub-element for the <router>component from Spring Integration XML configuration. However since we are using Java we can go a bit further with its Lambda support! The Spring Integration Java DSL introduced thesubflow definition for routers in addition to traditional channel mapping. Each subflow is executed depending on the routing and, if the subflow produces a result, it is passed to the next element in the flow definition after the router.

17.

.subFlowMapping("true", sf -> sf 

Specifies the integration flow for the current router’s mappingKey. We have in this samples two subflows — hot and iced. The subflow is the same IntegrationFlow functional interface, therefore we can use its Lambda exactly the same as we do on the top levelIntegrationFlow definition. The subflows don’t have any runtime dependency with its parent, it’s just a logical relationship.

18.

.channel(c -> c.queue(10))

We already know that a Lambda definition for the IntegrationFlow starts from[FLOW_BEAN_NAME].input DirectChannel, so it may be a question «how does it work here if we specify .channel() again?». The DSL takes care of such a case and wires those two channels with a BridgeHandler and endpoint. In our sample, we use here a restrictedQueueChannel to reflect the Cafe kitchen busy state from real life. And here is a place where we need that global poller for the next endpoint which is listening on this channel.

19.

.publishSubscribeChannel(c -> c

The .publishSubscribeChannel() EIP-method is a variant of the .channel() for aMessageChannels.publishSubscribe(), but with the .subscribe() option when we can specify subflow as a subscriber to the channel. Right, subflow one more time! So, subflows can be specified to any depth. Independently of the presence .subscribe() subflows, the next endpoint in the parent flow is also a subscriber to this .publishSubscribeChannel(). Since we are in the .route() subflow already, the last subscriber is an implicit BridgeHandlerwhich just pops the message to the top level — to a similar implicit BridgeHandler to pop message to the next .transform() endpoint in the main flow. And one more note about this current position of our flow: the previous EIP-method is .channel(c -> c.queue(10)) and this one is for MessageChannel too. So, they are again tied with an implicit BridgeHandleras well. In a real application we could avoid this .publishSubscribeChannel() just with the single .handle() for the Cafe kitchen, but our goal here to cover DSL features as much as possible. That’s why we distribute the kitchen work to several subflows for the samePublishSubscribeChannel.

20.

.subscribe(s ->

The .subscribe() method accepts an IntegrationFlow as parameter, which can be specified as Lambda to configure subscriber as subflow. We use here several subflow subscribers to avoid multi-line Lambdas and cover some DSL as we as Spring Integration capabilities.

21.

s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))

Here we use a simple .handle() EIP-method just to block the current Thread for some timeout to demonstrate how quickly the Cafe kitchen prepares a drink. Here we use Google Guava Uninterruptibles.sleepUninterruptibly, to avoid using a try...catch block within the Lambda expression, although you can do that and your Lambda will be multi-line. Or you can move that code to a separate method and use it here as method reference.

Since we don’t use any Executor on the .publishSubscribeChannel() all subscribers will beperformed sequentially on the same Thread; in our case it is one of TaskScheduler‘s Threads from poller on the previous QueueChannel. That’s why this sleep blocks all downstream process and allows to demonstrate the busy state for that restricted to 10QueueChannel.

22.

.subscribe(sub -> sub

The next subflow subscriber which will be performed only after that sleep with 1 second foriced drink. We use here one more subflow because .handle() of previous one is one-way with the nature of the Lambda for MessageHandler. That’s why, to go ahead with process of our whole flow, we have several subscribers: some of subflows finish after their work and don’t return anything to the parent flow.

23.

 .<OrderItem, String>transform(item ->
	Thread.currentThread().getName()
		+ " prepared cold drink #"
		+ this.coldDrinkCounter.incrementAndGet()
		+ " for order #" + item.getOrderNumber()
		+ ": " + item) 

The transformer in the current subscriber subflow is to convert the OrderItem to the friendly STDOUT message for the next .handle. Here we see the use of generics with the Lambda expression. This is implemented using the GenericTransformer functional interface.

24.

.handle(m -> System.out.println(m.getPayload())))))

The .handle() here just to demonstrate how to use Lambda expression to print thepayload to STDOUT. It is a signal that our drink is ready. After that the final (implicit) subscriber to the PublishSubscribeChannel just sends the message with the OrderItemto the .transform() in the main flow.

25.

.subFlowMapping("false", sf -> sf

The .subFlowMapping() for the hot drinks. Actually it is similar to the previous iceddrinks subflow, but with specific hot business logic.

26.

s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))

The sleepUninterruptibly for hot drinks. Right, we need more time to boil the water!

27.

 .<OrderItem, Drink>transform(orderItem ->
	new Drink(orderItem.getOrderNumber(),
		orderItem.getDrinkType(),
		orderItem.isIced(),
		orderItem.getShots())) 

The main OrderItem to Drink transformer, which is performed when the .route()subflow returns its result after the Cafe kitchen subscribers have finished preparing the drink.

28.

.aggregate(aggregator -> aggregator

The .aggregate() EIP-method provides similar options to configure anAggregatingMessageHandler and its endpoint, like we can do with the <aggregator>component when using Spring Integration XML configuration. Of course, with the Java DSL we have more power to configure the aggregator just in place, without any other extra beans. And Lambdas come to the rescue again! From the Cafe business logic perspective we compose theDelivery for the initial Order, since we .split() the original order to the OrderItems near the beginning.

29.

.outputProcessor(group -> 

The .outputProcessor() of the AggregatorSpec allows us to emit a custom result after aggregator completes the group. It’s an analogue of ref/method from the <aggregator>component or the @Aggregator annotation on a POJO method. Our goal here to compose aDelivery for all Drinks.

30.

new Delivery(group.getMessages()
	.stream()
	.map(message -> (Drink) message.getPayload())
	.collect(Collectors.toList()))) 

As you see we use here the Java 8 Stream feature for Collection. We iterate over messages from the released MessageGroup and convert (map) each of them to its Drinkpayload. The result of the Stream (.collect()) (a list of Drinks) is passed to theDelivery constructor. The Message with this new Delivery payload is sent to the next endpoint in our Cafe scenario.

31.

.correlationStrategy(m ->
	((Drink) m.getPayload()).getOrderNumber()), null)

The .correlationStrategy() Lambda demonstrates how we can customize an aggregator behaviour. Of course, we can rely here just only on a built-in SequenceDetails from Spring Integration, which is populated by default from .split() in the beginning of our flow to each split message, but the Lambda sample for the CorrelationStrategy is included for illustration. (With XML, we could have used a correlation-expression or a customCorrelationStrategy). The second argument in this line for the .aggregate() EIP-method is for the endpointConfigurer to customize options like autoStartup,requiresReplyadviceChain etc. We use here null to show that we rely on the default options for the endpoint. Many of EIP-methods provide overloaded versions with and withoutendpointConfigurer, but .aggregate() requires an endpoint argument, to avoid an explicit cast for the AggregatorSpec Lambda argument.

32.

.handle(CharacterStreamWritingMessageHandler.stdout());

It is the end of our flow — the Delivery is delivered to the client! We just print here the message payload to STDOUT using out-of-the-boxCharacterStreamWritingMessageHandler from Spring Integration Core. This is a case to show how existing components from Spring Integration Core (and its modules) can be used from the Java DSL.

Well, we have finished describing the Cafe Demo sample based on the Spring Integration Java DSL. Compare it with XML sample to get more information regarding Spring Integration.

This is not an overall tutorial to the DSL stuff. We don’t review here theendpointConfigurer options, Transformers factory, the IntegrationComponentSpechierarchy, the NamespaceFactories, how we can specify several IntegrationFlow beans and wire them to a single application etc., see the Reference Manual for more information.

At least this line-by-line tutorial should show you Spring Integration Java DSL basics and its seamless fusion between Spring Framework Java & Annotation configuration, Spring Integration foundation and Java 8 Lambda support!

Also see the si4demo to see the evolution of Spring Integration including the Java DSL, as shown at the 2014 SpringOne/2GX Conference. (Video should be available soon).

As always, we look forward to your comments and feedback (StackOverflow (spring-integration tag), Spring JIRAGitHub) and we very much welcome contributions!

P.S. Even if this tutorial is fully based on the Java 8 Lambda support, we don’t want to miss pre Java 8 users, we are going to provide similar non-Lambda blog post. Stay tuned!