Статьи

Spring Integration: Splitter-Aggregator

В Spring Integration одна из форм рассеяния EIP  обеспечивается конструкциями сплиттера и агрегатора  . Семантика для обоих из них довольно проста для понимания, разделитель получает входное сообщение и возвращает список объектов, каждый из которых превращается в сообщения первого класса. Эти сообщения направляются обработчикам сообщений, после чего они агрегируются в виде списка входных сообщений агрегатором. Этот шаблон может успешно использоваться с довольно простой конфигурацией.

Следующий пример работает с тривиальной обработкой сообщений. Он использует простую семантику обработки ошибок » один сбой — все сбой» . Это означает, что любые ошибки, возникающие в одной или обеих службах, приведут к тому, что исключение будет возвращено инициатору.

Работая над проектами, которые требуют гораздо более надежного решения после частичного сбоя, я написал еще один пост, в котором содержатся подробные сведения о стратегии проектирования, поддерживающей это, — Стратегия надежного агрегатора сплиттера <URL>.

Давайте рассмотрим, как работает эта конфигурация контекста и что происходит в ситуациях, когда одна из служб генерирует исключение.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

    <!--##############################################-->
    <!-- Bean specifications.  -->
    <!--##############################################-->
    <import resource="bean-refs.xml"/>

    <!--##############################################-->
    <!-- Gateway specification. -->
    <!--##############################################-->
    <int:gateway service-interface="com.l8mdv.sample.gateway.BrokerRequestGateway"/>
    <int:channel id="broker-request-channel"
                 datatype="com.l8mdv.sa.BrokerRequestMessage"/>

    <!--##############################################-->
    <!-- Request message splitter. -->
    <!--##############################################-->
    <int:splitter input-channel="broker-request-channel"
                  output-channel="broker-router-channel"

                  ref="brokerQuoteRequestSplitter"/>

    <!--##############################################-->
    <!-- Request message routing. -->
    <!--##############################################-->
    <int:channel id="broker-router-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:recipient-list-router input-channel="broker-router-channel">
        <int:recipient channel="openex-broker-channel"
                       selector-expression="payload.BrokerName.equals('openex')"/>
        <int:recipient channel="yahoo-broker-channel"
                       selector-expression="payload.BrokerName.equals('yahoo')"/>
    </int:recipient-list-router>

    <!--##############################################-->
    <!-- Request message routing to OpenEx. -->
    <!--##############################################-->
    <int:channel id="openex-broker-channel" datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="openex-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="OpenExServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="OpenExFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Request message routing to Yahoo. -->
    <!--##############################################-->
    <int:channel id="yahoo-broker-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="yahoo-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="YahooServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="YahooFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Response message handling, return the best -->
    <!-- quote to the invoker. -->
    <!--##############################################-->
    <int:channel id="aggregator-channel" datatype="com.l8mdv.sa.BrokerQuoteResponseMessage"/>
    <int:chain input-channel="aggregator-channel">
        <int:aggregator/>
        <int:transformer ref="aggregationToBrokerQuoteResponseTransformer"/>
    </int:chain>

</beans>

Что касается сплиттера и агрегатора , интересная конфигурация начинается в строке 25 со спецификацией сплиттера . Конструкция была настроена с атрибутами input-channel , output-channel  и ref . Конфигурация канала для этой конструкции очевидна, другой атрибут позволяет связать ссылку на компонент, которая может выполнять функцию разделения. Обычно верно, что компонент-сплиттер с соответствующими аргументами и возвращаемыми типами будет вызываться, если он однозначен.

Следующая остановка для сообщений — это маршрутизатор, который определен в строке 35, маршрутизатор списка получателей. Эта конечная точка сообщения может пересылать сообщения маршрутизации в соответствующие каналы, используя выражение для вызова в полезной нагрузке сообщения. Этот маршрутизатор проверит полезную нагрузку и направит каждое сообщение одной из двух служб — они расположены в строках 48 и 62. Сервисы, которые вызываются, полностью зависят от того, что возвращается из сплиттера. Любая или обе из этих служб могут быть вызваны для данного входного сообщения сплиттера.

Наконец, результаты одного или двух вызовов службы направляются в агрегатор, конфигурация начинается для этого в цепочке, определенной в строке 74. Наконец, результат агрегирования вводится в преобразователь, где происходит некоторая дальнейшая обработка. Обратите внимание, что в этой цепочке нет выходного канала, здесь используется неявно созданный выходной канал по умолчанию — он возвращается через шлюз к вызывающему шлюзу.

У этого сервиса есть несколько интересных аспектов:

  1. Если разветвитель в строке 25 получает сообщение, но не генерирует список из одного или нескольких ответных сообщений, тогда пустой список приведет к тому, что маршрутизатор не будет вызван. Это можно переопределить с помощью атрибута require-reply, и в этом случае пустой список приведет к возникновению исключения обработки сообщения.
  2. Строгая типизация использовалась в каналах данных, чтобы обеспечить соблюдение строгих правил обработки и облегчить понимание и настройку конфигурации.
  3. Цепь конструкция была использована в попытке сохранить конфигурации прессовки , где полезно. Следует отметить, что определение цепочки и строгая типизация часто являются двумя сторонами одной медали. Группируя агрегатор и трансформатор в цепочку, я не смог контролировать и, следовательно, документировать ввод типа сообщения в конечную точку трансформатора.
  4. Spring bean-компоненты, на которые есть ссылки в этом контексте, были загружены из внешнего файла. Хотя они могли быть отсканированы по компонентам или определены в одном и том же файле, я выбрал их различие, чтобы они не загружались, если бы не было необходимости в работе — обычно я создавал объекты-шпионы и объекты-шпионы вокруг этих конечных точек сообщений.
  5. Любое исключение в вызове Service Activator в строках 48 и 62 приведет к тому, что агрегация не завершится. В этом примере я не создал обработчик ошибок на шлюзе в строке 18, поэтому любые исключения, создаваемые SA, могут привести к тому, что исключение будет сгенерировано для вызова шлюза. Более надежное решение, перед лицом обработки исключений, потребовало бы другого подхода к проектированию.
  6. Я задокументировал, хотя и кратко, намерения для каждого раздела конфигурации, специально для того, чтобы помочь читателям понять намерения моего проекта.

В случае, если они полезны, определения bean-компонента Spring следующие:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="brokerQuoteRequestSplitter"
          class="com.l8mdv.sample.service.BrokerQuoteRequestSplitter"/>

    <bean id="brokerRawResponseTransformer"
          class="com.l8mdv.sample.service.impl.BrokerRawResponseTransformer"/>

    <bean id="aggregationToBrokerQuoteResponseTransformer"
          class="com.l8mdv.sample.service.impl.AggregationToBrokerQuoteResponseTransformer"/>

    <bean id="YahooFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="Yahoo"/>
        <property name="value" value="2"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

    <bean id="OpenExFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="OpenEx"/>
        <property name="value" value="5"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

</beans>

Код сплиттера:

package com.l8mdv.sample.service;

import com.l8mdv.sa.BrokerQuoteRequestMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.QuoteRequestSortPolicy;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;

public class BrokerQuoteRequestSplitter {

    public List<BrokerQuoteRequestMessage> split(BrokerRequestMessage message) {

        Assert.notNull(message, "Mandatory argument missing.");
        List requests = new ArrayList();

        for (BrokerRequest brokerRequest:
                message.getBrokerRequest()) {
            BrokerQuoteRequestMessage brokerQuoteRequestMessage
                    = new BrokerQuoteRequestMessage();
            brokerQuoteRequestMessage
                    .setBrokerName(brokerRequest.getBrokerName());
            brokerQuoteRequestMessage
                    .setQuoteRequestSortPolicy(QuoteRequestSortPolicy.BUY_LOWEST);
            requests.add(brokerQuoteRequestMessage);
        }

        return requests;
    }
}

Трансформатор:

package com.l8mdv.sample.service.impl;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import org.springframework.util.Assert;

import java.util.List;

public class AggregationToBrokerQuoteResponseTransformer {

    public BrokerQuoteResponseMessage
    transform(List<BrokerQuoteResponseMessage> serviceResponses) {

        Assert.notNull(serviceResponses, "Mandatory argument missing.");

        BrokerQuoteResponseMessage bestQuote = null;
        for (BrokerQuoteResponseMessage 
                brokerQuoteResponseMessage: serviceResponses) {
            if (bestQuote == null)
                bestQuote = brokerQuoteResponseMessage;
            else {
                if (brokerQuoteResponseMessage.getSellPrice()
                        .compareTo(bestQuote.getSellPrice()) > 0)
                    bestQuote = brokerQuoteResponseMessage;
            }
        }

        return bestQuote;
    }
}

и интеграционный тест:

package com.l8mdv.sample;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.BrokerRequestType;
import com.l8mdv.sample.gateway.BrokerRequestGateway;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(
    locations = {"classpath:META-INF/spring/simple-splitter-aggregator.xml"}
)
public class SimpleSplitterAggregatorIntegrationTest {

    @Autowired
    public BrokerRequestGateway brokerRequestGateway;

    @Test
    public void run() throws Exception {
        BrokerRequestMessage requestMessage = new BrokerRequestMessage();

        BrokerRequest yahooRequest = new BrokerRequest();
        yahooRequest.setBrokerName("yahoo");
        yahooRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(yahooRequest);

        BrokerRequest oxRequest = new BrokerRequest();
        oxRequest.setBrokerName("openex");
        oxRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(oxRequest);

        BrokerQuoteResponseMessage response = 
           brokerRequestGateway.send(requestMessage);
    }

}