1. Введение
Цель этого поста — показать, как обрабатываются ошибки при использовании системы обмена сообщениями с Spring Integration. Вы увидите, что обработка ошибок отличается между синхронным и асинхронным обменом сообщениями. Как обычно, я пропущу чат и приступлю к некоторым примерам.
- Вы можете получить исходный код на GitHub .
2. образец приложения
Я буду использовать базовый пример, так как я хочу сосредоточиться на обработке исключений. Приложение состоит из службы заказов, которая принимает заказ, обрабатывает его и возвращает подтверждение.
Ниже мы видим, как настроена система обмена сообщениями:
ИНТ-config.xml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/><int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/><int:channel id="syncChannel"/><int:channel id="asyncChannel"> <int:queue capacity="5"/></int:channel><int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/></int:service-activator> |
Шлюз является точкой входа в систему обмена сообщениями. Он получит заказ и отправит его на прямой канал «requestChannel», где маршрутизатор перенаправит его на соответствующий канал на основе идентификатора заказа:
- syncChannel: прямой канал, который будет отправлять заказ обработчику заказов, подписанному на этот канал.
- asyncChannel: канал очереди, из которого обработчик заказов будет активно извлекать заказ.
После обработки заказа подтверждение заказа будет отправлено обратно на шлюз. Вот график, представляющий это:
Хорошо, давайте начнем с самого простого случая — синхронной отправки по прямому каналу.
3. Синхронная отправка по прямому каналу
Процессор заказов подписан на прямой канал syncChannel. Метод «processOrder» будет вызываться в потоке отправителя.
|
01
02
03
04
05
06
07
08
09
10
|
public OrderConfirmation processOrder(Order order) { logger.info("Processing order {}", order.getId()); if (isInvalidOrder(order)) { logger.info("Error while processing order [{}]", ERROR_INVALID_ID); throw new InvalidOrderException(ERROR_INVALID_ID); } return new OrderConfirmation("confirmed");} |
Теперь мы реализуем тест, который вызовет исключение, отправив неверный заказ. Этот тест отправит заказ на шлюз:
|
1
2
3
4
|
public interface OrderService { @Gateway public OrderConfirmation sendOrder(Order order);} |
Тест:
TestSyncErrorHandling.java
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestSyncErrorHandling { @Autowired private OrderService service; @Test public void testCorrectOrder() { OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order")); Assert.assertNotNull(confirmation); Assert.assertEquals("confirmed", confirmation.getId()); } @Test public void testSyncErrorHandling() { OrderConfirmation confirmation = null; try { confirmation = service.sendOrder(new Order(1, "an invalid order")); Assert.fail("Should throw a MessageHandlingException"); } catch (MessageHandlingException e) { Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass()); Assert.assertNull(confirmation); } }} |
Мы запускаем тест и видим, как исключение вызывается в процессоре заказов и достигает теста. Хорошо; мы хотели проверить, что отправка неправильного заказа вызвала исключение. Это произошло потому, что тест отправил заказ и заблокировал ожидание обработки заказа в том же потоке. Но что происходит, когда мы используем асинхронный канал? Давайте перейдем к следующему разделу.
4.Асинхронная отправка с очередью канала
Тест этого раздела отправляет заказ, который будет перенаправлен маршрутизатором в канал очереди. Шлюз показан ниже:
|
1
2
3
4
|
public interface OrderService { @Gateway public Future<OrderConfirmation> sendFutureOrder(Order order);} |
Обратите внимание, что на этот раз шлюз возвращает будущее . Если мы не вернем это, шлюз заблокирует тестовый поток. Возвращая Future, шлюз становится асинхронным и не блокирует поток отправителя.
Тест:
TestKoAsyncErrorHandling.java
|
01
02
03
04
05
06
07
08
09
10
11
12
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestKoAsyncErrorHandling { @Autowired private OrderService service; @Test(expected=MessageHandlingException.class) public void testAsyncErrorHandling() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order")); }} |
Итак, теперь мы собираемся запустить тест и увидеть повышение исключения …
|
1
|
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException |
К сожалению, тест не пройден, поскольку ни одно исключение не достигло его Что произошло? Ну, объяснение ниже:
|
1
2
3
4
5
6
7
|
<int:channel id="asyncChannel"> <int:queue capacity="5"/></int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/></int:service-activator> |
Так как мы используем асинхронный канал (очередь), отправитель отправляет заказ и движется дальше. Затем получатель опрашивает заказ из другого потока. По этой причине будет невозможно вернуть исключение отправителю. Давайте действовать, как ничего не случилось тогда? Ну лучше не надо, есть и другие варианты.
5.Асинхронная обработка ошибок
При использовании асинхронного обмена сообщениями Spring Integration обрабатывает исключения, публикуя их в каналах сообщений. Сгенерированное исключение будет помещено в MessagingException и станет полезной нагрузкой сообщения.
На какой канал отправлено сообщение об ошибке? Сначала он проверит, содержит ли сообщение запроса заголовок с именем «errorChannel». Если найдено, сообщение об ошибке будет отправлено туда. В противном случае сообщение будет отправлено на так называемый канал глобальной ошибки.
5.1 Канал глобальной ошибки
По умолчанию Spring Integration создает глобальный канал ошибок с именем errorChannel. Этот канал является каналом публикации-подписки. Это означает, что мы можем подписать несколько конечных точек на этот канал. Фактически, уже есть подписанная конечная точка: обработчик журналирования. Этот обработчик будет регистрировать полезную нагрузку сообщений, поступающих на канал, хотя он может быть настроен на другое поведение.
Теперь мы подпишем новый обработчик на этот глобальный канал и проверим, что он получает сообщение об исключении, сохраняя его в базе данных.
Прежде всего, нам нужно будет изменить несколько вещей в нашей конфигурации. Я создал новый файл, чтобы он не мешал нашим предыдущим тестам:
ИНТ-Асинхр-config.xml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/><int:channel id="asyncChannel"> <int:queue capacity="5"/></int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/></int:service-activator><int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/><bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/> |
Шлюз : Я добавил канал ошибок. Если вызов не удался, сообщение об ошибке будет отправлено на этот канал. Если бы я не определил канал ошибки, шлюз передал бы исключение вызывающей стороне, но в этом случае это не сработало бы, так как это асинхронный шлюз.
Обработчик ошибок : я определил новую конечную точку, которая подписана на глобальный канал ошибок. Теперь любое сообщение об ошибке, отправленное в глобальный канал ошибок, будет доставлено нашему обработчику.
Я также добавил файл конфигурации для настройки базы данных. Наш обработчик ошибок вставит полученные ошибки в эту базу данных:
дб-config.xml
|
1
2
3
4
5
6
7
8
|
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <constructor-arg ref="dataSource"/></bean><!-- in-memory database --><jdbc:embedded-database id="dataSource"> <jdbc:script location="classpath:db/schemas/schema.sql" /></jdbc:embedded-database> |
Обработчик ошибок довольно прост; он получает сообщение об ошибке и вставляет свою информацию в базу данных:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public class OrderErrorHandler { @Autowired private JdbcTemplate jdbcTemplate; @ServiceActivator public void handleFailedOrder(Message<MessageHandlingException> message) { Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload(); saveToBD(requestedOrder.getId(), message.getPayload().getMessage()); } private void saveToBD(int orderId, String errorMessage) { String query = "insert into errors(orderid, message) values (?,?)"; jdbcTemplate.update(query, orderId, errorMessage); }} |
Хорошо, теперь все готово. Давайте реализуем новый тест:
TestOkAsyncErrorHandlingTest.java
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml", "/xpadro/spring/integration/config/db-config.xml"})@RunWith(SpringJUnit4ClassRunner.class)public class TestOkAsyncErrorHandling { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private OrderService service; @Before public void prepareTest() { jdbcTemplate.update("delete from errors"); } @Test public void testCorrectOrder() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order")); OrderConfirmation orderConfirmation = confirmation.get(); Assert.assertNotNull(orderConfirmation); Assert.assertEquals("confirmed", orderConfirmation.getId()); } @Test public void testAsyncErrorHandling() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order")); Thread.sleep(2000); Assert.assertEquals(1, getSavedErrors()); validateSavedError(6); } private int getSavedErrors() { return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class); } private void validateSavedError(int orderId) { String query = "select * from errors where orderid=?"; Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId); Assert.assertEquals(6, result.get("orderid")); assertThat((String)result.get("message"), containsString("Order ID is invalid")); }} |
На этот раз тест пройден успешно, сообщение об ошибке было сохранено в базе данных.
5.2 Другие механизмы
Пользовательский канал ошибок : Вы можете определить свой канал ошибок и определить его как канал очереди вместо стандартного канала публикации-подписки:
|
1
2
3
4
5
|
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /><int:channel id="errorChannel"> <int:queue capacity="10"/></int:channel> |
ErrorMessageExceptionTypeRouter : этот специализированный маршрутизатор Spring Integration разрешит канал, на который будет отправлено сообщение об ошибке. Решение основывается на наиболее конкретной причине ошибки:
|
1
2
3
4
|
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"> <int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /> <int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" /></int:exception-type-router> |
6. Заключение
Мы узнали, каковы различные механизмы обработки ошибок при использовании Spring Integration. С помощью этой базы вы сможете расширить ее и настроить обработку ошибок, внедряя преобразователи для извлечения информации из сообщения об ошибке, используя обогатители заголовков для настройки канала ошибок или внедряя собственный маршрутизатор, помимо прочего.
