Статьи

Spring Integration — Робастный сплиттер-агрегатор

Надежная стратегия проектирования Splitter Aggregator — шаблон адаптера шлюза обмена сообщениями

Что мы подразумеваем под устойчивым?

В контексте этой статьи устойчивость означает способность управлять условиями исключения в потоке без немедленного возврата к вызывающей стороне. В некоторых сценариях обработки n из М ответов достаточно хорошо, чтобы перейти к выводу. Примеры сценариев обработки, которые обычно имеют следующие тенденции:

  1. Расценки на финансы, страхование и системы бронирования.
  2. Разветвленные издательские системы.

Зачем нам нужны надежные конструкции агрегатора сплиттера?

Прежде всего, может понадобиться введение в типичную схему агрегатора Splitter. Разделитель — это шаблон EIP, который описывает механизм разбиения составных сообщений на части, чтобы их можно было обрабатывать по отдельности. Маршрутизатор — это шаблон EIP, который описывает маршрутизацию сообщений в каналы, нацеливая их на конкретные конечные точки обмена сообщениями. Агрегатор — это шаблон EIP, который объединяет и хранит набор сообщений, принадлежащих группе, и освобождает их, когда эта группа завершена.

Вместе эти три EIP-конструкции образуют мощный механизм для разделения обработки на отдельные единицы работы. Spring Integration (SI) использует ту же терминологию шаблонов, что и EIP, поэтому читатели этой методологии будут вполне знакомы с конструкциями Spring Integration Framework. Инфраструктура SI позволяет существенно настроить все три из этих конструкций и, более того, просто используя асинхронные каналы, как в любой другой многопоточной конфигурации, позволяет выполнять эти блоки работы параллельно.

Интересной проблемой при работе с проекторами SI Splitter Aggregator является создание надлежащим образом устойчивых потоков, которые предсказуемо работают в ряде сценариев вызова. Простая конструкция агрегатора-сплиттера может использоваться во многих случаях и работать без сложной настройки конструкций SI. Однако некоторые требования к обслуживанию требуют более надежной стратегии обработки и, следовательно, более сложной конфигурации. Следующие разделы описывают и показывают, как на самом деле выглядит конструкция Simple Splitter Aggregator, тип обработки, с которым должна работать ваша конструкция, а затем предлагаются варианты решения для более надежной обработки.

Простой агрегатор сплиттеров

В следующей схеме Splitter Aggregator показан простой поток, который получает сообщения с запросами документов в шлюз обмена сообщениями, разбивает сообщение на два маршрута обработки и затем агрегирует ответ. Обратите внимание, что диаграмма была построена из конструкций EIP в OmniGraffle, а не в виде графика интеграции из STS; ради краткости отсутствуют каналы на диаграмме.

СИ конструирует подробно:

Шлюз обмена сообщениями — есть три шлюза обмена сообщениями. Для спецификаций шлюза доступно несколько конфигураций, но они могут возвращать бизнес-объекты, исключения и нули (по истечении времени ожидания). Крайний левый шлюз — это служебный шлюз, для которого мы определяем поток. Два других шлюза, между маршрутизатором и агрегатором, являются внешними системами, которые будут предоставлять ответы на бизнес-вопросы, которые генерирует наш поток.

Разделитель — существует единственный разделитель, который отвечает за потребление сообщения документа и создание коллекции сообщений для последующей обработки. Сигнатура Java для, чаще всего, пользовательского Splitter указывает единственный аргумент объекта и коллекцию для возврата.

Маршрутизатор списка получателей — существует один маршрутизатор, можно использовать любой подходящий маршрутизатор, выберите тот, который соответствует вашим требованиям — вы можете легко маршрутизировать по выражению или типу полезной нагрузки. Основным назначением маршрутизатора является маршрутизация коллекции сообщений, предоставляемых разделителем. Это довольно типичная конфигурация Splitter Aggregator.

Агрегатор — единая конструкция, которая отвечает за сбор сообщений в группе для дальнейшей обработки ответов шлюза. Хотя агрегатор может быть настроен с атрибутами и определениями компонентов для предоставления альтернативных стратегий группировки и выпуска, чаще всего достаточно стратегии агрегации по умолчанию.

Интересные аспекты работы агрегатора Splitter

  1. Шлюз — входящий шлюз, крайний левый, может иметь или не иметь указанную ссылку на компонент обработки ошибок. Если это произойдет, то у этого компонента будет возможность обрабатывать исключения, выданные в потоке справа от этого шлюза. Если нет, то любое исключение будет выброшено прямо из шлюза.
  2. Шлюз — на каждом из шлюзов может быть установлен необязательный тайм-аут по умолчанию , есть существенные последствия для установки этого значения, убедитесь, что они хорошо поняты. Истекший тайм-аут приведет к возврату нулевого значения из шлюза. Это то же самое условие, которое может привести к тому, что поток будет припаркован, если в вышестоящем шлюзе также не задан тайм-аут по умолчанию .
  3. Входной канал Splitter — это может быть простой прямой канал или прямой канал с определенным диспетчером. Если на канале указан диспетчер, поток ниже этой точки будет асинхронным, многопоточным. Это также изменяет семантику вышестоящего шлюза, поскольку обычно это означает, что в противном случае становится активным в противном случае бессрочное время ожидания ответа по умолчанию .
  4. Splitter — разделитель должен возвращать один объект. Единственный объект, возвращаемый разделителем, является коллекцией, java.util.List. Инфраструктура SI будет принимать каждого члена этого списка и подавать его в выходной канал Splitter — как в этом примере, обычно прямо в маршрутизатор. Контракт на возврат Splitter List заключается в том, что он используется в Java — он может содержать ноль, один или несколько элементов. Если сплиттер возвращает пустой список, маловероятно, что у маршрутизатора будет какая-либо работа, и поэтому вызов потока будет завершен. Однако, если список содержит один элемент, инфраструктура SI извлечет этот элемент из списка и протолкнет его в маршрутизатор, если он успешно маршрутизируется, поток будет продолжен.
  5. Маршрутизатор — в этом примере маршрутизатор просто направит сообщения в один из двух шлюзов.
  6. Шлюзы — интересны два шлюза, которые используются между Сплиттером и Агрегатором. В этом примере я использовал шаблон EIP общего шлюза для представления подсистемы сообщений, но не определил его явно — мы могли бы использовать исходящий шлюз HTTP, другой поток SI или любую другую внешнюю систему. Конечно, для каждой из этих подсистем возможен ряд ответов. В зависимости от протокола и внешней системы запрос сообщения может не отправиться, ответ не может быть получен, запущен длительный процесс, произошла сетевая ошибка или тайм-аут или общее исключение обработки.
  7. Агрегатор — один агрегатор будет ожидать нескольких ответов в зависимости от того, что было создано Splitter. В случае, если список возврата разделителя пуст, агрегатор не будет вызван. В случае, если список возврата Splitter имеет одну запись, агрегатор будет ожидать ответа одного шлюза для завершения группы. В случае, если список Splitter содержит n записей, агрегатор будет ожидать n записей для завершения группы. Пользовательские стратегии корреляции, стратегии выпуска и хранилища сообщений могут быть внедрены среди множества аспектов конфигурации.

Интересные аспекты работы простого агрегатора сплиттера

Основным решающим фактором для определения того, соответствует ли этот тип простого шлюза требованиям, является понимание того, что происходит в случае сбоя. Если какое-либо исключение, возникающее в вашем потоке SI, приводит к тому, что вызов потока отменяется и соответствует вашим требованиям, больше нет необходимости читать дальше. Однако, если вам необходимо продолжить обработку после сбоя в одном из шлюзов, оставшаяся часть этой статьи может представлять больший интерес.

Исключения из любого источника, сгенерированного между сплиттером и агрегатором, приводят к тому, что агрегатор отбрасывает пустую или частичную группу. Исключение будет распространяться обратно к ближайшему вышестоящему шлюзу для обработки пользовательским компонентом или повторной передачи шлюзом. Обратите внимание, что пользовательскую стратегию выпуска в Агрегаторе сложно использовать, особенно в случае тайм-аутов, но она не поможет в этом случае, поскольку исключение будет распространяться обратно к крайнему левому шлюзу до вызова агрегатора.

Также возможно настроить обработчики исключений на самых внутренних шлюзах, сообщение об исключении может быть перехвачено, но как перенаправить сообщения из настраиваемого обработчика исключений в агрегатор для завершения группы, внедрить определение канала агрегатора в обработчик настраиваемого исключения? Это плохой подход, при котором распаковывается полезная нагрузка сообщения об исключении, копируется заголовок исходного сообщения в новое сообщение SI, а затем добавляется исходная полезная нагрузка — всего четыре или пять строк кода, но это грязно.

После генерации исключения сообщения об исключении ( без модификации ) не могут быть направлены в агрегатор для завершения группы. Исходное сообщение, содержащее идентификаторы корреляции и последовательности для группы и позиции группы, находится внутри полезной нагрузки исключений сообщений SI.

Если обработка должна продолжаться после генерации исключения, должно быть ясно, что для продолжения обработки необходимо выполнить следующее:

  • группа агрегации должна быть завершена,
  • любые исключения должны быть перехвачены и обработаны, прежде чем вернуться к входному шлюзу шкафа,
  • идентификаторы корреляции и последовательности, которые позволяют завершение группы в агрегаторе, скрыты в полезной нагрузке сообщения об исключении и потребуют извлечения и настройки для сообщения, связанного для агрегатора.

Более надежное решение — шаблон адаптера шлюза обмена сообщениями

Работа с исключениями и нулевыми возвратами из шлюзов, естественно, приводит к разработке, которая реализует оболочку вокруг шлюза обмена сообщениями. Это обеспечивает уровень контроля, который в противном случае было бы очень трудно установить.

Этот метод адаптера позволяет перехватывать и обрабатывать все возвраты из шлюзов обмена сообщениями, поскольку шлюз обмена сообщениями вводится в Service Activator и вызывается непосредственно из него. Шлюз обмена сообщениями больше не отвечает на агрегатор напрямую, он отвечает на пользовательский Java-бин Spring, настроенный в определении пространства имен Service Activator. Как и ожидалось, обработка, которая не подвергается исключению, будет продолжаться как обычно. Те потоки, которые испытывают условия исключения или неожиданные или отсутствующие ответы от шлюзов обмена сообщениями, должны обрабатывать сообщения таким образом, чтобы группы сообщений, связанные для агрегирования, могли быть завершены. Если бы Service Activator разрешил распространение исключения за пределы своего компонента поддержки, группа не была бы завершена. То же самое относится не только к исключениям, но и к любому возвращаемому объекту, который не имеет идентификатора корреляции обязательных групп и заголовков последовательностей — здесь применяется адаптация.

Сообщения об исключениях или нулевые ответы от шлюзов сообщений перехватываются и обрабатываются, как показано в следующем примере кода:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import com.l8mdv.sample.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.util.Assert;
 
public class AvsServiceImpl implements AvsService {
 
    private static final Logger logger
            = LoggerFactory.getLogger(AvsServiceImpl.class);
     
 
    public static final String MISSING_MANDATORY_ARG
            = "Mandatory argument is missing.";
     
 
    private AvsGateway avsGateway;
 
    public AvsServiceImpl(final AvsGateway avsGateway) {
        this.avsGateway = avsGateway;
    }
 
    public Message<AvsResponse> service(Message<AvsRequest> message) {
 
        Assert.notNull(message, MISSING_MANDATORY_ARG);
        Assert.notNull(message.getPayload(), MISSING_MANDATORY_ARG);
 
        MessageHeaders requestMessageHeaders = message.getHeaders();
        Message<AvsResponse> responseMessage = null;
        try {
            logger.debug("Entering AVS Gateway");
            responseMessage = avsGateway.send(message);
            if (responseMessage == null)
                responseMessage = buildNewResponse(requestMessageHeaders,
                        AvsResponseType.NULL_RESULT);
            logger.debug("Exited AVS Gateway");
             
 
            return responseMessage;
        }
 
        catch (Exception e) {
            return buildNewResponse(responseMessage, requestMessageHeaders,
                    AvsResponseType.EXCEPTION_RESULT, e);
        }
    }
 
    private Message<AvsResponse>
 
                     buildNewResponse(MessageHeaders requestMessageHeaders,
                     AvsResponseType avsResponseType) {
 
        Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
        Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);
 
        AvsResponse avsResponse = new AvsResponse();
        avsResponse.setError(avsResponseType);
 
        return MessageBuilder.withPayload(avsResponse)
                .copyHeadersIfAbsent(requestMessageHeaders).build();
    }
 
    private Message<AvsResponse>
 
                     buildNewResponse(Message<AvsResponse> responseMessage,
                     MessageHeaders requestMessageHeaders,
                     AvsResponseType avsResponseType,
                     Exception e) {
 
        Assert.notNull(responseMessage, MISSING_MANDATORY_ARG);
        Assert.notNull(responseMessage.getPayload(), MISSING_MANDATORY_ARG);
        Assert.notNull(requestMessageHeaders, MISSING_MANDATORY_ARG);
        Assert.notNull(avsResponseType, MISSING_MANDATORY_ARG);
        Assert.notNull(e, MISSING_MANDATORY_ARG);
 
        AvsResponse avsResponse = new AvsResponse();
        avsResponse.setError(avsResponseType,
                responseMessage.getPayload(), e);
 
        return MessageBuilder.withPayload(avsResponse)
                .copyHeadersIfAbsent(requestMessageHeaders).build();
    }
}

Обратите внимание на последнюю строку предложения catch блока обработки исключений. Эта строка кода копирует заголовки корреляции и последовательности в ответное сообщение, это обязательно, если группа агрегации будет разрешено завершить, и всегда будет необходимо после исключения, как показано здесь.

Последствия использования этой техники

Нет сомнений, что введение адаптера шлюза обмена сообщениями в конфигурацию SI делает конфигурацию более сложной для чтения и отслеживания. Ключевым фактором здесь является то, что больше нет линейной прогрессии через файл конфигурации. Это связано с тем, что Service Activator должен перенаправить ссылку на шлюз или шлюз, определенный до его адаптации Service Activator — в обоих случаях результат одинаков.

Ресурсы

Примечание: — Дизайн программного обеспечения, который послужил основой для создания этого мета-шаблона, основывался на требовании, чтобы к нескольким службам внешней оценки рисков обращался единый центральный сервис по оценке рисков. Чтобы удовлетворить клиентов службы, вызов должен происходить параллельно и продолжаться, несмотря на сбой в любой из этих внешних служб. Это требование привело к разработке шаблона адаптера шлюза обмена сообщениями для проекта.

  1. Spring Integration Справочное руководство
  2. Подход к решению этой проблемы обсуждался непосредственно с Марком Фишером (SpringSource) в контексте построения потоков оценки рисков для крупного финансового института США. Хотя конфигурация и код защищены NDA и авторскими правами, в этой статье приемлемо выразить намерение разработки и подобный код.

Ссылка: Spring Integration — Надежный агрегатор разветвления от нашего партнера JCG Мэтта Викери в блоге TechiQuest .