В этой статье я покажу простой поток Mule ESB, чтобы увидеть функцию DLQ Active MQ в действии.
Я предполагаю, что у вас есть работающий экземпляр Apache ActiveMQ (если вы не можете скачать версию здесь ). В этом примере я использую Mule ESB 3.4.2 и ActiveMQ 5.9.0. Мы можем создать простой проект Mule на основе следующего файла pom:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
<? xml version = "1.0" encoding = "UTF-8" ?> < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" < modelVersion >4.0.0</ modelVersion > < groupId >net.pascalalma.demo</ groupId > < artifactId >activemq-test-flow</ artifactId > < packaging >mule</ packaging > < name >${project.artifactId}</ name > < version >1.0.0-SNAPSHOT</ version > < properties > < mule.version >3.4.2</ mule.version > < project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding > < project.reporting.outputEncoding >UTF-8</ project.reporting.outputEncoding > < jdk.version >1.7</ jdk.version > < junit.version >4.9</ junit.version > < activemq.version >5.9.0</ activemq.version > </ properties > < dependencies > <!-- Mule Dependencies --> < dependency > < groupId >org.mule</ groupId > < artifactId >mule-core</ artifactId > < version >${mule.version}</ version > </ dependency > <!-- Mule Transports --> < dependency > < groupId >org.mule.transports</ groupId > < artifactId >mule-transport-jms</ artifactId > < version >${mule.version}</ version > </ dependency > < dependency > < groupId >org.mule.transports</ groupId > < artifactId >mule-transport-vm</ artifactId > < version >${mule.version}</ version > </ dependency > <!-- Mule Modules --> < dependency > < groupId >org.mule.modules</ groupId > < artifactId >mule-module-client</ artifactId > < version >${mule.version}</ version > </ dependency > < dependency > < groupId >org.mule.modules</ groupId > < artifactId >mule-module-scripting</ artifactId > < version >${mule.version}</ version > </ dependency > <!-- for testing --> < dependency > < groupId >org.mule.tests</ groupId > < artifactId >mule-tests-functional</ artifactId > < version >${mule.version}</ version > </ dependency > < dependency > < groupId >junit</ groupId > < artifactId >junit</ artifactId > < version >${junit.version}</ version > </ dependency > < dependency > < groupId >org.apache.activemq</ groupId > < artifactId >activemq-client</ artifactId > < version >${activemq.version}</ version > </ dependency > </ dependencies > < build > < plugins > < plugin > < groupId >org.apache.maven.plugins</ groupId > < artifactId >maven-compiler-plugin</ artifactId > < version >2.3.2</ version > < configuration > < source >${jdk.version}</ source > < target >${jdk.version}</ target > < encoding >${project.build.sourceEncoding}</ encoding > </ configuration > </ plugin > < plugin > < groupId >org.apache.maven.plugins</ groupId > < artifactId >maven-resources-plugin</ artifactId > < version >2.5</ version > < configuration > < encoding >${project.build.sourceEncoding}</ encoding > </ configuration > </ plugin > < plugin > < groupId >org.mule.tools</ groupId > < artifactId >maven-mule-plugin</ artifactId > < version >1.9</ version > < extensions >true</ extensions > < configuration > < copyToAppsDirectory >false</ copyToAppsDirectory > </ configuration > </ plugin > </ plugins > </ build > </ project > |
Здесь не так много особенного. Помимо необходимых зависимостей, я добавил maven-mule-plugin, чтобы я мог создать тип упаковки ‘mule’ и запустить Mule из моей IDE.
С этим помпоном Maven мы можем создать следующие две конфигурации Mule. Один для потока Мула, чтобы проверить нашу транзакцию:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
<? xml version = "1.0" encoding = "UTF-8" ?> version = "EE-3.4.1" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> < flow name = "MainFlow" > < inbound-endpoint ref = "event-queue" /> < logger category = "net.pascalalma.demo.MainFlow" level = "INFO" message = "Received message from activeMQ" /> < scripting:component > < scripting:script engine = "Groovy" > throw new Exception('Soap Fault Response detected') </ scripting:script > </ scripting:component > < outbound-endpoint ref = "result-queue" /> </ flow > </ mule > |
В этом потоке мы получаем сообщение от входящей конечной точки, регистрируем сообщение и генерируем исключение, прежде чем сообщение будет помещено в следующую очередь. Как мы видим, я не добавил обработчик исключений. Конфигурация конечных точек и разъемов выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
<? xml version = "1.0" encoding = "UTF-8" ?> version = "EE-3.4.1" xsi:schemaLocation=" http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd"> < spring:bean id = "redeliveryPolicy" class = "org.apache.activemq.RedeliveryPolicy" > < spring:property name = "maximumRedeliveries" value = "5" /> < spring:property name = "initialRedeliveryDelay" value = "500" /> < spring:property name = "maximumRedeliveryDelay" value = "10000" /> < spring:property name = "useExponentialBackOff" value = "false" /> < spring:property name = "backOffMultiplier" value = "3" /> </ spring:bean > <!-- ActiveMQ Connection factory --> < spring:bean id = "amqFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" lazy-init = "true" > < spring:property name = "brokerURL" value = "tcp://localhost:61616" /> < spring:property name = "redeliveryPolicy" ref = "redeliveryPolicy" /> </ spring:bean > < jms:activemq-connector name = "activeMqConnector" connectionFactory-ref = "amqFactory" persistentDelivery = "true" numberOfConcurrentTransactedReceivers = "2" specification = "1.1" /> < jms:endpoint name = "event-queue" connector-ref = "activeMqConnector" queue = "event-queue" > < jms:transaction action = "ALWAYS_BEGIN" /> </ jms:endpoint > < jms:endpoint name = "result-queue" connector-ref = "activeMqConnector" queue = "result-queue" > < jms:transaction action = "ALWAYS_JOIN" /> </ jms:endpoint > </ mule > |
Я определил bean-компонент Spring для фабрики соединений ActiveMQ и один для политики повторной доставки этой фабрики. С помощью этой политики повторной доставки мы можем настроить частоту повторения попыток Mule для обработки сообщения из очереди, если исходная попытка не удалась. Приятной особенностью политики повторной доставки является комбинация «backOffMultiplier» и «useExponentialBackOff». С помощью этих опций вы можете увеличить период между двумя попытками повторной доставки до экспоненциального достижения MaximumRedeliveryDelay. В этом случае Мул будет ожидать «MaximumRedeliveryDelay» для следующей попытки.
Таким образом, с этими конфигурациями мы можем создать тестовый класс Mule и запустить его. Тестовый класс будет выглядеть примерно так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package net.pascalalma.demo; import org.junit.Test; import org.mule.DefaultMuleMessage; import org.mule.api.MuleMessage; import org.mule.module.client.MuleClient; import org.mule.tck.junit4.FunctionalTestCase; public class TransactionFlowTest extends FunctionalTestCase { @Override protected String getConfigResources() { return "app/test-flow.xml, app/test-endpoints.xml" ; } @Test public void testError() throws Exception { MuleClient client = new MuleClient(muleContext); MuleMessage inMsg = new DefaultMuleMessage( "<txt>Some message</txt>" , muleContext); client.dispatch( "event-queue" , inMsg); // Give Mule the chance to redeliver the message Thread.sleep( 4000 ); } } |
Если мы запустим этот тест, вы увидите сообщения в журнале, такие как:
1
2
3
|
Exception stack is: 1. "Message with id " ID:Pascals-MacBook-Pro-2. local -59158-1406440948059-1:1:3:1:1 " has been redelivered 3 times on endpoint " jms: //event-queue ", which exceeds the maxRedelivery setting of 0 on the connector " activeMqConnector". Message payload is of type : ActiveMQTextMessage (org.mule.transport.jms.redelivery.MessageRedeliveredException) org.mule.transport.jms.redelivery.JmsXRedeliveryHandler:87 (http: //www .mulesoft.org /docs/site/current3/apidocs/org/mule/transport/jms/redelivery/MessageRedeliveredException .html) |
Если теперь мы переключимся на консоль ActiveMQ, доступную по адресу http: // localhost: 8161 для локальной установки по умолчанию, мы увидим следующие очереди:
Как и ожидалось, мы видим создание двух очередей: очередь событий, которая пуста, и ActiveMQ.DLQ по умолчанию, который содержит наше сообщение:
Как вы можете себе представить, было бы удобно иметь определенный DLQ для каждой очереди вместо одного DLQ, который будет содержать все виды недоставленных сообщений. К счастью, это легко настроить в ActiveMQ. Просто поместите следующее в файл ‘activemq.xml’, который можно найти в папке ‘$ ACTIVEMQ_HOME / conf’.
1
2
3
4
5
6
|
<!-- Set the following policy on all queues using the '>' wildcard --> < policyEntry queue=">"> < deadLetterStrategy > < individualDeadLetterStrategy queuePrefix = "DLQ." useQueueForQueueMessages = "true" /> </ deadLetterStrategy > </ policyEntry > |
Если мы теперь перезапустим ActiveMQ, удалим существующие очереди и повторно запустим наш тест, мы увидим следующий результат:
Таким образом, с этой настройкой каждая очередь имеет свой собственный DLQ. Для получения дополнительной информации об этих настройках ActieMQ см. Здесь . С потоком Mule, созданным в этом посте, легко протестировать и поиграть с этими настройками
Ссылка: | Mule ESB, ActiveMQ и DLQ от нашего партнера JCG Паскаля Альмы в блоге Pragmatic Integrator . |