Статьи

Мул ESB, ActiveMQ и DLQ

апач-ActiveMQ-логотип
В этой статье я покажу простой поток Mule ESB, чтобы увидеть функцию DLQ Active MQ в действии.
Я предполагаю, что у вас есть работающий экземпляр Apache ActiveMQ (если вы не можете скачать версию здесь ). В этом примере я использую Mule ESB 3.4.2 и ActiveMQ 5.9.0. Мы можем создать простой проект Mule на основе следующего файла pom:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
<?xml version="1.0" encoding="UTF-8"?>
 
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.pascalalma.demo</groupId>
    <artifactId>activemq-test-flow</artifactId>
    <packaging>mule</packaging>
    <name>${project.artifactId}</name>
    <version>1.0.0-SNAPSHOT</version>
    <properties>
        <mule.version>3.4.2</mule.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <jdk.version>1.7</jdk.version>
        <junit.version>4.9</junit.version>
        <activemq.version>5.9.0</activemq.version>
    </properties>
    <dependencies>
        <!-- Mule Dependencies -->
        <dependency>
            <groupId>org.mule</groupId>
            <artifactId>mule-core</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- Mule Transports -->
        <dependency>
            <groupId>org.mule.transports</groupId>
            <artifactId>mule-transport-jms</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mule.transports</groupId>
            <artifactId>mule-transport-vm</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- Mule Modules -->
        <dependency>
            <groupId>org.mule.modules</groupId>
            <artifactId>mule-module-client</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mule.modules</groupId>
            <artifactId>mule-module-scripting</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- for testing -->
        <dependency>
            <groupId>org.mule.tests</groupId>
            <artifactId>mule-tests-functional</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>${activemq.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.mule.tools</groupId>
                <artifactId>maven-mule-plugin</artifactId>
                <version>1.9</version>
                <extensions>true</extensions>
                <configuration>
                    <copyToAppsDirectory>false</copyToAppsDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Здесь не так много особенного. Помимо необходимых зависимостей, я добавил maven-mule-plugin, чтобы я мог создать тип упаковки ‘mule’ и запустить Mule из моей IDE.
С этим помпоном Maven мы можем создать следующие две конфигурации Mule. Один для потока Мула, чтобы проверить нашу транзакцию:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<?xml version="1.0" encoding="UTF-8"?>
      xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
      version="EE-3.4.1"
      xsi:schemaLocation="
    <flow name="MainFlow">
        <inbound-endpoint ref="event-queue" />
        <logger category="net.pascalalma.demo.MainFlow" level="INFO" message="Received message from activeMQ" />
        <scripting:component>
            <scripting:script engine="Groovy">
                throw new Exception('Soap Fault Response detected')
            </scripting:script>
        </scripting:component>
        <outbound-endpoint ref="result-queue" />
    </flow>
</mule>

В этом потоке мы получаем сообщение от входящей конечной точки, регистрируем сообщение и генерируем исключение, прежде чем сообщение будет помещено в следующую очередь. Как мы видим, я не добавил обработчик исключений. Конфигурация конечных точек и разъемов выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="UTF-8"?>
 
      version="EE-3.4.1"
      xsi:schemaLocation="
 
    <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <spring:property name="maximumRedeliveries" value="5"/>
        <spring:property name="initialRedeliveryDelay" value="500"/>
        <spring:property name="maximumRedeliveryDelay" value="10000"/>
        <spring:property name="useExponentialBackOff" value="false"/>
        <spring:property name="backOffMultiplier" value="3"/>
    </spring:bean>
    <!-- ActiveMQ Connection factory -->
    <spring:bean id="amqFactory" class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
        <spring:property name="brokerURL" value="tcp://localhost:61616" />
        <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
    </spring:bean>
    <jms:activemq-connector name="activeMqConnector"
                            connectionFactory-ref="amqFactory"
                            persistentDelivery="true"
                            numberOfConcurrentTransactedReceivers="2"
                            specification="1.1" />
    <jms:endpoint name="event-queue" connector-ref="activeMqConnector" queue="event-queue" >
        <jms:transaction action="ALWAYS_BEGIN" />
    </jms:endpoint>
    <jms:endpoint name="result-queue" connector-ref="activeMqConnector" queue="result-queue" >
        <jms:transaction action="ALWAYS_JOIN" />
    </jms:endpoint>
</mule>

Я определил bean-компонент Spring для фабрики соединений ActiveMQ и один для политики повторной доставки этой фабрики. С помощью этой политики повторной доставки мы можем настроить частоту повторения попыток Mule для обработки сообщения из очереди, если исходная попытка не удалась. Приятной особенностью политики повторной доставки является комбинация «backOffMultiplier» и «useExponentialBackOff». С помощью этих опций вы можете увеличить период между двумя попытками повторной доставки до экспоненциального достижения MaximumRedeliveryDelay. В этом случае Мул будет ожидать «MaximumRedeliveryDelay» для следующей попытки.

Таким образом, с этими конфигурациями мы можем создать тестовый класс Mule и запустить его. Тестовый класс будет выглядеть примерно так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package net.pascalalma.demo;
 
import org.junit.Test;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.junit4.FunctionalTestCase;
 
public class TransactionFlowTest extends FunctionalTestCase {
 
    @Override
    protected String getConfigResources() {
        return "app/test-flow.xml, app/test-endpoints.xml";
    }
 
    @Test
    public void testError() throws Exception {
        MuleClient client = new MuleClient(muleContext);
        MuleMessage inMsg = new DefaultMuleMessage("<txt>Some message</txt>", muleContext);
        client.dispatch("event-queue", inMsg);
 
        // Give Mule the chance to redeliver the message
        Thread.sleep(4000);
    }
}

Если мы запустим этот тест, вы увидите сообщения в журнале, такие как:

1
2
3
Exception stack is:
1. "Message with id "ID:Pascals-MacBook-Pro-2.local-59158-1406440948059-1:1:3:1:1" has been redelivered 3 times on endpoint "jms://event-queue", which exceeds the maxRedelivery setting of 0 on the connector "activeMqConnector". Message payload is of type: ActiveMQTextMessage (org.mule.transport.jms.redelivery.MessageRedeliveredException)
  org.mule.transport.jms.redelivery.JmsXRedeliveryHandler:87 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/jms/redelivery/MessageRedeliveredException.html)

Если теперь мы переключимся на консоль ActiveMQ, доступную по адресу http: // localhost: 8161 для локальной установки по умолчанию, мы увидим следующие очереди:

Скриншот-2014-07-27-10-08-09
Как и ожидалось, мы видим создание двух очередей: очередь событий, которая пуста, и ActiveMQ.DLQ по умолчанию, который содержит наше сообщение:

Скриншот-2014-07-27-10-13-54

Как вы можете себе представить, было бы удобно иметь определенный DLQ для каждой очереди вместо одного DLQ, который будет содержать все виды недоставленных сообщений. К счастью, это легко настроить в ActiveMQ. Просто поместите следующее в файл ‘activemq.xml’, который можно найти в папке ‘$ ACTIVEMQ_HOME / conf’.

1
2
3
4
5
6
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
  <deadLetterStrategy>
    <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
  </deadLetterStrategy>
</policyEntry>

Если мы теперь перезапустим ActiveMQ, удалим существующие очереди и повторно запустим наш тест, мы увидим следующий результат:

Скриншот-2014-07-27-10-29-22
Таким образом, с этой настройкой каждая очередь имеет свой собственный DLQ. Для получения дополнительной информации об этих настройках ActieMQ см. Здесь . С потоком Mule, созданным в этом посте, легко протестировать и поиграть с этими настройками

Ссылка: Mule ESB, ActiveMQ и DLQ от нашего партнера JCG Паскаля Альмы в блоге Pragmatic Integrator .