Эта статья является частью нашего курса Академии под названием Spring Integration для EAI .
В этом курсе вы познакомитесь с шаблонами интеграции корпоративных приложений и с тем, как Spring Integration обращается к ним. Далее вы углубитесь в основы Spring Integration, такие как каналы, преобразователи и адаптеры. Проверьте это здесь !
Содержание
1. Введение
Во втором уроке вы изучите основные понятия, составляющие ядро Spring Integration. После объяснения этих понятий мы рассмотрим различные компоненты, поставляемые с проектом. Эта редакция основана на версии 3.0.1 . Учитывая, что выпуск 4.0.0 будет выпущен в ближайшее время, вы можете найти некоторые новые компоненты, которые не описаны в этом руководстве. В любом случае, вы получите достаточно знаний о структуре, чтобы понять поведение будущих компонентов.
Завершая этот урок, вы узнаете, как Spring Integration поддерживает различные типы коммуникаций (асинхронные и синхронные) и как это решение может повлиять на ваш дизайн. Одним из особых случаев является обработка ошибок, которая описана в последнем разделе.
Этот урок состоит из следующих разделов:
- Вступление
- Что такое весенняя интеграция?
- Основные понятия системы обмена сообщениями Spring Integration
- Компоненты
- Синхронная и асинхронная связь
- Обработка ошибок
2. Что такое весенняя интеграция?
Как объяснялось в предыдущем разделе, Spring Integration основана на концепциях, описанных в книге «Шаблоны интеграции предприятия». Это легкое решение для обмена сообщениями, которое добавит возможности интеграции в ваше приложение Spring. Как стратегия обмена сообщениями, она обеспечивает быстрый обмен информацией с высокой степенью развязки между задействованными компонентами или приложениями. Вы узнаете, как этого добиться, в то время как Spring решает любые проблемы инфраструктуры низкого уровня. Это позволит вам сосредоточиться на вашей бизнес-логике.
В настоящее время конфигурация Spring Integration в основном основана на xml, хотя некоторые аннотации начинают включаться. Примеры, показанные в этом руководстве, также будут основаны на xml, хотя я покажу соответствующую аннотацию, когда это возможно.
Объяснив это, возникает вопрос: что вы можете сделать с помощью Spring Integration? Фреймворк в основном позволяет вам делать следующее:
- Он обеспечивает связь между компонентами в вашем приложении на основе обмена сообщениями в памяти. Это позволяет этим прикладным компонентам свободно соединяться друг с другом, обмениваясь данными по каналам сообщений.
- Это позволяет общаться с внешними системами. Вам просто нужно отправить информацию; Spring Integration будет обрабатывать его отправку в указанную внешнюю систему и при необходимости возвращает ответ. Конечно, это работает наоборот; Spring Integration будет обрабатывать входящие звонки из внешней системы в ваше приложение. Это будет объяснено позже в этом уроке.
Spring Integration нацелен на лучшие практики среды Spring, такие как программирование с интерфейсами или методика композиции поверх наследования. Его основными преимуществами являются:
- Слабая связь между компонентами.
- Событийно-ориентированная архитектура.
- Логика интеграции (обрабатывается платформой) отделена от бизнес-логики.
В следующем разделе вы узнаете, каковы три основных понятия, на которых основана эта система обмена сообщениями.
3. Основные понятия системы обмена сообщениями Spring Integration
Основными понятиями архитектуры, управляемой сообщениями , являются: сообщение , канал сообщения и конечная точка сообщения .
API довольно прост:
- Сообщение отправляется на конечную точку
- Конечные точки связаны между собой через MessageChannels
- Конечная точка может получать сообщения от MessageChannel
3.1 Сообщение
Сообщение содержит информацию, которая будет передана различным компонентам приложения или отправлена во внешнюю систему. Но что такое сообщение? Сообщение структурировано следующим образом:
Как вы можете видеть в следующем фрагменте, сообщение — это интерфейс с GenericMessage в качестве его основной реализации (также предоставляемой платформой):
- Заголовок : содержит метаинформацию о сообщении. Если вы проверите класс MessageHeaders , вы увидите, что это просто оболочка Map, но ее операции вставки помечены как неподдерживаемые. Структура помечает их так, потому что сообщение считается неизменным. После того, как сообщение было создано, вы не можете его изменить. Вы можете добавить свои собственные заголовки в виде пары ключ-значение, но они в основном используются для передачи транспортной информации. Например, если вы хотите отправить электронное письмо, оно будет содержать заголовки, такие как, subject, from…
- Полезная нагрузка : это обычный Java-класс, который будет содержать информацию, которой вы хотите поделиться. Это может быть любой тип Java.
Если вы хотите создать сообщение, у вас есть два варианта. Первый включает использование класса построителя ( MessageBuilder ).
1
2
3
4
5
|
Message<String> message = MessageBuilder .withPayload( "my message payload" ) .setHeader( "key1" , "value1" ) .setHeader( "key2" , "value2" ) .build(); |
Вы должны установить полезную нагрузку и требуемые заголовки перед его построением, поскольку после создания сообщения вы не сможете сделать это, если не создадите новое сообщение.
Другой вариант заключается в использовании реализации, предоставляемой платформой:
1
2
3
4
5
|
Map<String, Object> headers = new HashMap<>(); headers.put( "key1" , "value1" ); headers.put( "key2" , "value2" ); Message<String> message = new GenericMessage<String>( "my message payload" , headers); |
3.2 Канал сообщений
Канал сообщений — это канал, который соединяет конечные точки и куда проходят сообщения. Производители отправляют сообщения на канал, а потребители получают их с канала. С этим механизмом вам не нужен какой-либо брокер.
Канал сообщений также может использоваться в качестве точки перехвата или для мониторинга сообщений.
В зависимости от того, как сообщение используется, каналы сообщений классифицируются следующим образом:
3.2.1 Точка-точка
К каналу сообщений подключен только один приемник. Ну, это не совсем на 100% верно. Если это подписываемый канал, вы можете иметь более одного получателя, но только один будет обрабатывать сообщение. А пока забудьте об этом, поскольку это сложная тема, которая будет рассмотрена позже в этом курсе (настройка диспетчера). Этот тип канала имеет несколько реализаций:
- DirectChannel : реализует SubscribeableChannel . Сообщение отправляется подписчику через тот же поток получателя. Эта связь является синхронной, и блок производителя пока не получен ответ. Как это работает:
- Производитель отправляет сообщение на канал.
- Канал отправляет сообщение своему подписчику (пассивному подписчику).
- QueueChannel : Реализует PollableChannel . К каналу подключена одна конечная точка, нет подписчиков. Это сообщение асинхронное; получатель получит сообщение через другой поток. Как это работает:
- Производитель отправляет сообщение на канал.
- Канал ставит сообщение в очередь.
- Потребитель активно извлекает сообщение (активный получатель).
- ExecutorChannel : реализует
SubscribableChannel
. Отправка делегируется TaskExecutor. Это означает, что метод send () не будет блокироваться.
- PriorityChannel : Реализует
PollableChannel
. Аналогичен QueueChannel, но сообщения упорядочены по приоритету вместо FIFO.
- RendezvousChannel : Реализует
PollableChannel
. Похож на QueueChannel, но с нулевой емкостью. Производитель будет блокировать, пока получатель не вызовет свой метод receive ().
3.2.2 Публикация-подписка
Канал может иметь несколько конечных точек, подписанных на него. Таким образом, сообщение будет обрабатываться разными получателями.
- PublishSubscribeChannel : Реализует
SubscribableChannel
. Подписанные получатели могут быть вызваны последовательно через поток производителя. Если мы укажем TaskExecutor, получатели будут вызываться параллельно через разные потоки.
3.2.3 Временные каналы
Это специальный тип канала, который создается автоматически конечными точками, для которых явно не определен выходной канал. Созданный канал является двухточечным анонимным каналом. Вы можете увидеть его в заголовке сообщения под именем replyChannel
.
Эти типы каналов автоматически удаляются после отправки ответа. Не рекомендуется явно определять выходной канал, если он вам не нужен. Каркас справится с этим за вас.
3.3 Конечная точка сообщения
Его целью является неинвазивное соединение приложения с платформой обмена сообщениями. Если вы знакомы с Spring MVC, конечная точка будет обрабатывать сообщения так же, как контроллер MVC обрабатывает HTTP-запрос. Конечная точка будет сопоставлена с каналом сообщений таким же образом, как контроллер MVC сопоставлен с шаблоном URL.
Ниже приведен список с кратким описанием доступных конечных точек сообщений:
- Канальный адаптер : подключает приложение к внешней системе (однонаправлено).
- Шлюз : соединяет приложение с внешней системой (двунаправленная).
- Активатор службы : может вызвать операцию над объектом службы.
- Трансформатор : преобразует содержимое сообщения.
- Фильтр : определяет, может ли сообщение продолжать свой путь к выходному каналу.
- Маршрутизатор : решает, на какой канал будет отправлено сообщение.
- Разделитель : разбивает сообщение на несколько частей.
- Агрегатор : объединяет несколько сообщений в одно.
Следующий раздел этого руководства объясняет каждую из этих конечных точек.
4. Компоненты
В этом разделе вы узнаете, каковы различные конечные точки и как их можно использовать в Spring Integration.
4.1 Канальные адаптеры
Канальный адаптер является конечной точкой, которая позволяет вашему приложению подключаться к внешним системам. Если вы посмотрите ссылку, вы увидите предоставленные типы, такие как подключение к очередям JMS, базы данных MongoDB, RMI, веб-сервисы и т. Д.
Существует четыре типа адаптеров:
- Адаптер входящего канала : однонаправленный. Он получает сообщение от внешней системы. Затем он поступает в нашу систему обмена сообщениями через канал сообщений, где мы будем его обрабатывать.
- Адаптер исходящего канала : однонаправленный. Наша система сообщений создает сообщение и отправляет его во внешнюю систему.
- Входящий шлюз : двунаправленный. Сообщение поступает в приложение и ожидает ответа. Ответ будет отправлен обратно во внешнюю систему.
- Исходящий шлюз : двунаправленный. Приложение создает сообщение и отправляет его во внешнюю систему. Затем шлюз будет ждать ответа.
4.2 Трансформатор
Эта конечная точка используется для преобразования полезной нагрузки. Он преобразует тип полезной нагрузки в другой тип. Например, из строки в документ XML. Просто учтите, что преобразование полезной нагрузки приведет к появлению нового сообщения (помните, что сообщение является неизменным!). Этот тип конечной точки увеличивает слабую связь между производителями и потребителями, потому что потребитель не должен знать, что является созданным типом производителя. Преобразователь позаботится об этом и предоставит тип контента, который ждет потребитель.
Spring Integration предоставляет несколько реализаций Transformer. Вот некоторые примеры:
- HeaderEnricher: позволяет добавить значения заголовка к сообщению.
- ObjectToMapTransformer: преобразует объект в карту, преобразовывая его атрибуты в значения карты.
- ObjectToStringTransformer: преобразует объект в строку. Он преобразует его, вызывая операцию toString ().
- PayloadSerializingTransformer / PayloadDeserializingTransformer: Преобразует из объекта в байтовый массив и наоборот.
Давайте посмотрим на пару примеров:
Предполагая, что у нас есть следующая модель:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
public class Order implements Serializable { private static final long serialVersionUID = 1L; private int id; private String description; public Order() {} public Order( int id, String description) { this .id = id; this .description = description; } @Override public String toString() { return String.valueOf( this .getId()); } //Setters & Getters } |
Когда это отправляется в канал сообщений с именем «requestChannel», следующий фрагмент кода автоматически преобразует экземпляр Order в String, вызывая его метод toString ():
1
|
<int:object-to-string-transformer input-channel= "requestChannel" output-channel= "transformedChannel" /> |
Результирующая строка будет отправлена в выходной канал с именем transformedChannel
.
Если вам нужно более настраиваемое преобразование, вы можете реализовать свой собственный преобразователь, который является обычным компонентом. Вам нужно будет указать указанный компонент в элементе преобразователя следующим образом:
1
2
|
<int:transformer ref= "myTransformer" method= "transform" input-channel= "requestChannel" output-channel= "transformedChannel" /> |
Преобразователь вызывает метод «преобразования» компонента с именем «myTransformer». Этот боб показан ниже:
1
2
3
4
5
6
7
|
@Component ( "myTransformer" ) public class MyTransformer { public Order transform(Order requestOrder) { return new Order(requestOrder.getId(), requestOrder.getDescription()+ "_modified" ); } } |
В этом примере атрибут method
элемента преобразователя не является обязательным, поскольку преобразователь имеет только один метод. Если бы у него было несколько методов, вам нужно было бы установить атрибут «method», чтобы сообщить платформе, какой метод вызывать. Или, если вы предпочитаете аннотации, вы можете указать метод, используя аннотацию @Transformer на уровне метода:
01
02
03
04
05
06
07
08
09
10
11
12
|
@Component ( "myTransformer" ) public class MyTransformer { @Transformer public Order transform(Order requestOrder) { return new Order(requestOrder.getId(), requestOrder.getDescription()+ "_modified" ); } public Order doOtherThings(Order requestOrder) { //do other things } } |
4.3 Фильтр
Фильтр используется для определения, должно ли сообщение продолжать свой путь или, наоборот, отбрасываться. Чтобы решить, что делать, он основан на некоторых критериях.
Следующая реализация фильтра будет получать экземпляры Order из входного канала и отбрасывать экземпляры с недопустимым описанием. Действительные заказы будут отправлены на выходной канал:
1
|
<int:filter ref= "myFilter" method= "filterInvalidOrders" input-channel= "requestChannel" output-channel= "filteredChannel" /> |
Метод фильтра возвращает логический тип. Если он возвращает false, сообщение будет отклонено:
01
02
03
04
05
06
07
08
09
10
11
|
@Component ( "myFilter" ) public class MyFilter { public boolean filterInvalidOrders(Order order) { if (order == null || "invalid order" .equals(order.getDescription())) { return false ; } return true ; } } |
Как и в случае с преобразователем, атрибут method
будет необходим, только если в компоненте фильтра определено более одного метода. Чтобы указать метод, который вы хотите вызвать, используйте аннотацию @Filter:
1
2
|
@Filter public boolean filterInvalidOrders(Order order) { |
Spring Expression Language
Если ваш фильтр будет очень простым, вы можете пропустить любой класс Java для реализации фильтра. Вы можете определить фильтр с помощью SpEL . Например, следующий фрагмент кода реализует тот же фильтр, что и выше, но без кода Java:
1
|
<int:filter expression= "!payload.description.equals('invalid order')" input-channel= "requestChannel" output-channel= "filteredChannel" /> |
Отбрасывать сообщения
В конфигурации по умолчанию отброшенные сообщения просто отбрасываются. Мы можем изменить это, и, если мы решим сделать это, у нас есть два варианта:
1. Возможно, мы не хотим потерять ни одного сообщения. В этом случае мы можем сгенерировать исключение:
1
2
|
<int:filter expression= "!payload.description.equals('invalid order')" input-channel= "requestChannel" output-channel= "filteredChannel" throw-exception-on-rejection= "true" /> |
2. Мы хотим зарегистрировать все пропущенные сообщения. Мы можем настроить канал сброса:
1
2
|
<int:filter expression= "!payload.description.equals('invalid order')" input-channel= "requestChannel" output-channel= "filteredChannel" discard-channel= "discardedOrders" /> |
4.4 Маршрутизатор
Маршрутизатор позволяет перенаправить сообщение на определенный канал сообщений в зависимости от условия.
Как обычно, фреймворк предоставляет некоторые из самых базовых реализаций. В следующем примере используется маршрутизатор типа полезной нагрузки. Он будет получать сообщения из канала запроса и в зависимости от типа полезной нагрузки отправляет его в другой выходной канал:
1
2
3
4
|
<int:payload- type -router input-channel= "requestChannel" > <int:mapping type = "String" channel= "stringChannel" /> <int:mapping type = "Integer" channel= "integerChannel" /> < /int :payload- type -router> |
Вы можете проверить полный список здесь .
Теперь давайте вернемся к нашему примеру заказов и собираемся внедрить маршрутизатор, который будет перенаправлять сообщения в зависимости от описания заказа.
1
|
<int:router ref= "myRouter" input-channel= "requestChannel" default-output-channel= "genericOrders" /> |
Реализация маршрутизатора содержит метод, который возвращает имя канала сообщения, куда сообщение будет перенаправлено:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
@Component ( "myRouter" ) public class MyRouter { public String routeOrder(Order order) { String returnChannel = "genericOrders" ; if (order.getDescription().startsWith( "US-" )) { returnChannel = "usOrders" ; } else if (order.getDescription().startsWith( "EU-" )) { returnChannel = "europeOrders" ; } return returnChannel; } } |
Если у вас есть несколько методов, вы можете использовать аннотацию @Router
:
1
2
|
@Router public String routeOrder(Order order) { |
Как и в случае с фильтром, вы можете направлять сообщения на основе языка Spring Express.
4.5 Сплиттер и агрегатор
Цель разделителя — получить сообщение и разбить его на несколько частей. Эти части затем отправляются отдельно, поэтому они могут быть обработаны независимо. Эта конечная точка обычно объединяется с агрегатором.
Агрегатор берет список сообщений и объединяет их в одно сообщение. Это как раз наоборот сплиттера.
Вы лучше увидите это на примере:
Мы собираемся изменить пример заказа, чтобы Splitter получил пакет заказа. Этот пакет содержит несколько связанных заказов, которые разделитель разделит. Сплиттер принимает пакет заказов и возвращает список заказов:
1
|
<int:splitter input-channel= "requestChannel" ref= "mySplitter" output-channel= "splitChannel" /> |
Реализация сплиттера очень проста:
1
2
3
4
5
6
7
|
@Component ( "mySplitter" ) public class MySplitter { public List<Order> splitOrderPackage(OrderPackage orderPackage) { return orderPackage.getOrders(); } } |
Сплиттер возвращает список заказов, но он может вернуть любой из следующих:
- Коллекция или массив сообщений.
- Коллекция или массив объектов Java. Каждый элемент списка будет включен как полезная нагрузка сообщения.
- Сообщение.
- Объект Java (будет включен в полезную нагрузку сообщения).
Следуя примеру, есть конечная точка агрегатора, которая подключена к каналу «splitChannel». Этот агрегатор берет список и объединяет свои заказы, чтобы сформировать подтверждение заказа, добавляя количество каждого заказа:
1
2
3
|
<int:channel id = "splitChannel" /> <int:aggregator ref= "myAggregator" input-channel= "splitChannel" output-channel= "outputChannel" /> |
Реализация агрегатора:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
@Component ( "myAggregator" ) public class MyAggregator { public OrderConfirmation confirmOrders(List<Order> orders) { int total = 0 ; for (Order order:orders) { total += order.getQuantity(); } OrderConfirmation confirmation = new OrderConfirmation( "3" ); confirmation.setQuantity(total); return confirmation; } } |
4.5.1 Корреляция и стратегии выпуска
Когда сообщение разделяется конечной точкой разделителя, устанавливаются два заголовка:
- MessageHeaders.CORRELATION_ID
- MessageHeaders.SEQUENCE_SIZE
Эти заголовки используются конечной точкой агрегатора для правильного объединения сообщений. Он будет удерживать сообщения до тех пор, пока набор сообщений с одинаковым идентификатором корреляции не будет готов. И когда он будет готов? Он будет готов, когда будет достигнут размер последовательности.
Корреляционная стратегия
Разрешить группировать сообщения. По умолчанию он группирует все сообщения с одинаковым значением в заголовке CORRELATION_ID
. Есть несколько стратегий на выбор.
Стратегия релиза
По умолчанию группа сообщений будет считаться завершенной, когда ее размер достигнет значения, указанного в заголовке сообщения SEQUENCE_SIZE
.
4.6 Poller
В Spring Integration есть два типа потребителей:
- Активные потребители
- Пассивные потребители
Пассивные компоненты — это те, которые подписаны на подписываемый канал. Таким образом, когда сообщение отправляется на этот тип канала, канал будет вызывать своих подписчиков. Метод потребителя будет вызываться пассивно.
Активные компоненты — это те, которые подключены к каналу, подлежащему опросу Таким образом, сообщения будут помещаться в канал, ожидая, пока потребитель активно извлечет их из канала.
Поллеры используются для указания того, как активный потребитель получает эти сообщения. Вот пара примеров:
Базовая конфигурация опроса
Он будет опрашивать канал сообщений с интервалом в одну секунду
1
2
3
|
<int:service-activator method= "processOrder" input-channel= "pollableChannel" ref= "orderProcessor" > <int:poller fixed-rate= "1000" /> < /int :service-activator> |
Poller настроен с использованием выражения Cron
Он будет опрашивать канал сообщений каждые 30 минут
1
2
3
|
<int:service-activator method= "processOrder" input-channel= "pollableChannel" ref= "orderProcessor" > <int:poller cron = "0 0/30 * * * ?" /> < /int :service-activator> |
Одна вещь, которую нужно принять во внимание, — то, что, если потребитель связан с опрашиваемым каналом, ему потребуется опрашивающий. Если нет, то будет сделано исключение. Если вы не хотите настраивать программу опроса для каждого активного потребителя, вы можете определить программу опроса по умолчанию:
1
|
<int:poller id = "defaultPoller" fixed-rate= "1000" default= "true" /> |
Не забудьте установить атрибуты по default
и id
.
4.7 Мост обмена сообщениями
Конечная точка этого типа соединяет два канала сообщений или два канальных адаптера. Например, вы можете подключить канал SubscribableChannel
каналу PollableChannel
.
Вот образец:
01
02
03
04
05
06
07
08
09
10
11
|
<int:channel id = "requestChannel" /> <int:bridge input-channel= "requestChannel" output-channel= "pollableChannel" /> <int:channel id = "pollableChannel" > <int:queue capacity= "5" /> < /int :channel> <int:service-activator method= "processOrder" input-channel= "pollableChannel" ref= "orderProcessor" /> <int:poller id = "defaultPoller" fixed-rate= "1000" default= "true" /> |
В этом примере мост обмена сообщениями получает сообщение от входного канала и публикует его в выходной канал. В этом случае у нас есть активатор службы, подключенный к выходному каналу. Обработчик заказов (активатор службы) будет опрашивать канал сообщений с интервалом в одну секунду.
4.8 Цепочка обработки сообщений
Цепочка обработчиков сообщений используется для упрощения конфигурации, когда у вас есть несколько обработчиков сообщений, подключенных линейно. В следующем примере показана конфигурация обмена сообщениями, которая будет упрощена с помощью цепочки обработчиков:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
<int:channel id = "requestChannel" /> <int:channel id = "responseChannel" /> <int:filter ref= "myFilter" method= "filterInvalidOrders" input-channel= "requestChannel" output-channel= "filteredChannel" /> <int:channel id = "filteredChannel" /> <int:transformer ref= "myTransformer" method= "transform" input-channel= "filteredChannel" output-channel= "transformedChannel" /> <int:channel id = "transformedChannel" /> <int:service-activator method= "processOrder" input-channel= "transformedChannel" ref= "orderProcessor" output-channel= "responseChannel" /> |
Сообщение проходит через фильтр, затем достигает преобразователя, и, наконец, сообщение будет обработано активатором службы. После этого сообщение будет отправлено на выходной канал responseChannel.
Используя цепочку фильтров сообщений, конфигурация будет упрощена так:
1
2
3
4
5
6
7
8
|
<int:channel id = "requestChannel" /> <int:channel id = "responseChannel" /> <int:chain input-channel= "requestChannel" output-channel= "responseChannel" > <int:filter ref= "myFilter" method= "filterInvalidOrders" /> <int:transformer ref= "myTransformer" method= "transform" /> <int:service-activator ref= "orderProcessor" method= "processOrder" /> < /int :chain> |
5. Синхронная и асинхронная связь
Как объяснено в первом уроке этого курса , связь может выполняться синхронно или асинхронно. В этом разделе показано, как изменить это сообщение.
5.1 Каналы сообщений
В зависимости от того, как вы настроили каналы сообщений, сообщения будут извлекаться синхронно или асинхронно. Изменять не так много, только конфигурацию.
Например, представьте, что у нас есть прямой канал типа «точка-точка», как показано ниже:
1
|
<int:channel id = "requestChannel" /> |
Сообщение, отправленное на этот канал, будет немедленно доставлено пассивному потребителю (подписчику). Если ожидается ответ, отправитель будет ждать, пока он не будет отправлен ему. Чтобы изменить это, нам просто нужно добавить очередь:
1
2
3
|
<int:channel id = "requestChannel" > <int:queue capacity= "5" /> < /int :channel> |
Вот и все. Теперь канал может поставить в очередь до пяти сообщений. Потребитель будет активно получать сообщения, поставленные в очередь в этом канале, из потока, отличного от отправителя.
А как насчет каналов публикации и подписки? Давайте возьмем похожий пример, настроив синхронный канал:
1
|
<int:publish-subscribe-channel id = "mySubscribableChannel" /> |
В этом случае мы изменим его поведение с помощью исполнителя задачи:
1
2
3
|
<int:publish-subscribe-channel id = "mySubscribableChannel" task-executor= "myTaskExecutor" /> <task:executor id = "myTaskExecutor" pool-size= "5" /> |
5.2 Шлюзы
Шлюз — это тип канального адаптера, который можно использовать для:
- Предоставить механизм входа / выхода в систему обмена сообщениями. Таким образом, приложение может отправить сообщение в систему обмена сообщениями, которая обработает его через конечные точки сообщения.
- Отправьте сообщение во внешнюю систему и дождитесь ответа (выходной шлюз)
- Получите сообщение из внешней системы и отправьте ответ после его обработки (входящий шлюз).
В этом примере используется первый случай. Приложение отправит сообщение через шлюз и будет ждать, пока система обмена сообщениями его обработает. Здесь мы будем использовать синхронный шлюз. Таким образом, тестовое приложение отправит сообщение и заблокирует, ожидая ответа.
Интерфейс
Все вызовы его метода sendOrder
будут sendOrder
шлюзом. Смотрите, что нет реализации этого интерфейса. Шлюз обернет его для перехвата этих вызовов.
1
2
3
4
|
public interface OrderService { @Gateway public OrderConfirmation sendOrder(Order order); } |
Конфигурация
Шлюз связан с интерфейсом для перехвата его вызовов и отправки сообщения в систему обмена сообщениями.
1
2
3
4
|
<int:gateway default-request-channel= "requestChannel" service-interface= "xpadro.spring.integration.service.OrderService" /> <int:channel id = "requestChannel" /> |
Тест
Сервисный интерфейс (шлюз) внедряется в приложение. Вызов метода "sendOrder"
отправит объект Order в систему обмена сообщениями, заключенную в сообщение.
1
2
3
4
5
6
7
8
9
|
@Autowired private OrderService service; @Test public void testSendOrder() { OrderConfirmation confirmation = service.sendOrder( new Order( 3 , "a correct order" )); Assert.assertNotNull(confirmation); Assert.assertEquals( "confirmed" , confirmation.getId()); } |
В этом другом примере тестовый класс будет блокироваться, пока подтверждение заказа не будет отправлено обратно. Теперь мы собираемся настроить его, чтобы сделать его асинхронным:
Интерфейс
Единственное изменение здесь — вернуть будущее
1
2
3
4
|
public interface OrderService { @Gateway public Future<OrderConfirmation> sendFutureOrder(Order order); } |
Тест
Теперь тест должен обработать объект Future, который будет возвращен из шлюза.
01
02
03
04
05
06
07
08
09
10
|
@Autowired private OrderService service; @Test public void testSendCorrectOrder() throws ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder( new Order( 3 , "a correct order" )); OrderConfirmation orderConfirmation = confirmation.get(); Assert.assertNotNull(orderConfirmation); Assert.assertEquals( "confirmed" , orderConfirmation.getId()); } |
6. Обработка ошибок
В этом последнем разделе этого руководства будут объяснены различия в обработке ошибок в зависимости от того, какой тип связи мы настроили: синхронный или асинхронный.
При синхронной связи отправитель блокируется, пока сообщение отправляется в систему обмена сообщениями с использованием того же потока. Очевидно, что если возникает исключение, оно будет достигать приложения (наш тест в примере из предыдущего раздела).
Но при асинхронном взаимодействии потребитель получает сообщение из другого потока. Если это вызывает исключение, оно не достигает приложения. Как Spring Integration справляется с этим? Вот где появляется канал ошибок.
Когда возникает исключение, оно помещается в MessagingException , становясь полезной нагрузкой нового сообщения. Это сообщение отправлено:
- Канал ошибки: Этот канал определяется как заголовок с именем «errorChannel» в исходном заголовке сообщения.
- Канал глобальной ошибки: если в заголовке сообщения не определен канал ошибки, он отправляется в канал глобальной ошибки. Этот канал определяется по умолчанию Spring Integration.
Канал глобальной ошибки
Этот канал является каналом публикации-подписки. Это означает, что мы можем подписать наши собственные конечные точки на этот канал и получать любую возникшую ошибку. Фактически, Spring Integration уже подписывает конечную точку: обработчик журналирования. Этот обработчик регистрирует полезную нагрузку любого сообщения, отправленного на канал глобальной ошибки.
Чтобы подписать другую конечную точку для обработки исключения, нам просто нужно настроить его следующим образом:
1
2
3
|
<int:service-activator input-channel= "errorChannel" ref= "myExceptionHandler" method= "handleInvalidOrder" /> <bean id = "myExceptionHandler" class= "xpadro.spring.integration.activator.MyExceptionHandler" /> |
Метод handleInvalidOrder
нашей конечной точки активатора службы получит исключение обмена сообщениями:
01
02
03
04
05
06
07
08
09
10
|
public class MyExceptionHandler { @ServiceActivator public void handleInvalidOrder(Message<MessageHandlingException> message) { //Retrieve the failed order (payload of the source message) Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload(); //Handle exception ... } } |