Первоначально автор
Артем Билан в блоге SpringSource
Уважаемое весеннее сообщество!
Сразу после объявления о выпуске Spring Integration Java DSL 1.0 GA я хочу представить вам Spring Integration Java DSL в виде построчного руководства на основе классического примера интеграции Cafe Demo . Мы описываем здесь поддержку Spring Boot , конфигурацию Spring Framework Java и аннотаций , эту IntegrationFlow
функцию и отдаем дань поддержке Java 8 Lambda, которая послужила вдохновением для стиля DSL. Конечно, все это поддерживается проектом Spring Integration Core .
Но, прежде чем мы начнем описание демонстрационного приложения Cafe, вот короткий пример, чтобы начать …
@Configuration @EnableAutoConfiguration @IntegrationComponentScan public class Start { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext ctx = SpringApplication.run(Start.class, args); List<String> strings = Arrays.asList("foo", "bar"); System.out.println(ctx.getBean(Upcase.class).upcase(strings)); ctx.close(); } @MessagingGateway public interface Upcase { @Gateway(requestChannel = "upcase.input") Collection<String> upcase(Collection<String> strings); } @Bean public IntegrationFlow upcase() { return f -> f .split() // 1 .<String, String>transform(String::toUpperCase) // 2 .aggregate(); // 3 } }
Мы оставим описание инфраструктуры (аннотации и т. Д.) Основному описанию потока кафе. Здесь мы хотим, чтобы вы сосредоточились на последнем @Bean
, IntegrationFlow
а также на методе шлюза, который отправляет сообщения этому потоку.
В этом main
методе мы отправляем коллекцию строк в шлюз и печатаем результаты в STDOUT. Сначала поток разбивает коллекцию на отдельные String
s (1); каждая строка затем преобразуется в верхний регистр (2), и, наконец, мы реагрегируем их обратно в коллекцию (3). Поскольку это конец потока, среда возвращает результат агрегирования обратно на шлюз, и новая полезная нагрузка становится возвращаемое значение из метода шлюза.
Эквивалентная конфигурация XML может быть …
<int:gateway service interface="foo.Upcase" default-request-channel="upcase.input"> <int:splitter input-channel="upcase.input" output-channel="transform"/> <int:transformer expression="payload.toUpperCase()" input-channel="transform" output-channel="aggregate" /> <int:aggregator input-channle="aggregate" />
или…
<int:gateway service interface="foo.Upcase" default-request-channel="upcase.input"> <int:chain input-channel="upcase.input"> <int:splitter /> <int:transformer expression="payload.toUpperCase()" /> <int:aggregator /> </int:chain>
Кафе Демо
Цель Cafe Demo
приложения — показать, как можно использовать шаблоны корпоративной интеграции (EIP) для отражения order-delivery
сценария в реальном кафе. С помощью этого приложения мы обрабатываем несколько заказов на напитки — горячие и холодные. После запуска приложения мы видим в стандартном выводе ( System.out.println
), как холодные напитки готовятся быстрее, чем горячие. Однако доставка всего заказа откладывается до готовности горячего напитка.
Для того, чтобы отразить модель предметной области есть несколько классов: Order
, OrderItem
, Drink
и Delivery
. Все они упоминаются в сценарии интеграции, но мы не будем их здесь анализировать, потому что они достаточно просты.
Исходный код нашего приложения размещен только в одном классе; значимые строки обозначены цифрами, соответствующими комментариям, которые следуют:
@SpringBootApplication // 1 @IntegrationComponentScan // 2 public class Application { public static void main(String[] args) throws Exception { ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);// 3 Cafe cafe = ctx.getBean(Cafe.class); // 4 for (int i = 1; i <= 100; i++) { // 5 Order order = new Order(i); order.addItem(DrinkType.LATTE, 2, false); //hot order.addItem(DrinkType.MOCHA, 3, true); //iced cafe.placeOrder(order); } System.out.println("Hit 'Enter' to terminate"); // 6 System.in.read(); ctx.close(); } @MessagingGateway // 7 public interface Cafe { @Gateway(requestChannel = "orders.input") // 8 void placeOrder(Order order); // 9 } private AtomicInteger hotDrinkCounter = new AtomicInteger(); private AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10 @Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata poller() { // 11 return Pollers.fixedDelay(1000).get(); } @Bean public IntegrationFlow orders() { // 12 return f -> f // 13 .split(Order.class, Order::getItems) // 14 .channel(c -> c.executor(Executors.newCachedThreadPool()))// 15 .<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16 .subFlowMapping("true", sf -> sf // 17 .channel(c -> c.queue(10)) // 18 .publishSubscribeChannel(c -> c // 19 .subscribe(s -> // 20 s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21 .subscribe(sub -> sub // 22 .<OrderItem, String>transform(item -> Thread.currentThread().getName() + " prepared cold drink #" + this.coldDrinkCounter.incrementAndGet() + " for order #" + item.getOrderNumber() + ": " + item) // 23 .handle(m -> System.out.println(m.getPayload())))))// 24 .subFlowMapping("false", sf -> sf // 25 .channel(c -> c.queue(10)) .publishSubscribeChannel(c -> c .subscribe(s -> s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26 .subscribe(sub -> sub .<OrderItem, String>transform(item -> Thread.currentThread().getName() + " prepared hot drink #" + this.hotDrinkCounter.incrementAndGet() + " for order #" + item.getOrderNumber() + ": " + item) .handle(m -> System.out.println(m.getPayload())))))) .<OrderItem, Drink>transform(orderItem -> new Drink(orderItem.getOrderNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots())) // 27 .aggregate(aggregator -> aggregator // 28 .outputProcessor(group -> // 29 new Delivery(group.getMessages() .stream() .map(message -> (Drink) message.getPayload()) .collect(Collectors.toList()))) // 30 .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()), null) // 31 .handle(CharacterStreamWritingMessageHandler.stdout()); // 32 } }
Изучение кода построчно …
@SpringBootApplication
Эта новая мета-аннотация от Spring Boot 1.2. Включает в себя @Configuration
и @EnableAutoConfiguration
. Поскольку мы находимся в приложении Spring Integration, и Spring Boot имеет автоматическую настройку для него, @EnableIntegration
он автоматически применяется для инициализации инфраструктуры Spring Integration, включая среду для Java DSL — DslIntegrationConfigurationInitializer
, которую выбирает IntegrationConfigurationBeanFactoryPostProcessor
from /META-INF/spring.factories
.
@IntegrationComponentScan
Аналог Spring Integration @ComponentScan
для сканирования компонентов на основе интерфейсов (Spring Framework рассматривает @ComponentScan
только классы). Spring Integration поддерживает обнаружение интерфейсов, помеченных @MessagingGateway
(см. № 7 ниже).
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
main
Метод нашего класса предназначен для запуска приложения Spring загрузки с использованием конфигурации из этого класса и начинает ApplicationContext
через Spring ботинок. Кроме того, он делегирует аргументы командной строки Spring Boot. Например, вы можете указать, --debug
чтобы видеть журналы для отчета автоконфигурации загрузки.
Cafe cafe = ctx.getBean(Cafe.class);
Поскольку у нас уже есть, ApplicationContext
мы можем начать взаимодействовать с приложением. И Cafe
это точка входа — в терминах EIP a gateway
. Шлюзы — это просто интерфейсы, и приложение не взаимодействует с Messaging API; это просто имеет дело с доменом (см. № 7 ниже).
for (int i = 1; i <= 100; i++) {
Чтобы продемонстрировать работу кафе, мы инициируем 100 заказов с двумя напитками — горячим и холодным. И отправьте Order
к Cafe
шлюзу.
System.out.println("Hit 'Enter' to terminate");
Обычно приложения Spring Integration являются асинхронными, поэтому, чтобы избежать раннего выхода из main
потока, мы блокируем main
метод до некоторого взаимодействия с конечным пользователем через командную строку. Потоки, не являющиеся демонами, будут держать приложение открытым, но System.read()
предоставят нам механизм для его точного закрытия.
@MessagingGateway
Аннотация для обозначения бизнес-интерфейса, указывающего, что он находится gateway
между конечным приложением и уровнем интеграции. Это аналог <gateway />
компонента из конфигурации Spring Integration XML. Spring Integration создает Proxy
для этого интерфейса и заполняет его как компонент в контексте приложения. Цель этого Proxy
состоит в том, чтобы обернуть параметры в Message<?>
объекте и отправить его в MessageChannel
соответствии с предоставленными параметрами.
@Gateway(requestChannel = "orders.input")
Аннотация на уровне методов к отдельной бизнес-логике как методами, так и целевыми потоками интеграции. В этом примере мы используем requestChannel
ссылку orders.input
, которая является MessageChannel
именем бина нашего IntegrationFlow
входного канала (см. Ниже # 13).
void placeOrder(Order order);
The interface method is a central point to interact from end-application with the integration layer. This method has a void
return type. It means that our integration flow is one-way
and we just send messages to the integration flow, but don’t wait for a reply.
private AtomicInteger hotDrinkCounter = new AtomicInteger(); private AtomicInteger coldDrinkCounter = new AtomicInteger();
Two counters to gather the information how our cafe works with drinks.
@Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata poller() {
The default
poller
bean. It is a analogue of <poller default="true">
component from Spring Integration XML configuration. Required for endpoints where the inputChannel
is a PollableChannel
. In this case, it is necessary for the two Cafe queues
— hot and iced (see below #18). Here we use the Pollers
factory from the DSL project and use its method-chain fluent API to build the poller metadata. Note that Pollers
can be used directly from an IntegrationFlow
definition, if a specific poller
(rather than the default poller) is needed for an endpoint.
@Bean public IntegrationFlow orders() {
The IntegrationFlow
bean definition. It is the central component of the Spring Integration Java DSL, although it does not play any role at runtime, just during the bean registration phase. All other code below registers Spring Integration components (MessageChannel
,MessageHandler
, EventDrivenConsumer
, MessageProducer
, MessageSource
etc.) in theIntegrationFlow
object, which is parsed by the IntegrationFlowBeanPostProcessor
to process those components and register them as beans in the application context as necessary (some elements, such as channels may already exist).
return f -> f
The IntegrationFlow
is a Consumer
functional interface, so we can minimize our code and concentrate just only on the integration scenario requirements. Its Lambda
acceptsIntegrationFlowDefinition
as an argument. This class offers a comprehensive set of methods which can be composed to the chain
. We call these EIP-methods
, because they provide implementations for EI patterns and populate components from Spring Integration Core. During the bean registration phase, the IntegrationFlowBeanPostProcessor
converts this inline (Lambda) IntegrationFlow
to a StandardIntegrationFlow
and processes its components. The same we can achieve using IntegrationFlows
factory (e.g.IntegrationFlow.from("channelX"). ... .get()
), but we find the Lambda definition more elegant. An IntegrationFlow
definition using a Lambda populates DirectChannel
as an inputChannel
of the flow and it is registered in the application context as a bean with the name orders.input
in this our sample (flow bean name + ".input"
). That’s why we use that name for the Cafe
gateway.
.split(Order.class, Order::getItems)
Since our integration flow accepts message through the orders.input
channel, we are ready to consume and process them. The first EIP-method in our scenario is .split()
. We know that the message payload
from orders.input
channel is an Order
domain object, so we can simply use its type here and use the Java 8 method-reference
feature. The first parameter is a type of message payload
we expect, and the second is a method reference to the getItems()
method, which returns Collection<OrderItem>
. So, this performs thesplit
EI pattern, when we send each collection entry as a separate message to the next channel. In the background, the .split()
method registers a MethodInvokingSplitter
MessageHandler
implementation and the EventDrivenConsumer
for thatMessageHandler
, and wiring in the orders.input
channel as the inputChannel
.
.channel(c -> c.executor(Executors.newCachedThreadPool()))
The .channel()
EIP-method allows the specification of concrete MessageChannel
s between endpoints, as it is done via output-channel
/input-channel
attributes pair with Spring Integration XML configuration. By default, endpoints in the DSL integration flow definition are wired with DirectChannel
s, which get the bean names based on theIntegrationFlow
bean name and index
in the flow chain. In this case we use anotherLambda
expression, which selects a specific MessageChannel
implementation from itsChannels
factory and configures it with the fluent API. The current channel here is anExecutorChannel
, to allow to distribute messages from the splitter
to separateThread
s, to process them in parallel in the downstream flow.
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping
The next EIP-method in our scenario is .route()
, to send hot/iced
order items to different Cafe kitchens. We again use here a method reference (isIced()
) to get theroutingKey
from the incoming message. The second Lambda parameter represents arouter mapping
— something similar to <mapping>
sub-element for the <router>
component from Spring Integration XML configuration. However since we are using Java we can go a bit further with its Lambda support! The Spring Integration Java DSL introduced thesubflow
definition for router
s in addition to traditional channel mapping
. Each subflow is executed depending on the routing and, if the subflow produces a result, it is passed to the next element in the flow definition after the router.
.subFlowMapping("true", sf -> sf
Specifies the integration flow for the current router’s mappingKey
. We have in this samples two subflows — hot
and iced
. The subflow is the same IntegrationFlow
functional interface, therefore we can use its Lambda exactly the same as we do on the top levelIntegrationFlow
definition. The subflows don’t have any runtime dependency with its parent, it’s just a logical relationship.
.channel(c -> c.queue(10))
We already know that a Lambda definition for the IntegrationFlow
starts from[FLOW_BEAN_NAME].input
DirectChannel
, so it may be a question «how does it work here if we specify .channel()
again?». The DSL takes care of such a case and wires those two channels with a BridgeHandler
and endpoint. In our sample, we use here a restrictedQueueChannel
to reflect the Cafe kitchen busy state from real life. And here is a place where we need that global poller
for the next endpoint which is listening on this channel.
.publishSubscribeChannel(c -> c
The .publishSubscribeChannel()
EIP-method is a variant of the .channel()
for aMessageChannels.publishSubscribe()
, but with the .subscribe()
option when we can specify subflow as a subscriber to the channel. Right, subflow one more time! So, subflows can be specified to any depth. Independently of the presence .subscribe()
subflows, the next endpoint in the parent flow is also a subscriber to this .publishSubscribeChannel()
. Since we are in the .route()
subflow already, the last subscriber is an implicit BridgeHandler
which just pops the message to the top level — to a similar implicit BridgeHandler
to pop message to the next .transform()
endpoint in the main flow. And one more note about this current position of our flow: the previous EIP-method is .channel(c -> c.queue(10))
and this one is for MessageChannel
too. So, they are again tied with an implicit BridgeHandler
as well. In a real application we could avoid this .publishSubscribeChannel()
just with the single .handle()
for the Cafe kitchen, but our goal here to cover DSL features as much as possible. That’s why we distribute the kitchen work to several subflows for the samePublishSubscribeChannel
.
.subscribe(s ->
The .subscribe()
method accepts an IntegrationFlow
as parameter, which can be specified as Lambda to configure subscriber as subflow
. We use here several subflow subscribers to avoid multi-line Lambdas and cover some DSL as we as Spring Integration capabilities.
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))
Here we use a simple .handle()
EIP-method just to block the current Thread for some timeout to demonstrate how quickly the Cafe kitchen prepares a drink. Here we use Google Guava Uninterruptibles.sleepUninterruptibly
, to avoid using a try...catch
block within the Lambda expression, although you can do that and your Lambda will be multi-line. Or you can move that code to a separate method and use it here as method reference
.
Since we don’t use any Executor
on the .publishSubscribeChannel()
all subscribers will beperformed sequentially on the same Thread; in our case it is one of TaskScheduler
‘s Threads from poller
on the previous QueueChannel
. That’s why this sleep
blocks all downstream process and allows to demonstrate the busy state
for that restricted to 10QueueChannel
.
.subscribe(sub -> sub
The next subflow subscriber which will be performed only after that sleep
with 1 second foriced
drink. We use here one more subflow because .handle()
of previous one is one-way
with the nature of the Lambda for MessageHandler
. That’s why, to go ahead with process of our whole flow, we have several subscribers: some of subflows finish after their work and don’t return anything to the parent flow.
.<OrderItem, String>transform(item -> Thread.currentThread().getName() + " prepared cold drink #" + this.coldDrinkCounter.incrementAndGet() + " for order #" + item.getOrderNumber() + ": " + item)
The transformer
in the current subscriber subflow is to convert the OrderItem
to the friendly STDOUT message for the next .handle
. Here we see the use of generics with the Lambda expression. This is implemented using the GenericTransformer
functional interface.
.handle(m -> System.out.println(m.getPayload())))))
The .handle()
here just to demonstrate how to use Lambda expression to print thepayload
to STDOUT. It is a signal that our drink is ready. After that the final (implicit) subscriber to the PublishSubscribeChannel
just sends the message with the OrderItem
to the .transform()
in the main flow.
.subFlowMapping("false", sf -> sf
The .subFlowMapping()
for the hot
drinks. Actually it is similar to the previous iced
drinks subflow, but with specific hot
business logic.
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))
The sleepUninterruptibly
for hot
drinks. Right, we need more time to boil the water!
.<OrderItem, Drink>transform(orderItem -> new Drink(orderItem.getOrderNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots()))
The main OrderItem
to Drink
transformer
, which is performed when the .route()
subflow returns its result after the Cafe kitchen subscribers have finished preparing the drink.
.aggregate(aggregator -> aggregator
The .aggregate()
EIP-method provides similar options to configure anAggregatingMessageHandler
and its endpoint, like we can do with the <aggregator>
component when using Spring Integration XML configuration. Of course, with the Java DSL we have more power to configure the aggregator just in place, without any other extra beans. And Lambdas come to the rescue again! From the Cafe business logic perspective we compose theDelivery
for the initial Order
, since we .split()
the original order to the OrderItem
s near the beginning.
.outputProcessor(group ->
The .outputProcessor()
of the AggregatorSpec
allows us to emit a custom result after aggregator completes the group. It’s an analogue of ref
/method
from the <aggregator>
component or the @Aggregator
annotation on a POJO method. Our goal here to compose aDelivery
for all Drink
s.
new Delivery(group.getMessages() .stream() .map(message -> (Drink) message.getPayload()) .collect(Collectors.toList())))
As you see we use here the Java 8 Stream
feature for Collection
. We iterate over messages from the released MessageGroup
and convert (map
) each of them to its Drink
payload
. The result of the Stream
(.collect()
) (a list of Drink
s) is passed to theDelivery
constructor. The Message
with this new Delivery
payload is sent to the next endpoint in our Cafe scenario.
.correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber()), null)
The .correlationStrategy()
Lambda demonstrates how we can customize an aggregator behaviour. Of course, we can rely here just only on a built-in SequenceDetails
from Spring Integration, which is populated by default from .split()
in the beginning of our flow to each split message, but the Lambda sample for the CorrelationStrategy
is included for illustration. (With XML, we could have used a correlation-expression
or a customCorrelationStrategy
). The second argument in this line for the .aggregate()
EIP-method is for the endpointConfigurer
to customize options like autoStartup
,requiresReply
, adviceChain
etc. We use here null
to show that we rely on the default options for the endpoint. Many of EIP-methods provide overloaded versions with and withoutendpointConfigurer
, but .aggregate()
requires an endpoint argument, to avoid an explicit cast for the AggregatorSpec
Lambda argument.
.handle(CharacterStreamWritingMessageHandler.stdout());
It is the end of our flow — the Delivery
is delivered to the client! We just print here the message payload
to STDOUT using out-of-the-boxCharacterStreamWritingMessageHandler
from Spring Integration Core. This is a case to show how existing components from Spring Integration Core (and its modules) can be used from the Java DSL.
Well, we have finished describing the Cafe Demo sample based on the Spring Integration Java DSL. Compare it with XML sample to get more information regarding Spring Integration.
This is not an overall tutorial to the DSL stuff. We don’t review here theendpointConfigurer
options, Transformers
factory, the IntegrationComponentSpec
hierarchy, the NamespaceFactories
, how we can specify several IntegrationFlow
beans and wire them to a single application etc., see the Reference Manual for more information.
At least this line-by-line tutorial should show you Spring Integration Java DSL basics and its seamless fusion between Spring Framework Java & Annotation configuration, Spring Integration foundation and Java 8 Lambda support!
Also see the si4demo to see the evolution of Spring Integration including the Java DSL, as shown at the 2014 SpringOne/2GX Conference. (Video should be available soon).
As always, we look forward to your comments and feedback (StackOverflow (spring-integration
tag), Spring JIRA, GitHub) and we very much welcome contributions!
P.S. Even if this tutorial is fully based on the Java 8 Lambda support, we don’t want to miss pre Java 8 users, we are going to provide similar non-Lambda blog post. Stay tuned!