Статьи

Как работает обработка ошибок в Spring Integration

1. Введение

Цель этого поста — показать, как обрабатываются ошибки при использовании системы обмена сообщениями с Spring Integration. Вы увидите, что обработка ошибок отличается между синхронным и асинхронным обменом сообщениями. Как обычно, я пропущу чат и приступлю к некоторым примерам.

  • Вы можете получить исходный код на GitHub .

2. образец приложения

Я буду использовать базовый пример, так как я хочу сосредоточиться на обработке исключений. Приложение состоит из службы заказов, которая принимает заказ, обрабатывает его и возвращает подтверждение.

Ниже мы видим, как настроена система обмена сообщениями:

ИНТ-config.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<context:component-scan base-package="xpadro.spring.integration"/>
 
<int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/>
 
<int:channel id="requestChannel"/>
 
<int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/>
 
<int:channel id="syncChannel"/>
 
<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>
 
<int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/>
 
<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>

Шлюз является точкой входа в систему обмена сообщениями. Он получит заказ и отправит его на прямой канал «requestChannel», где маршрутизатор перенаправит его на соответствующий канал на основе идентификатора заказа:

  • syncChannel: прямой канал, который будет отправлять заказ обработчику заказов, подписанному на этот канал.
  • asyncChannel: канал очереди, из которого обработчик заказов будет активно извлекать заказ.

После обработки заказа подтверждение заказа будет отправлено обратно на шлюз. Вот график, представляющий это:

Grafic

Хорошо, давайте начнем с самого простого случая — синхронной отправки по прямому каналу.

3. Синхронная отправка по прямому каналу

Процессор заказов подписан на прямой канал syncChannel. Метод «processOrder» будет вызываться в потоке отправителя.

01
02
03
04
05
06
07
08
09
10
public OrderConfirmation processOrder(Order order) {
    logger.info("Processing order {}", order.getId());
     
    if (isInvalidOrder(order)) {
        logger.info("Error while processing order [{}]", ERROR_INVALID_ID);
        throw new InvalidOrderException(ERROR_INVALID_ID);
    }
     
    return new OrderConfirmation("confirmed");
}

Теперь мы реализуем тест, который вызовет исключение, отправив неверный заказ. Этот тест отправит заказ на шлюз:

1
2
3
4
public interface OrderService {
    @Gateway
    public OrderConfirmation sendOrder(Order order);
}

Тест:

TestSyncErrorHandling.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSyncErrorHandling {
     
    @Autowired
    private OrderService service;
     
    @Test
    public void testCorrectOrder() {
        OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));
        Assert.assertNotNull(confirmation);
        Assert.assertEquals("confirmed", confirmation.getId());
    }
     
    @Test
    public void testSyncErrorHandling() {
        OrderConfirmation confirmation = null;
        try {
            confirmation = service.sendOrder(new Order(1, "an invalid order"));
            Assert.fail("Should throw a MessageHandlingException");
        } catch (MessageHandlingException e) {
            Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());
            Assert.assertNull(confirmation);
        }
    }
}

Мы запускаем тест и видим, как исключение вызывается в процессоре заказов и достигает теста. Хорошо; мы хотели проверить, что отправка неправильного заказа вызвала исключение. Это произошло потому, что тест отправил заказ и заблокировал ожидание обработки заказа в том же потоке. Но что происходит, когда мы используем асинхронный канал? Давайте перейдем к следующему разделу.

4.Асинхронная отправка с очередью канала

Тест этого раздела отправляет заказ, который будет перенаправлен маршрутизатором в канал очереди. Шлюз показан ниже:

1
2
3
4
public interface OrderService {
    @Gateway
    public Future<OrderConfirmation> sendFutureOrder(Order order);
}

Обратите внимание, что на этот раз шлюз возвращает будущее . Если мы не вернем это, шлюз заблокирует тестовый поток. Возвращая Future, шлюз становится асинхронным и не блокирует поток отправителя.

Тест:

TestKoAsyncErrorHandling.java

01
02
03
04
05
06
07
08
09
10
11
12
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestKoAsyncErrorHandling {
     
    @Autowired
    private OrderService service;
     
    @Test(expected=MessageHandlingException.class)
    public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));
    }
}

Итак, теперь мы собираемся запустить тест и увидеть повышение исключения …

1
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException

К сожалению, тест не пройден, поскольку ни одно исключение не достигло его Что произошло? Ну, объяснение ниже:

1
2
3
4
5
6
7
<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>
 
<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>

Так как мы используем асинхронный канал (очередь), отправитель отправляет заказ и движется дальше. Затем получатель опрашивает заказ из другого потока. По этой причине будет невозможно вернуть исключение отправителю. Давайте действовать, как ничего не случилось тогда? Ну лучше не надо, есть и другие варианты.

5.Асинхронная обработка ошибок

При использовании асинхронного обмена сообщениями Spring Integration обрабатывает исключения, публикуя их в каналах сообщений. Сгенерированное исключение будет помещено в MessagingException и станет полезной нагрузкой сообщения.

На какой канал отправлено сообщение об ошибке? Сначала он проверит, содержит ли сообщение запроса заголовок с именем «errorChannel». Если найдено, сообщение об ошибке будет отправлено туда. В противном случае сообщение будет отправлено на так называемый канал глобальной ошибки.

5.1 Канал глобальной ошибки

По умолчанию Spring Integration создает глобальный канал ошибок с именем errorChannel. Этот канал является каналом публикации-подписки. Это означает, что мы можем подписать несколько конечных точек на этот канал. Фактически, уже есть подписанная конечная точка: обработчик журналирования. Этот обработчик будет регистрировать полезную нагрузку сообщений, поступающих на канал, хотя он может быть настроен на другое поведение.

Теперь мы подпишем новый обработчик на этот глобальный канал и проверим, что он получает сообщение об исключении, сохраняя его в базе данных.

Прежде всего, нам нужно будет изменить несколько вещей в нашей конфигурации. Я создал новый файл, чтобы он не мешал нашим предыдущим тестам:

ИНТ-Асинхр-config.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
<context:component-scan base-package="xpadro.spring.integration"/>
 
<int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService"
    error-channel="errorChannel"/>
 
<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>
 
<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>
 
<int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/>
 
<bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>

Шлюз : Я добавил канал ошибок. Если вызов не удался, сообщение об ошибке будет отправлено на этот канал. Если бы я не определил канал ошибки, шлюз передал бы исключение вызывающей стороне, но в этом случае это не сработало бы, так как это асинхронный шлюз.

Обработчик ошибок : я определил новую конечную точку, которая подписана на глобальный канал ошибок. Теперь любое сообщение об ошибке, отправленное в глобальный канал ошибок, будет доставлено нашему обработчику.

Я также добавил файл конфигурации для настройки базы данных. Наш обработчик ошибок вставит полученные ошибки в эту базу данных:

дб-config.xml

1
2
3
4
5
6
7
8
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="dataSource"/>
</bean>
 
<!-- in-memory database -->
<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schemas/schema.sql" />
</jdbc:embedded-database>

Обработчик ошибок довольно прост; он получает сообщение об ошибке и вставляет свою информацию в базу данных:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public class OrderErrorHandler {
    @Autowired
    private JdbcTemplate jdbcTemplate;
     
    @ServiceActivator
    public void handleFailedOrder(Message<MessageHandlingException> message) {
        Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();
        saveToBD(requestedOrder.getId(), message.getPayload().getMessage());
    }
     
    private void saveToBD(int orderId, String errorMessage) {
        String query = "insert into errors(orderid, message) values (?,?)";
        jdbcTemplate.update(query, orderId, errorMessage);
    }
}

Хорошо, теперь все готово. Давайте реализуем новый тест:

TestOkAsyncErrorHandlingTest.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml",
    "/xpadro/spring/integration/config/db-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestOkAsyncErrorHandling {
    @Autowired
    private JdbcTemplate jdbcTemplate;
     
    @Autowired
    private OrderService service;
     
    @Before
    public void prepareTest() {
        jdbcTemplate.update("delete from errors");
    }
     
    @Test
    public void testCorrectOrder() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));
        OrderConfirmation orderConfirmation = confirmation.get();
        Assert.assertNotNull(orderConfirmation);
        Assert.assertEquals("confirmed", orderConfirmation.getId());
    }
     
    @Test
    public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));
        Thread.sleep(2000);
        Assert.assertEquals(1, getSavedErrors());
        validateSavedError(6);
    }
     
    private int getSavedErrors() {
        return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);
    }
     
    private void validateSavedError(int orderId) {
        String query = "select * from errors where orderid=?";
        Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);
        Assert.assertEquals(6, result.get("orderid"));
        assertThat((String)result.get("message"), containsString("Order ID is invalid"));
    }
}

На этот раз тест пройден успешно, сообщение об ошибке было сохранено в базе данных.

5.2 Другие механизмы

Пользовательский канал ошибок : Вы можете определить свой канал ошибок и определить его как канал очереди вместо стандартного канала публикации-подписки:

1
2
3
4
5
<int:poller id="defaultPoller" default="true" fixed-delay="5000" />
 
<int:channel id="errorChannel">
    <int:queue capacity="10"/>
</int:channel>

ErrorMessageExceptionTypeRouter : этот специализированный маршрутизатор Spring Integration разрешит канал, на который будет отправлено сообщение об ошибке. Решение основывается на наиболее конкретной причине ошибки:

1
2
3
4
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel">
    <int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" />
    <int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" />
</int:exception-type-router>

6. Заключение

Мы узнали, каковы различные механизмы обработки ошибок при использовании Spring Integration. С помощью этой базы вы сможете расширить ее и настроить обработку ошибок, внедряя преобразователи для извлечения информации из сообщения об ошибке, используя обогатители заголовков для настройки канала ошибок или внедряя собственный маршрутизатор, помимо прочего.