Статьи

Spring JMS: обработка сообщений в транзакциях

1. Введение

Этот пост покажет вам, как ошибка при выполнении потребителем при асинхронном приеме сообщений с JMS может привести к потере сообщений. Затем я объясню, как вы можете решить эту проблему с помощью локальных транзакций.

Вы также увидите, что это решение может вызвать в некоторых случаях дублирование сообщений (например, когда оно сохраняет сообщение в базе данных, а затем выполнение слушателя завершается неудачно). Причина, по которой это происходит, заключается в том, что JMS-транзакция независима от других транзакционных ресурсов, таких как БД. Если ваша обработка не идемпотентна или если ваше приложение не поддерживает обнаружение повторяющихся сообщений, вам придется использовать распределенные транзакции.

Распределенные транзакции выходят за рамки этого поста. Если вы заинтересованы в обработке распределенных транзакций, вы можете прочитать эту интересную статью.

Я реализовал тестовое приложение, которое воспроизводит следующие случаи:

  1. Отправка и прием сообщения. Потребитель обработает полученное сообщение и сохранит его в базе данных.

    Производитель отправляет сообщение в очередь:

    Send1

    Потребитель получает сообщение из очереди и обрабатывает его:

    Send2

  2. Произошла ошибка перед обработкой сообщения: потребитель получает сообщение, но выполнение не удается, прежде чем сохранить его в БД.

    send3

  3. Произошла ошибка после обработки сообщения: потребитель извлекает сообщение, сохраняет его в БД, а затем происходит сбой выполнения.

    send4

  • Исходный код этого приложения можно найти на github .

2. Тестовое приложение

Тестовое приложение выполняет два тестовых класса, TestNotTransactedMessaging и TestTransactedMessaging . Оба эти класса будут выполнять три описанных выше случая.

Давайте посмотрим конфигурацию приложения, когда оно выполняется без транзакций.

приложение-config.xml

Конфигурация приложения. В основном он проверяет в указанных пакетах автоопределение компонентов приложения: производителя и потребителя. Он также настраивает базу данных в памяти, где будут храниться обработанные уведомления.

1
2
3
4
5
6
7
8
9
<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/>
 
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="dataSource"/>
</bean>
 
<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schema.sql" />
</jdbc:embedded-database>

notx-JMS-config.xml

Настраивает инфраструктуру JMS, которая:

  • Брокерская связь
  • Шаблон JmsTemplate
  • Очередь, куда будут отправляться уведомления
  • Контейнер слушателя, который будет отправлять уведомления слушателю для их обработки
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean>
 
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory"/>
</bean>
 
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
    <property name="defaultDestination" ref="incomingQueue"/>
</bean>
 
<!-- Destinations -->
<bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="incoming.queue"/>
</bean>
     
<!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory">
    <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

Производитель просто использует jmsTemplate для отправки уведомлений.

01
02
03
04
05
06
07
08
09
10
11
12
@Component("producer")
public class Producer {
    private static Logger logger = LoggerFactory.getLogger(Producer.class);
     
    @Autowired
    private JmsTemplate jmsTemplate;
     
    public void convertAndSendMessage(String destination, Notification notification) {
        jmsTemplate.convertAndSend(destination, notification);
        logger.info("Sending notification | Id: "+notification.getId());
    }
}

Слушатель отвечает за извлечение уведомлений из очереди и сохраняет их в базе данных.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component("notificationProcessor")
public class NotificationProcessor implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
     
    @Autowired
    private JdbcTemplate jdbcTemplate;
     
    @Override
    public void onMessage(Message message) {
        try {
            Notification notification = (Notification) ((ObjectMessage) message).getObject();
            logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));
             
            checkPreprocessException(notification);
            saveToBD(notification);
            checkPostprocessException(message, notification);
        } catch (JMSException e) {
            throw JmsUtils.convertJmsAccessException(e);
        }
    }  
    ...
}

Метод checkPreprocessException будет генерировать исключение во время выполнения при получении уведомления с id = 1. Таким образом, мы будем вызывать ошибку перед сохранением сообщения в БД.

Метод checkPostprocessException выдает исключение, если приходит уведомление с id = 2, что приводит к ошибке сразу после сохранения в БД.

Метод getDeliveryNumber возвращает количество раз, когда сообщение было отправлено. Это применимо только к транзакциям, поскольку посредник попытается повторно отправить сообщение после сбоя обработки прослушивателя, приведшего к откату.

Наконец, метод saveToDB довольно очевиден. Хранит уведомление в БД.

Вы всегда можете проверить исходный код этого приложения, нажав на ссылку в начале этой статьи.

3. Тестирование приема сообщений без транзакций

Я запустил два тестовых класса, один без транзакций, а другой внутри локальной транзакции. Оба класса расширяют базовый класс, который загружает общий контекст приложения и содержит несколько служебных методов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
@DirtiesContext
public class TestBaseMessaging {
    protected static final String QUEUE_INCOMING = "incoming.queue";
    protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";
     
    @Autowired
    protected JdbcTemplate jdbcTemplate;
     
    @Autowired
    protected JmsTemplate jmsTemplate;
     
    @Autowired
    protected Producer producer;
     
    @Before
    public void prepareTest() {
        jdbcTemplate.update("delete from Notifications");
    }
     
    protected int getSavedNotifications() {
        return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);
    }
     
    protected int getMessagesInQueue(String queueName) {
        return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {
            @Override
            public Integer doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<?> messages = browser.getEnumeration();
                int total = 0;
                while (messages.hasMoreElements()) {
                    messages.nextElement();
                    total++;
                }
                 
                return total;
            }
        });
    }
}

Служебные методы описаны ниже:

  • getSavedNotifications : возвращает количество уведомлений, хранящихся в БД. Я использовал метод queryForObject, потому что он рекомендуется с версии 3.2.2. Метод queryForInt устарел.
  • getMessagesInQueue : позволяет проверить, какие сообщения все еще ожидают в указанной очереди. Для этого теста нам интересно знать, сколько уведомлений еще ожидает обработки.

Теперь позвольте мне показать вам код для первого теста ( TestNotTransactedMessaging ). Этот тест запускает 3 случая, указанных в начале статьи.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Test
public void testCorrectMessage() throws InterruptedException {
    Notification notification = new Notification(0, "notification to deliver correctly");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);
     
    Thread.sleep(6000);
    printResults();
     
    assertEquals(1, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}
 
@Test
public void testFailedAfterReceiveMessage() throws InterruptedException {
    Notification notification = new Notification(1, "notification to fail after receiving");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);
     
    Thread.sleep(6000);
    printResults();
     
    assertEquals(0, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}
 
@Test
public void testFailedAfterProcessingMessage() throws InterruptedException {
    Notification notification = new Notification(2, "notification to fail after processing");
    producer.convertAndSendMessage(QUEUE_INCOMING, notification);
     
    Thread.sleep(6000);
    printResults();
     
    assertEquals(1, getSavedNotifications());
    assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}
 
private void printResults() {
    logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));
    logger.info("Total items in DB: "+getSavedNotifications());
}

4. Выполнение теста

Хорошо, давайте выполним тест и посмотрим, каковы результаты:

Вывод testCorrectMessage:

1
2
3
4
Producer|Sending notification | Id: 0
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1

Здесь нет проблем, очередь пуста, так как сообщение было правильно получено и сохранено в базе данных.

Вывод testFailedAfterReceiveMessage:

1
2
3
4
5
6
Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0

Поскольку он выполняется вне транзакции, используется режим подтверждения (по умолчанию автоматический). Это означает, что сообщение считается успешно доставленным после вызова метода onMessage и, следовательно, удаленного из очереди. Поскольку прослушиватель не смог сохранить сообщение в БД, мы потеряли сообщение !!

Вывод testFailedAfterProcessingMessage:

1
2
3
4
5
6
2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

В этом случае сообщение было удалено из очереди (AUTO_ACKNOWLEDGE) и сохранено в БД до сбоя выполнения.

5. Добавление локальных транзакций

Обычно мы не можем допустить потерю сообщений, как во втором случае теста, поэтому мы будем вызывать прослушиватель в локальной транзакции. Изменение довольно простое и не подразумевает изменение одной строки кода из нашего приложения. Нам нужно только изменить файл конфигурации.

Чтобы проверить 3 случая с транзакциями, я заменю файл конфигурации notx-jms-config.xml следующим:

ТХ-JMS-config.xml

Во-первых, я добавил количество повторных доставок, сделанных в случае отката (вызванного ошибкой при выполнении слушателя):

1
2
3
4
5
6
7
8
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
    <property name="redeliveryPolicy">
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="maximumRedeliveries" value="4"/>
        </bean>
    </property>
</bean>

Далее я указываю, что слушатель будет выполнен в рамках транзакции. Это можно сделать, изменив определение контейнера слушателя:

1
2
3
<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted">
    <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

Это приведет к тому, что каждый вызов слушателя будет выполнен в локальной транзакции JMS. Транзакция начнется после получения сообщения. Если выполнение слушателя завершится неудачно, прием сообщения будет отменен.

И это все, что мы должны изменить. Давайте запустим тесты с этой конфигурацией.

6. Тестирование приема сообщений в рамках транзакций

Код из класса TestTransactedMessaging практически такой же, как и в предыдущем тесте. Единственное отличие состоит в том, что он добавляет запрос в DLQ (очередь недоставленных сообщений). При выполнении в транзакциях, если прием сообщения откатывается, брокер отправляет сообщение в эту очередь (после неудачной повторной доставки).

Я пропускаю вывод успешного получения, так как он не приносит ничего нового.

Вывод testFailedAfterReceiveMessage:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
...
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0

Как видите, первое получение не удалось, и посредник попытался отправить его еще четыре раза (как указано в свойстве MaximumRedeliveries). Поскольку ситуация сохраняется, сообщение было отправлено в специальную очередь DLQ. Таким образом, мы не теряем сообщение.

Вывод testFailedAfterProcessingMessage:

1
2
3
4
5
6
7
8
Producer|Sending notification | Id: 2
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2

В этом случае вот что произошло:

  1. Слушатель извлек сообщение
  2. Хранит сообщение в БД
  3. Не удалось выполнить слушатель
  4. Брокер повторно отправляет сообщение. Так как ситуация была решена, слушатель сохраняет сообщение в БД (снова). Сообщение было продублировано.

7.Conclusion

Добавление локальных транзакций к получению сообщений позволяет избежать потери сообщений. Мы должны учитывать, что могут возникать повторяющиеся сообщения, поэтому наш слушатель должен будет обнаружить его, или наша обработка должна быть идемпотентной, чтобы обрабатывать его снова без каких-либо проблем. Если это невозможно, нам придется перейти на распределенные транзакции, так как они поддерживают транзакции, которые используют разные ресурсы.