
В этой статье я покажу простой поток Mule ESB, чтобы увидеть функцию DLQ Active MQ в действии.
Я предполагаю, что у вас есть работающий экземпляр Apache ActiveMQ (если вы не можете скачать версию здесь ). В этом примере я использую Mule ESB 3.4.2 и ActiveMQ 5.9.0. Мы можем создать простой проект Mule на основе следующего файла pom:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <modelVersion>4.0.0</modelVersion> <groupId>net.pascalalma.demo</groupId> <artifactId>activemq-test-flow</artifactId> <packaging>mule</packaging> <name>${project.artifactId}</name> <version>1.0.0-SNAPSHOT</version> <properties> <mule.version>3.4.2</mule.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <jdk.version>1.7</jdk.version> <junit.version>4.9</junit.version> <activemq.version>5.9.0</activemq.version> </properties> <dependencies> <!-- Mule Dependencies --> <dependency> <groupId>org.mule</groupId> <artifactId>mule-core</artifactId> <version>${mule.version}</version> </dependency> <!-- Mule Transports --> <dependency> <groupId>org.mule.transports</groupId> <artifactId>mule-transport-jms</artifactId> <version>${mule.version}</version> </dependency> <dependency> <groupId>org.mule.transports</groupId> <artifactId>mule-transport-vm</artifactId> <version>${mule.version}</version> </dependency> <!-- Mule Modules --> <dependency> <groupId>org.mule.modules</groupId> <artifactId>mule-module-client</artifactId> <version>${mule.version}</version> </dependency> <dependency> <groupId>org.mule.modules</groupId> <artifactId>mule-module-scripting</artifactId> <version>${mule.version}</version> </dependency> <!-- for testing --> <dependency> <groupId>org.mule.tests</groupId> <artifactId>mule-tests-functional</artifactId> <version>${mule.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>${activemq.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>${jdk.version}</source> <target>${jdk.version}</target> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>2.5</version> <configuration> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> <plugin> <groupId>org.mule.tools</groupId> <artifactId>maven-mule-plugin</artifactId> <version>1.9</version> <extensions>true</extensions> <configuration> <copyToAppsDirectory>false</copyToAppsDirectory> </configuration> </plugin> </plugins> </build></project> |
Здесь не так много особенного. Помимо необходимых зависимостей, я добавил maven-mule-plugin, чтобы я мог создать тип упаковки ‘mule’ и запустить Mule из моей IDE.
С этим помпоном Maven мы можем создать следующие две конфигурации Mule. Один для потока Мула, чтобы проверить нашу транзакцию:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
<?xml version="1.0" encoding="UTF-8"?> version="EE-3.4.1" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> <flow name="MainFlow"> <inbound-endpoint ref="event-queue" /> <logger category="net.pascalalma.demo.MainFlow" level="INFO" message="Received message from activeMQ" /> <scripting:component> <scripting:script engine="Groovy"> throw new Exception('Soap Fault Response detected') </scripting:script> </scripting:component> <outbound-endpoint ref="result-queue" /> </flow></mule> |
В этом потоке мы получаем сообщение от входящей конечной точки, регистрируем сообщение и генерируем исключение, прежде чем сообщение будет помещено в следующую очередь. Как мы видим, я не добавил обработчик исключений. Конфигурация конечных точек и разъемов выглядит следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
<?xml version="1.0" encoding="UTF-8"?> version="EE-3.4.1" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd"> <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <spring:property name="maximumRedeliveries" value="5"/> <spring:property name="initialRedeliveryDelay" value="500"/> <spring:property name="maximumRedeliveryDelay" value="10000"/> <spring:property name="useExponentialBackOff" value="false"/> <spring:property name="backOffMultiplier" value="3"/> </spring:bean> <!-- ActiveMQ Connection factory --> <spring:bean id="amqFactory" class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true"> <spring:property name="brokerURL" value="tcp://localhost:61616" /> <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" /> </spring:bean> <jms:activemq-connector name="activeMqConnector" connectionFactory-ref="amqFactory" persistentDelivery="true" numberOfConcurrentTransactedReceivers="2" specification="1.1" /> <jms:endpoint name="event-queue" connector-ref="activeMqConnector" queue="event-queue" > <jms:transaction action="ALWAYS_BEGIN" /> </jms:endpoint> <jms:endpoint name="result-queue" connector-ref="activeMqConnector" queue="result-queue" > <jms:transaction action="ALWAYS_JOIN" /> </jms:endpoint></mule> |
Я определил bean-компонент Spring для фабрики соединений ActiveMQ и один для политики повторной доставки этой фабрики. С помощью этой политики повторной доставки мы можем настроить частоту повторения попыток Mule для обработки сообщения из очереди, если исходная попытка не удалась. Приятной особенностью политики повторной доставки является комбинация «backOffMultiplier» и «useExponentialBackOff». С помощью этих опций вы можете увеличить период между двумя попытками повторной доставки до экспоненциального достижения MaximumRedeliveryDelay. В этом случае Мул будет ожидать «MaximumRedeliveryDelay» для следующей попытки.
Таким образом, с этими конфигурациями мы можем создать тестовый класс Mule и запустить его. Тестовый класс будет выглядеть примерно так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package net.pascalalma.demo;import org.junit.Test;import org.mule.DefaultMuleMessage;import org.mule.api.MuleMessage;import org.mule.module.client.MuleClient;import org.mule.tck.junit4.FunctionalTestCase;public class TransactionFlowTest extends FunctionalTestCase { @Override protected String getConfigResources() { return "app/test-flow.xml, app/test-endpoints.xml"; } @Test public void testError() throws Exception { MuleClient client = new MuleClient(muleContext); MuleMessage inMsg = new DefaultMuleMessage("<txt>Some message</txt>", muleContext); client.dispatch("event-queue", inMsg); // Give Mule the chance to redeliver the message Thread.sleep(4000); }} |
Если мы запустим этот тест, вы увидите сообщения в журнале, такие как:
|
1
2
3
|
Exception stack is:1. "Message with id "ID:Pascals-MacBook-Pro-2.local-59158-1406440948059-1:1:3:1:1" has been redelivered 3 times on endpoint "jms://event-queue", which exceeds the maxRedelivery setting of 0 on the connector "activeMqConnector". Message payload is of type: ActiveMQTextMessage (org.mule.transport.jms.redelivery.MessageRedeliveredException) org.mule.transport.jms.redelivery.JmsXRedeliveryHandler:87 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/jms/redelivery/MessageRedeliveredException.html) |
Если теперь мы переключимся на консоль ActiveMQ, доступную по адресу http: // localhost: 8161 для локальной установки по умолчанию, мы увидим следующие очереди:

Как и ожидалось, мы видим создание двух очередей: очередь событий, которая пуста, и ActiveMQ.DLQ по умолчанию, который содержит наше сообщение:
Как вы можете себе представить, было бы удобно иметь определенный DLQ для каждой очереди вместо одного DLQ, который будет содержать все виды недоставленных сообщений. К счастью, это легко настроить в ActiveMQ. Просто поместите следующее в файл ‘activemq.xml’, который можно найти в папке ‘$ ACTIVEMQ_HOME / conf’.
|
1
2
3
4
5
6
|
<!-- Set the following policy on all queues using the '>' wildcard --><policyEntry queue=">"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> </deadLetterStrategy></policyEntry> |
Если мы теперь перезапустим ActiveMQ, удалим существующие очереди и повторно запустим наш тест, мы увидим следующий результат:

Таким образом, с этой настройкой каждая очередь имеет свой собственный DLQ. Для получения дополнительной информации об этих настройках ActieMQ см. Здесь . С потоком Mule, созданным в этом посте, легко протестировать и поиграть с этими настройками
| Ссылка: | Mule ESB, ActiveMQ и DLQ от нашего партнера JCG Паскаля Альмы в блоге Pragmatic Integrator . |
