В WSO2 мы можем реализовать EIP Splitter and Aggregator, используя посредники Iterate и Aggregate. С помощью шаблона Splitter мы можем разделить сообщение, составленное из различных элементов, которые необходимо обрабатывать индивидуально, а затем мы используем шаблон Aggregator для агрегирования результатов каждого отдельного вызова, а затем выполняем некоторую обработку по агрегированным результатам.
Пример Happy Path
В примере с удачным путем все запросы и обработка, выполняемые внутри итеративного посредника, будут выполняться без сбоев, а агрегатный посредник будет обрабатывать результаты всех выполненных запросов. Мы можем видеть это в прокси ниже:
XML
xxxxxxxxxx
1
2
<proxy name="ProxyIterateAggregateFaultNotWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
3
<target>
4
<inSequence>
5
<!-- This is to force going to fault sequence in case of SOAP Fault -->
6
<!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
7
<payloadFactory media-type="xml">
8
<format>
9
<payload xmlns="">
10
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
11
<in>1</in>
12
</echo:echoInt>
13
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
14
<in>1</in>
15
</echo:echoInt>
16
</payload>
17
</format>
18
<args></args>
19
</payloadFactory>
20
<!-- 1. Iterate over the echoInt elements -->
21
<iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
22
<target>
23
<sequence>
24
<property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"></property>
25
<header name="Action" scope="default" value="urn:echoInt"></header>
26
<call>
27
<endpoint>
28
<address format="soap11" uri="http://localhost:8280/services/echo">
29
<suspendOnFailure>
30
<initialDuration>-1</initialDuration>
31
<progressionFactor>-1</progressionFactor>
32
<maximumDuration>0</maximumDuration>
33
</suspendOnFailure>
34
<markForSuspension>
35
<retriesBeforeSuspension>0</retriesBeforeSuspension>
36
</markForSuspension>
37
</address>
38
</endpoint>
39
</call>
40
</sequence>
41
</target>
42
</iterate>
43
<property name="result" scope="default">
44
<result xmlns=""></result>
45
</property>
46
<aggregate>
47
<completeCondition>
48
<messageCount max="-1" min="-1"></messageCount>
49
</completeCondition>
50
<onComplete enclosingElementProperty="result" expression="$body/*[1]">
51
<log level="custom">
52
<property name="ON Aggregate SEQ" value="faultSequence default"></property>
53
</log>
54
<respond></respond>
55
</onComplete>
56
</aggregate>
57
</inSequence>
58
<outSequence></outSequence>
59
<faultSequence>
60
<log level="custom">
61
<property name="ON FAULT SEQ" value="faultSequence default"></property>
62
</log>
63
</faultSequence>
64
</target>
65
</proxy>
Этот прокси-сервер в основном создает полезную нагрузку, содержащую несколько запросов echoInt , а затем мы перебираем каждый echoInt, чтобы сделать запрос к службе echo . Внутри итерации мы устанавливаем заголовок Action и делаем запрос к конечной точке, используя посредник вызова. Мы добавили свойство FORCE_ERROR_ON_SOAP_FAULT, чтобы в случае SOAPFault, возвращенного серверной службой, поток был перенаправлен на последовательность ошибок.
В этом примере ошибки нет, поэтому агрегирующий посредник после итерации соберет все ответы, объединит их в одно сообщение и ответит. Мы можем увидеть ответ этой прокси-службы в xml ниже:
XML
xxxxxxxxxx
1
<result>
2
<ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
3
<return>1</return>
4
</ns:echoIntResponse>
5
<ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
6
<return>1</return>
7
</ns:echoIntResponse>
8
</result>
Вы также можете прочитать:
Итеративная обработка с использованием For Every Scope в Mule
Пример ошибки
В приведенном ниже прокси мы внесли небольшое изменение в payloadFactory , чтобы вызвать ошибку мыла из бэкэнда:
XML
xxxxxxxxxx
1
<payloadFactory media-type="xml">
2
<format>
3
<payload xmlns="">
4
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
5
<in>1</in>
6
</echo:echoInt>
7
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
8
<in>abc</in>
9
</echo:echoInt>
10
</payload>
11
</format>
12
<args></args>
13
</payloadFactory>
Мы передаем значение ‘abc’ во второй элемент echoInt . В этом примере, когда мы пробуем прокси-сервер, мы не получим никакого ответа, поскольку поток перенаправлен на faultSequence, и мы можем увидеть запись в журнале:
[2020-02-16 21:32:41,526] [EI-Core] INFO - LogMediator ON FAULT SEQ = faultSequence default
Полный код прокси-сервера можно увидеть ниже:
XML
xxxxxxxxxx
1
2
<proxy name="ProxyIterateAggregateFaultNotWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
3
<target>
4
<inSequence>
5
<!-- This is to force going to fault sequence in case of SOAP Fault -->
6
<!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
7
<payloadFactory media-type="xml">
8
<format>
9
<payload xmlns="">
10
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
11
<in>1</in>
12
</echo:echoInt>
13
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
14
<in>abc</in>
15
</echo:echoInt>
16
</payload>
17
</format>
18
<args></args>
19
</payloadFactory>
20
<!-- 1. Iterate over the echoInt elements -->
21
<iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
22
<target>
23
<sequence>
24
<property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"></property>
25
<header name="Action" scope="default" value="urn:echoInt"></header>
26
<call>
27
<endpoint>
28
<address format="soap11" uri="http://localhost:8280/services/echo">
29
<suspendOnFailure>
30
<initialDuration>-1</initialDuration>
31
<progressionFactor>-1</progressionFactor>
32
<maximumDuration>0</maximumDuration>
33
</suspendOnFailure>
34
<markForSuspension>
35
<retriesBeforeSuspension>0</retriesBeforeSuspension>
36
</markForSuspension>
37
</address>
38
</endpoint>
39
</call>
40
</sequence>
41
</target>
42
</iterate>
43
<property name="result" scope="default">
44
<result xmlns=""></result>
45
</property>
46
<aggregate>
47
<completeCondition>
48
<messageCount max="-1" min="-1"></messageCount>
49
</completeCondition>
50
<onComplete enclosingElementProperty="result" expression="$body/*[1]">
51
<log level="custom">
52
<property name="ON Aggregate SEQ" value="faultSequence default"></property>
53
</log>
54
<respond></respond>
55
</onComplete>
56
</aggregate>
57
</inSequence>
58
<outSequence></outSequence>
59
<faultSequence>
60
<log level="custom">
61
<property name="ON FAULT SEQ" value="faultSequence default"></property>
62
</log>
63
</faultSequence>
64
</target>
65
</proxy>
Обработка ошибок с итерацией / агрегированием
Для того, чтобы агрегат работал при возникновении ошибки, нам нужно внести некоторые изменения в прокси и использовать последовательности. После изменений у нас будут следующие артефакты:
- Прокси Сервис
- Итерация последовательности
- Совокупная последовательность
- IterateFaultHandler последовательность
Код прокси-сервера можно увидеть ниже:
XML
xxxxxxxxxx
1
2
<proxy name="ProxyIterateAggregateWorking" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
3
<target>
4
<inSequence>
5
<!-- This is to force going to fault sequence in case of SOAP Fault -->
6
<!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
7
<payloadFactory media-type="xml">
8
<format>
9
<payload xmlns="">
10
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
11
<in>1</in>
12
</echo:echoInt>
13
<echo:echoInt xmlns:echo="http://echo.services.core.carbon.wso2.org">
14
<in>abc</in>
15
</echo:echoInt>
16
</payload>
17
</format>
18
<args></args>
19
</payloadFactory>
20
<!-- 1. Iterate over the echoInt elements -->
21
<iterate expression="//echo:echoInt" xmlns:echo="http://echo.services.core.carbon.wso2.org">
22
<target>
23
<sequence>
24
<sequence key="IterateSequence"></sequence>
25
</sequence>
26
</target>
27
</iterate>
28
<sequence key="AggregateSequence"></sequence>
29
</inSequence>
30
<outSequence></outSequence>
31
<faultSequence>
32
<log level="custom">
33
<property name="ON FAULT SEQ" value="faultSequence default"></property>
34
</log>
35
</faultSequence>
36
</target>
37
</proxy>
Основное отличие от предыдущего прокси состоит в том, что внутри итерации мы используем предопределенную последовательность вместо анонимной последовательности. И у нас есть AggregateSequence сразу после итеративного посредника.
AggregateSequence просто содержит код, который у нас был ранее внутри прокси:
XML
xxxxxxxxxx
1
2
<sequence name="AggregateSequence" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
3
<property name="result" scope="default">
4
<result xmlns=""></result>
5
</property>
6
<aggregate>
7
<completeCondition>
8
<messageCount max="-1" min="-1"></messageCount>
9
</completeCondition>
10
<onComplete enclosingElementProperty="result" expression="$body/*[1]">
11
<log level="custom">
12
<property name="ON Aggregate SEQ" value="faultSequence default"></property>
13
</log>
14
<respond></respond>
15
</onComplete>
16
</aggregate>
17
</sequence>
В этой последовательности нет ничего особенного.
Теперь давайте посмотрим на IterateSequence :
XML
xxxxxxxxxx
1
2
<sequence name="IterateSequence" onError="IterateFaultHandler" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
3
<!-- Set this to force a fault in case of SOAPFault returned by the backend service -->
4
<property name="FORCE_ERROR_ON_SOAP_FAULT" scope="default" type="STRING" value="true"></property>
5
<header name="Action" scope="default" value="urn:echoInt"></header>
6
<call>
7
<endpoint>
8
<address format="soap11" uri="http://localhost:8280/services/echo">
9
<suspendOnFailure>
10
<initialDuration>-1</initialDuration>
11
<progressionFactor>-1</progressionFactor>
12
<maximumDuration>0</maximumDuration>
13
</suspendOnFailure>
14
<markForSuspension>
15
<retriesBeforeSuspension>0</retriesBeforeSuspension>
16
</markForSuspension>
17
</address>
18
</endpoint>
19
</call>
20
</sequence>
Он содержит тот же код, который был у нас в прокси, но сложная часть находится в свойстве onError последовательности, где мы указываем, какая последовательность должна выполняться в случае ошибок в последовательности, в данном случае это IterateFaultHandler . Таким образом, в этом случае, когда он пытается выполнить echoInt со значением abc, он получает SOAPFault и перенаправляет поток в IterateFaultHandler .
Итак, теперь давайте посмотрим IterateFaultHandler :
XML
xxxxxxxxxx
1
2
<sequence name="IterateFaultHandler" onError="IterateFaultHandler" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
3
<log level="custom">
4
<property name="ON ITERATE FAULT SEQ" value="IterateFaultHandler"></property>
5
</log>
6
<payloadFactory media-type="xml">
7
<format>
8
<errorProcessing xmlns="">
9
<error>$1</error>
10
</errorProcessing>
11
</format>
12
<args>
13
<arg evaluator="xml" expression="//faultstring"></arg>
14
</args>
15
</payloadFactory>
16
<property name="RESPONSE" scope="default" type="STRING" value="true"></property>
17
<sequence key="AggregateSequence"></sequence>
18
</sequence>
Внутри этой последовательности ошибок мы создаем полезную нагрузку с помощью строки ошибок . Затем мы устанавливаем для свойства RESPONSE значение true, это указывает механизму синапса, что это сообщение в направлении ответа. И тогда мы вызываем AggregateSequence . Нам нужно указать свойство RESPONSE, потому что агрегирующий посредник ожидает, что поток сообщений будет в направлении ответа.
Итак, теперь, когда мы пробуем прокси-сервер, он получит ошибку для элемента с недопустимым значением, а затем перенаправит на последовательность ошибок, он сгенерирует полезную нагрузку и вызовет агрегат. Ниже мы видим журнал ошибок, показывающий, что он выполнил последовательность ошибок, а затем окончательную полезную нагрузку:
[2020-02-16 21:55:48,155] [EI-Core] INFO - LogMediator ON ITERATE FAULT SEQ = IterateFaultHandler
Полезная нагрузка ответа:
XML
xxxxxxxxxx
1
<result>
2
<ns:echoIntResponse xmlns:ns="http://echo.services.core.carbon.wso2.org">
3
<return>1</return>
4
</ns:echoIntResponse>
5
<errorProcessing>
6
<error>Invalid value "abc" for element in</error>
7
</errorProcessing>
8
</result>
При таком подходе агрегат всегда выполняется, и поток завершается без каких-либо проблем.
Вы можете спросить: «Почему бы не вызвать AggregateSequence внутри Proxy faultSequence?» Когда мы пытаемся использовать AggregateSequence внутри faultSequence прокси, он выдаст ошибку ниже, и этого не произойдет, если мы используем именованную последовательность для обработки ошибок:
Джава
xxxxxxxxxx
1
[2020-02-16 21:59:54,782] [EI-Core] ERROR - SequenceMediator Runtime error occurred while mediating the message
2
java.util.EmptyStackException
3
at java.util.Stack.peek(Stack.java:102)
4
at org.apache.synapse.mediators.eip.aggregator.AggregateMediator.mediate(AggregateMediator.java:302)
5
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
6
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
7
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
8
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:214)
9
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
10
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
11
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
12
at org.apache.synapse.mediators.MediatorFaultHandler.onFault(MediatorFaultHandler.java:96)
13
at org.apache.synapse.FaultHandler.handleFault(FaultHandler.java:53)
14
at org.apache.synapse.endpoints.AbstractEndpoint.invokeNextFaultHandler(AbstractEndpoint.java:735)
15
at org.apache.synapse.endpoints.AbstractEndpoint.onFault(AbstractEndpoint.java:550)
16
at org.apache.synapse.endpoints.AddressEndpoint.onFault(AddressEndpoint.java:46)
17
at org.apache.synapse.FaultHandler.handleFault(FaultHandler.java:101)
18
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:527)
19
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:195)
20
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
21
at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:265)
22
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
23
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
24
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
25
at java.lang.Thread.run(Thread.java:748)
Надеюсь, это поможет! Увидимся в следующем посте.