Первоначально написано
Артемом Биланом в блоге SpringSource.
Уважаемое весеннее сообщество!
Недавно мы опубликовали руководство по интеграции Java Spring DSL: построчно , в котором широко используются Java 8 Lambdas. Мы получили некоторые отзывы о том, что это хорошее введение в DSL, но аналогичное руководство необходимо для тех пользователей, которые не могут перейти на Java 8 или еще не знакомы Lambdas
, но хотят воспользоваться
Итак, чтобы помочь пользователям Spring Integration, которые хотят перейти с конфигурации XML на конфигурацию Java и аннотации, мы предоставляем это, line-by-line tutorial
чтобы продемонстрировать, что даже без Lambdas
этого мы многое извлекаем из использования Spring DS Integration Java DSL. Хотя большинство согласится с тем, что лямбда-синтаксис обеспечивает более краткое определение.
Здесь мы анализируем тот же пример Cafe Demo , но используя для конфигурации вариант до Java 8. Многие параметры одинаковы, поэтому мы просто скопируем / вставим их описание здесь, чтобы получить полную картину. Поскольку эта конфигурация Java DSL Spring Integration сильно отличается от лямбда-стиля Java 8, всем пользователям будет полезно получить представление о том, как мы можем достичь того же результата с богатым набором опций, предоставляемых Java DSL Spring Integration.
Исходный код нашего приложения размещен в одном классе, который является Boot
приложением; значимые строки обозначены цифрами, соответствующими комментариям, которые следуют:
@SpringBootApplication // 1 @IntegrationComponentScan // 2 public class Application { public static void main(String[] args) throws Exception { ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args); // 3 Cafe cafe = ctx.getBean(Cafe.class); // 4 for (int i = 1; i <= 100; i++) { // 5 Order order = new Order(i); order.addItem(DrinkType.LATTE, 2, false); order.addItem(DrinkType.MOCHA, 3, true); cafe.placeOrder(order); } System.out.println("Hit 'Enter' to terminate"); // 6 System.in.read(); ctx.close(); } @MessagingGateway // 7 public interface Cafe { @Gateway(requestChannel = "orders.input") // 8 void placeOrder(Order order); // 9 } private final AtomicInteger hotDrinkCounter = new AtomicInteger(); private final AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10 @Autowired private CafeAggregator cafeAggregator; // 11 @Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata poller() { // 12 return Pollers.fixedDelay(1000).get(); } @Bean @SuppressWarnings("unchecked") public IntegrationFlow orders() { // 13 return IntegrationFlows.from("orders.input") // 14 .split("payload.items", (Consumer) null) // 15 .channel(MessageChannels.executor(Executors.newCachedThreadPool()))// 16 .route("payload.iced", // 17 new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() { // 18 @Override public void accept(RouterSpec<ExpressionEvaluatingRouter> spec) { spec.channelMapping("true", "iced") .channelMapping("false", "hot"); // 19 } }) .get(); // 20 } @Bean public IntegrationFlow icedFlow() { // 21 return IntegrationFlows.from(MessageChannels.queue("iced", 10)) // 22 .handle(new GenericHandler<OrderItem>() { // 23 @Override public Object handle(OrderItem payload, Map<String, Object> headers) { Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #" + payload.getOrderNumber() + ": " + payload); return payload; // 24 } }) .channel("output") // 25 .get(); } @Bean public IntegrationFlow hotFlow() { // 26 return IntegrationFlows.from(MessageChannels.queue("hot", 10)) .handle(new GenericHandler<OrderItem>() { @Override public Object handle(OrderItem payload, Map<String, Object> headers) { Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); // 27 System.out.println(Thread.currentThread().getName() + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + " for order #" + payload.getOrderNumber() + ": " + payload); return payload; } }) .channel("output") .get(); } @Bean public IntegrationFlow resultFlow() { // 28 return IntegrationFlows.from("output") // 29 .transform(new GenericTransformer<OrderItem, Drink>() { // 30 @Override public Drink transform(OrderItem orderItem) { return new Drink(orderItem.getOrderNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots()); // 31 } }) .aggregate(new Consumer<AggregatorSpec>() { // 32 @Override public void accept(AggregatorSpec aggregatorSpec) { aggregatorSpec.processor(cafeAggregator, null); // 33 } }, null) .handle(CharacterStreamWritingMessageHandler.stdout()) // 34 .get(); } @Component public static class CafeAggregator { // 35 @Aggregator // 36 public Delivery output(List<Drink> drinks) { return new Delivery(drinks); } @CorrelationStrategy // 37 public Integer correlation(Drink drink) { return drink.getOrderNumber(); } } }
Изучение кода построчно …
@SpringBootApplication
Эта новая мета-аннотация от Spring Boot 1.2. Включает в себя @Configuration
и @EnableAutoConfiguration
. Поскольку мы находимся в приложении Spring Integration, и Spring Boot имеет автоматическую настройку для него, @EnableIntegration
он автоматически применяется для инициализации инфраструктуры Spring Integration, включая среду для Java DSL — DslIntegrationConfigurationInitializer
, которую выбирает IntegrationConfigurationBeanFactoryPostProcessor
from /META-INF/spring.factories
.
@IntegrationComponentScan
Аналог Spring Integration @ComponentScan
для сканирования компонентов на основе интерфейсов (Spring Framework рассматривает @ComponentScan
только классы). Spring Integration поддерживает обнаружение интерфейсов, помеченных @MessagingGateway
(см. № 7 ниже).
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
main
Метод нашего класса предназначен для запуска приложения Spring загрузки с использованием конфигурации из этого класса и начинает ApplicationContext
через Spring ботинок. Кроме того, он делегирует аргументы командной строки Spring Boot. Например, вы можете указать, --debug
чтобы видеть журналы для отчета автоконфигурации загрузки.
Cafe cafe = ctx.getBean(Cafe.class);
Поскольку у нас уже есть, ApplicationContext
мы можем начать взаимодействовать с приложением. И Cafe
это точка входа — в терминах EIP a gateway
. Шлюзы — это просто интерфейсы, и приложение не взаимодействует с Messaging API; это просто имеет дело с доменом (см. № 7 ниже).
for (int i = 1; i <= 100; i++) {
Чтобы продемонстрировать работу кафе, мы инициируем 100 заказов с двумя напитками — горячим и холодным. И отправьте Order
к Cafe
шлюзу.
System.out.println("Hit 'Enter' to terminate");
Обычно приложения Spring Integration являются асинхронными, поэтому, чтобы избежать раннего выхода из main
потока, мы блокируем main
метод до некоторого взаимодействия с конечным пользователем через командную строку. Потоки, не являющиеся демонами, будут держать приложение открытым, но System.read()
предоставят нам механизм для его точного закрытия.
@MessagingGateway
Аннотация для обозначения бизнес-интерфейса, указывающего, что он находится gateway
между конечным приложением и уровнем интеграции. Это аналог <gateway />
компонента из конфигурации Spring Integration XML. Spring Integration создает Proxy
для этого интерфейса и заполняет его как компонент в контексте приложения. Цель этого Proxy
состоит в том, чтобы обернуть параметры в Message<?>
объекте и отправить его в MessageChannel
соответствии с предоставленными параметрами.
@Gateway(requestChannel = "orders.input")
Аннотация на уровне методов к отдельной бизнес-логике как методами, так и целевыми потоками интеграции. В этом примере мы используем requestChannel
ссылку orders.input
, которая является MessageChannel
именем бина нашего IntegrationFlow
входного канала (см. Ниже # 14).
void placeOrder(Order order);
Метод интерфейса является центральной точкой взаимодействия конечного приложения с уровнем интеграции. Этот метод имеет void
тип возвращаемого значения. Это означает, что наш поток интеграции есть, one-way
и мы просто отправляем сообщения в поток интеграции, но не ждем ответа.
private AtomicInteger hotDrinkCounter = new AtomicInteger(); private AtomicInteger coldDrinkCounter = new AtomicInteger();
Два счетчика для сбора информации о том, как работает наше кафе с напитками.
@Autowired private CafeAggregator cafeAggregator;
POJO для Aggregator
логики (см. № 33 и № 35 ниже). Поскольку это бин Spring, мы можем просто внедрить его даже в текущий @Configuration
и использовать в любом месте ниже, например, из .aggregate()
EIP-метода.
@Bean(name = PollerMetadata.DEFAULT_POLLER) public PollerMetadata poller() {
default
poller
Боб. Это аналог <poller default="true">
компонента из конфигурации Spring Integration XML. Требуется для конечных точек, где inputChannel
есть PollableChannel
. В этом случае это необходимо для двух кафе queues
— горячего и холодного (см. Ниже № 18). Здесь мы используем Pollers
фабрику из проекта DSL и используем его свободно распространяемый API-метод для создания метаданных опроса. Обратите внимание, что Pollers
это можно использовать непосредственно из IntegrationFlow
определения, если poller
для конечной точки требуется определенный (а не поллер по умолчанию).
@Bean public IntegrationFlow orders() {
Определение IntegrationFlow
бина. Это центральный компонент Java DSL Spring Integration, хотя он не играет никакой роли во время выполнения, только на этапе регистрации компонента. Весь другие регистры ниже код компонента Spring Integration ( MessageChannel
, MessageHandler
, EventDrivenConsumer
, MessageProducer
, и MessageSource
т.д.) в IntegrationFlow
объекте, который анализируется самым , IntegrationFlowBeanPostProcessor
чтобы обработать эти компоненты и зарегистрировать их как бобы в контексте применения по мере необходимости (некоторые элементы, такие как каналы уже может существовать ).
return IntegrationFlows.from("orders.input")
Это IntegrationFlows
основной factory
класс для начала IntegrationFlow
. Она обеспечивает ряд перегруженных .from()
методов , чтобы начать вытекать из SourcePollingChannelAdapter
для А MessageSource<?>
реализаций, например JdbcPollingChannelAdapter
, от MessageProducer
, например WebSocketInboundChannelAdapter
; или просто MessageChannel
. Все параметры «.from ()» имеют несколько удобных вариантов для настройки соответствующего компонента для запуска IntegrationFlow
. Здесь мы используем только имя канала, которое преобразуется в определение DirectChannel
компонента во время фазы определения компонента при анализе IntegrationFlow
. В варианте Java 8 мы использовали здесь Lambda definition
— и это MessageChannel
было неявно создано с именем компонента на основе имени IntegrationFlow
компонента.
.split("payload.items", (Consumer) null)
Поскольку наш поток интеграции принимает сообщения через orders.input
канал, мы готовы их использовать и обрабатывать. Первый EIP-метод в нашем сценарии .split()
. Мы знаем, что сообщение payload
от orders.input
канала является Order
доменным объектом, поэтому мы можем просто использовать здесь выражение Spring (SpEL) для возврата Collection<OrderItem>
. Таким образом, выполняется split
шаблон EI, и мы отправляем каждую запись коллекции в виде отдельного сообщения на следующий канал. В фоновом режиме .split()
метод регистрирует ExpressionEvaluatingSplitter
MessageHandler
реализацию и EventDrivenConsumer
для этого MessageHandler
проводку в orders.input
канале как inputChannel
.
Второй аргумент для .split()
EIP-метода для endpointConfigurer
настройки параметров, таких как autoStartup
, requiresReply
и adviceChain
т. Д. Мы используем здесь, null
чтобы показать, что мы полагаемся на параметры по умолчанию для конечной точки. Многие из EIP-методов предоставляют перегруженные версии с и без endpointConfigurer
. В настоящее время .split(String expression)
EIP-метод без endpointConfigurer
аргумента недоступен; это будет решено в будущем выпуске.
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.channel()
EIP-метод позволяет определять конкретный MessageChannel
s между конечными точками, как это делается с помощью output-channel
/ input-channel
атрибуты паров с конфигурацией XML Spring Integration. По умолчанию конечные точки в определении потока интеграции DSL связаны с DirectChannel
s, которые получают имена компонентов на основе имени IntegrationFlow
компонента и index
в цепочке потоков. В этом случае мы выбираем конкретную MessageChannel
реализацию из Channels
класса фабрики; здесь выбран канал ExecutorChannel
, чтобы разрешить распределение сообщений от splitter
отдельных к Thread
, чтобы обрабатывать их параллельно в нисходящем потоке.
.route("payload.iced",
Следующий EIP-метод в нашем сценарии — .route()
отправка hot/iced
заказов на разные кухни кафе. Мы снова используем здесь выражение SpEL, чтобы получить routingKey
входящее сообщение. В варианте Java 8 мы использовали method-reference
лямбда-выражение, но для стиля, предшествующего Java 8, мы должны использовать SpEL или реализацию встроенного интерфейса. Многие анонимные классы в потоке могут затруднить чтение, поэтому в большинстве случаев мы предпочитаем SpEL.
new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() {
Вторым аргументом .route()
EIP-метода является функциональный интерфейс Consumer
для задания ExpressionEvaluatingRouter
опций с помощью RouterSpec
Builder. Поскольку у нас нет никакого выбора в предварительной версии Java 8, мы просто предоставляем здесь встроенную реализацию для этого интерфейса.
spec.channelMapping("true", "iced") .channelMapping("false", "hot");
При Consumer<RouterSpec<ExpressionEvaluatingRouter>>#accept()
реализации мы можем предоставить желаемые AbstractMappingMessageRouter
варианты. Одним из них является то channelMappings
, когда мы определяем логику маршрутизации по результату расширения маршрутизатора и цели MessageChannel
для соответствующего результата. В этом случае iced
и hot
являются MessageChannel
имена IntegrationFlow
с ниже.
.get();
Это завершает поток. Любой IntegrationFlows.from()
метод возвращает IntegrationFlowBuilder
экземпляр, и этот get()
метод извлекает IntegrationFlow
объект из IntegrationFlowBuilder
конфигурации. Все, начиная с .from()
и до метода до, .get()
является IntegrationFlow
определением. Все определенные компоненты хранятся в IntegrationFlow
и обрабатываются на IntegrationFlowBeanPostProcessor
этапе создания компонента.
@Bean public IntegrationFlow icedFlow() {
Это второе IntegrationFlow
определение бобов — для iced
напитков. Здесь мы демонстрируем, что несколько IntegrationFlow
s могут быть соединены вместе для создания одного сложного приложения. Примечание: не рекомендуется вводить одно IntegrationFlow
в другое; это может вызвать неожиданное поведение. Поскольку они предоставляют компоненты интеграции для регистрации bean-компонентов и являются MessageChannel
одним из них, лучший способ подключиться и внедрить это через MessageChannel
или @MessagingGateway
интерфейсы.
return IntegrationFlows.from(MessageChannels.queue("iced", 10))
iced
IntegrationFlow
Начинается с , QueueChannel
что имеет емкость 10
сообщений; это зарегистрировано как боб с названием iced
. Как вы помните, мы используем это имя в качестве одного из отображений маршрута (см. Выше # 19).
В нашем примере мы используем ограничение, QueueChannel
чтобы отразить состояние кафе на кухне в реальной жизни. И здесь есть место, где нам нужно это global poller
для следующей конечной точки, которая прослушивает этот канал.
.handle(new GenericHandler<OrderItem>() {
.handle()
ОПЗ-метод iced
потока демонстрирует бетонное кафе кухни работы. Поскольку мы не можем минимизировать код с помощью чего-то вроде лямбда-выражения Java 8, мы предоставляем здесь встроенную реализацию для GenericHandler
функционального интерфейса с ожидаемым payload
типом в качестве универсального аргумента. В примере с Java 8 мы распределяем это .handle()
между несколькими подпотоками подписчика для a PublishSubscribeChannel
. Однако в этом случае логика все реализована в одном методе.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + " for order #" + payload.getOrderNumber() + ": " + payload); return payload;
Реализация бизнес-логики для текущего .handle()
EIP-компонента. С этим Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
мы просто блокируем ток Thread
на некоторое время, чтобы продемонстрировать, как быстро кухня Кафе готовит напиток. После этого мы просто сообщить , STDOUT
что напиток готов и вернуть ток OrderItem
от GenericHandler
к следующей конечной точке в нашей IntegrationFlow
. В фоновом режиме платформа DSL регистрирует a ServiceActivatingHandler
для MethodInvokingMessageProcessor
вызова GenericHandler#handle
во время выполнения. Кроме того, платформа регистрирует PollingConsumer
конечную точку для QueueChannel
вышеупомянутого. Эта конечная точка полагается на default poller
сообщения опроса из очереди. Конечно, мы всегда можем использовать конкретную poller
для любой конкретной конечной точки. В этом случае мы должны были бы предоставить второй endpointConfigurer
аргумент .handle()
EIP-метод.
.channel("output")
Поскольку это не конец нашего сценария Cafe, мы отправляем результат текущего потока на output
канал, используя удобный EIP-метод .channel()
и имя MessageChannel
компонента (см. Ниже # 29). Это логическое завершение текущего подпотока замороженного напитка, поэтому мы используем .get()
метод для возврата IntegrationFlow
. Потоки, которые заканчиваются обработчиком ответа, у которого нет финала .channel()
, вернут ответ в replyChannel
заголовок сообщения .
@Bean public IntegrationFlow hotFlow() {
IntegrationFlow
Определение hot
напитков. Это похоже на предыдущий iced
поток напитков, но с определенной hot
бизнес-логикой. Это начинается с того, hot
QueueChannel
который сопоставлен с маршрутизатором выше.
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
sleepUninterruptibly
Для hot
напитков. Правильно, нам нужно больше времени кипятить воду!
@Bean public IntegrationFlow resultFlow() {
Еще одно IntegrationFlow
определение bean-компонента для подготовки Delivery
клиента Cafe на основе Drink
s.
return IntegrationFlows.from("output")
В resultFlow
начинается с DirectChannel
, который создается во время фазы определения фасоли с этим именем при условии. Вы должны помнить, что мы используем output
название канала из потоков Кафе-кухни в последних .channel()
определениях.
.transform(new GenericTransformer<OrderItem, Drink>() {
The .transform()
EIP-method is for the appropriate pattern implementation and expects some object to convert one payload to another. In our sample we use an inline implementation of the GenericTransformer
functional interface to convert OrderItem
to Drink
and we specify that using generic arguments. In the background, the DSL framework registers aMessageTransformingHandler
and an EventDrivenConsumer
endpoint with default options to consume messages from the output
MessageChannel
.
public Drink transform(OrderItem orderItem) { return new Drink(orderItem.getOrderNumber(), orderItem.getDrinkType(), orderItem.isIced(), orderItem.getShots()); }
The business-specific GenericTransformer#transform()
implementation to demonstrate how we benefit from Java Generics to transform one payload
to another. Note: Spring Integration uses ConversionService
before any method invocation and if you provide some specific Converter
implementation, some domain payload
can be converted to another automatically, when the framework has an appropriate registered Converter
.
.aggregate(new Consumer<AggregatorSpec>() {
The .aggregate()
EIP-method provides options to configure anAggregatingMessageHandler
and its endpoint, similar to what we can do with the<aggregator>
component when using Spring Integration XML configuration. Of course, with the Java DSL we have more power to configure the aggregator in place, without any other extra beans. However we demonstrate here an aggregator configuration with annotations (see below #35). From the Cafe business logic perspective we compose the Delivery
for the initial Order
, since we .split()
the original order to the OrderItem
s near the beginning.
public void accept(AggregatorSpec aggregatorSpec) { aggregatorSpec.processor(cafeAggregator, null); }
An inline implementation of the Consumer
for the AggregatorSpec
. Using theaggregatorSpec
Builder we can provide desired options for the aggregator
component, which will be registered as an AggregatingMessageHandler
bean. Here we just provide theprocessor
as a reference to the autowired (see #11 above) CafeAggregator
component (see #35 below). The second argument of the .processor()
option is methodName
. Since we are relying on the aggregator annotation configuration for the POJO, we don’t need to provide the method here and the framework will determine the correct POJO methods in the background.
.handle(CharacterStreamWritingMessageHandler.stdout())
It is the end of our flow — the Delivery
is delivered to the client! We just print here the message payload
to STDOUT using out-of-the-boxCharacterStreamWritingMessageHandler
from Spring Integration Core. This is a case to show how existing components from Spring Integration Core (and its modules) can be used from the Java DSL.
@Component public static class CafeAggregator {
The bean to specify the business logic for the aggregator
above. This bean is picked up by the @ComponentScan
, which is a part of the @SpringBootApplication
meta-annotation (see above #1). So, this component becomes a bean and we can automatically wire (@Autowired
) it to other components in the application context (see #11 above).
@Aggregator public Delivery output(List<Drink> drinks) { return new Delivery(drinks); }
The POJO-specific MessageGroupProcessor
to build the output payload
based on the payloads from aggregated messages. Since we mark this method with the @Aggregator
annotation, the target AggregatingMessageHandler
can extract this method for theMethodInvokingMessageGroupProcessor
.
@CorrelationStrategy public Integer correlation(Drink drink) { return drink.getOrderNumber(); }
The POJO-specific CorrelationStrategy
to extract the custom correlationKey
from each inbound aggregator message. Since we mark this method with @CorrelationStrategy
annotation the target AggregatingMessageHandler
can extract this method for theMethodInvokingCorrelationStrategy
. There is a similar self-explained@ReleaseStrategy
annotation, but we rely in our Cafe sample just on the defaultSequenceSizeReleaseStrategy
, which is based on the sequenceDetails
message header populated by the splitter
from the beginning of our integration flow.
Well, we have finished describing the Cafe Demo sample based on the Spring Integration Java DSL when Java Lambda support is not available. Compare it with XML sample and also seeLambda support tutorial to get more information regarding Spring Integration.
As you can see, using the DSL without lambdas is a little more verbose because you need to provide boilerplate code for inline anonymous implementations of functional interfaces. However, we believe it is important to support the use of the DSL for users who can’t yet move to Java 8. Many of the DSL benefits (fluent API, compile-time validation etc) are available for all users.
The use of lambdas continues the Spring Framework tradition of reducing or eliminating boilerplate code, so we encourage users to try Java 8 and lambdas and to encourage their organizations to consider allowing the use of Java 8 for Spring Integration applications.
In addition see the Reference Manual for more information.
As always, we look forward to your comments and feedback (StackOverflow (spring-integration
tag), Spring JIRA, GitHub) and we very much welcome contributions!
Thank you for your time and patience to read this!