Статьи

Spring Integration — Адаптеры шлюза сообщений

В этой статье будет представлена ​​методика обработки типов ответов из подсистем сообщений в основанной на службах архитектуре, управляемой сообщениями (MDA). Он предоставит некоторую теорию, опыт разработки подсистемы сообщений и некоторый надежный рабочий код, который выполняется в нескольких производственных развертываниях финансовых учреждений.

Поскольку Spring Integration поддерживается хорошо разработанным набором Spring API, знакомым большинству разработчиков, я выбрал это в качестве основы для этой статьи.

Если бы разработчикам когда-либо приходилось создавать интеграционные потоки для обработки положительных бизнес-ответов, эта статья не была бы необходимой. Однако развитие в реальном мире, особенно в средах интеграции служб, приводит к тому, что разработчикам приходится иметь дело не только с положительными ответами бизнеса, но и с исключениями бизнеса, техническими исключениями и услугами, которые перестают отвечать.

Spring Integration Gateways

Шлюзы Spring Integration (SI) являются точками входа для подсистем сообщений. S
pecify шлюз с использованием интерфейса Java и реализаций динамического прокси автоматически генерируются в рамках SI во время выполнения таким образом, что он может быть использован любой фасолью , в котором он инжектированный. Между прочим,
шлюзы SI также
предлагают ряд других преимуществ, таких как поведение времени ожидания запроса и ответа.


Типичные приложения, управляемые сообщениями (MDA)

Как и в случае шаблонов проектирования Java для разработки приложений, аналогичные шаблоны существуют для приложений интеграции. Одним из таких общих шаблонов интеграции является цепочка услуг — ряд сервисов соединен последовательно или параллельно, которые вместе выполняют бизнес-функцию. Если вы создали и развернули сервисы, используя MuleSoft Mule, SpringSource Spring Integration, FuseSource ServiceMix (Camel), Oracle Service Bus или IBM WebSphere ESB, есть вероятность, что вы уже создали приложение с использованием chained-сервисов. Этот шаблон будет становиться все более распространенным, поскольку индустрия программного обеспечения переходит от топологий клиент-сервер к сервис-ориентированным архитектурам.

Недавнее мероприятие, в котором Incept5 предоставил консультации по архитектуре, будет использовано в качестве примера реализации шаблона приложения цепочки услуг. Для участия команды потребовалось создать решение, которое переводило бы финансовые сообщения (SWIFT FIN) из исходного состояния (ISO15022) через привязку, проверку и преобразование (в XML) и в конечном итоге добавлялось в хранилище данных для дальнейшей обработки или управления бизнес-исключениями.

Используя обозначение графа EIP, состав службы первого этапа может быть представлен следующим образом. Сообщение документа поступает в домен приложения, анализируется и связывается с объектом Java, семантически проверяется как сообщение SWIFT, преобразуется в XML и затем сохраняется в хранилище данных. Напомним, что связывание, семантическая проверка и преобразование выполняются продуктом C24 iO. Кроме того, полный пример решения, который можно найти в GitHub для этого проекта, содержит конфигурации диспетчера, чтобы пулы потоков можно было использовать для каждой подсистемы сообщений, для краткости и ясности такие детали будут опущены в этой статье.


Несмотря на то, что эта конфигурация определяет цепочку услуг, ее недостаточно для формирования основ надежного производственного развертывания. Исключение, выброшенное какой-либо из служб, будет отброшено обратно к шлюзу входа, что приведет к потере контекста, т. Е. К какой службе выдается исключение, любой код отсутствия ответа, вызванный службой, может привести к тому, что его поток Java будет припаркован, а нулевые значения возвращены служба может вызвать непредвиденные проблемы или даже зависнуть входной шлюз; если входной шлюз используется в отличие от этой схемы.

Неотзывчивый вызов службы

Специальная конструкция Spring Integration для доступа к подсистеме сообщений — это шлюз. Несмотря на кажущуюся простоту, SI-шлюз является мощной функцией, которая приводит к генерации динамического прокси-сервера, генерируемого Spring GatewayProxyFactoryBean. Этот bean-компонент может быть внедрен в новый или существующий код или службы в качестве реализации интерфейса. Шлюз SI также предоставляет средства для
работы с таймаутами и предоставляет некоторые средства обработки ошибок. Типичная конфигурация XML пространства имен Spring Integration выглядит следующим образом:
<int:channel id="parsing-gw-request-channel" datatype="java.lang.String">
  <int:queue capacity="${gateway.parse.queue.capacity}"/>
</int:channel>
    

<int:gateway id="parseGateway"
             service-interface=

                 "com.c24.solution.swift.flatten.gateway.ParseGateway"

             default-request-channel="parsing-gw-request-channel"
             default-reply-channel="parsing-gw-reply-channel"
             default-reply-timeout="${gateway.parse.timeout}"/>

<int:channel id="parsing-gw-reply-channel" 

             datatype="biz.c24.io.api.data.ComplexDataObject"/>

Развивая дизайн, на следующем этапе проектирования архитектуры приложений необходимо использовать преимущества этих шлюзов, контекст Spring Integration можно довольно просто расширить, создав интерфейс Java для каждого сервиса.
Построение шлюзов подсистемы сообщений ведет нас к модели проектирования, подобной следующей:

Небольшое количество дополнительной конфигурации и некоторые очень простые интерфейсы Java означают, что разработчики теперь могут настраивать тайм-ауты запроса / ответа на шлюзе и избегать любого не отвечающего кода — предполагая (как мы всегда должны), что код, написанный локально или предоставленный сторонними разработчиками, способен плохое поведение.

Кроме того, шлюз SI позволяет указать канал обработки ошибок, чтобы у вас была возможность обрабатывать ошибки. Этот дизайн, который я здесь представляю, не будет использовать каналы ошибок, а будет обрабатывать их по-другому — надеюсь, это станет более очевидным по мере развития дизайна.

Конкретную семантику и примеры конфигурации для шлюзов и времени ожидания шлюзов можно найти в справочном материале, предоставленном SpringSource и другими блогами .

A significant benefit to the integration solution has been added with the use of Spring Integration gateways. However, further improvements need to be made for the following reasons:

A gateway timeout will result in a null value being returned to the calling, or outer, flow. This is a business exception condition that means that the payload that was undergoing processing needs to be pushed into a business exception management process in order that it can be progressed through to conclusion — normally dictated by the business. If a transient technical issue caused this problem, resubmission may solve the exception, otherwise further investigation will be required. Whatever the eventual outcome, context about the failure and it’s location need to be recorded and made available to the business exception management operative. The context must contain information about the message sub-system that failed to process the message and the message itself.

  • Exceptions generated by message sub-systems can be handled in a few different ways, through the error channel inside the gateway undergoing message processing failure or by the calling flow. Again, context needs to be recorded. In this case, the technical exception needs to be added to the context, along with the gateway processing the message undergoing failure and any additional information that may be used within the business exception management process for resolution.
  •  

    If transactions were involved in this flow, for example a transactional JMS message consumption flow trigger, it would be possible to rollback the transaction in order that it can be re-queued to a DLQ. However, the design in this software compensates for failures by pushing failing messages to a destination configured by the developer directly; and this may of course be a JMS queue if that’s required. This avoids transaction rollback and adds the benefit of exception context directly rather than operations staff and developers having to scour logs to locate exception details.

    The Gateway Adapter

    In order to handle all exceptions taking place within a message sub-system and also handle null values a Gateway Adapter class can be used. The reason that the SI gateway error channel is not used for this purpose is that it would be complex to define and would have to be done in several places. The gateway adapter allows all business exception conditions to be handled in one place and treat them in the same way. The Gateway Adapter is a custom written Spring bean that invokes the injected gateway directly and manages null and exception responses before allowing the invocation request to return to the caller (or outer flow).

    The architectural design diagram evolved from the previous design phase includes a service activator backed by a Gateway Adapter bean, this calls the injected gateway (dynamic proxy) which calls the business service.

    Nuts and Bolts

    The design diagrams are a useful guide for making the point but maybe the more interesting part is the configuration and code itself. As with the entire solution, the outer or calling flow can be seen in the project located in GitHub, however a useful snippet to view is one of the gateway adapter namespace configurations, the gateway, the gateway adapter and the gateway service configuration. As a number of gateways exist in this application, and they all follow the same pattern, the following configuration and code will merely demonstrate one of them.

    Gateway Adapter Namespace Configuration

    <int:channel id="message-parse-channel" datatype="java.lang.String"/>
    <int:chain input-channel="message-parse-channel" 
               output-channel="message-validate-channel">
        <int:service-activator ref="parseGatewayService" method="service"/>
    </int:chain>

    Gateway

    package com.c24.solution.swift.flatten.gateway;
    
    import org.springframework.integration.Message;
    
    /**
     * @author Matt Vickery - [email protected]
     * @since 17/05/2012
     */
    public interface ParseGateway {
        public Message<?> send(Message<String> message);
    }

    GatewayAdapter

    package com.c24.solution.swift.flatten.gateway.adapter;
    
    import biz.c24.io.api.data.ComplexDataObject;
    import com.c24.solution.swift.flatten.exception.ExceptionContext;
    import com.c24.solution.swift.flatten.exception.ExceptionSubContext;
    import com.c24.solution.swift.flatten.gateway.ExceptionGatewayService;
    import com.c24.solution.swift.flatten.gateway.ParseGateway;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.Message;
    import org.springframework.util.Assert;
    
    /**
     * @author Matt Vickery - [email protected]
     * @since 17/05/2012
     */
    public class ParseGatewayService extends AbstractGatewayService {
    
      private static final Logger LOG = 
    
          LoggerFactory.getLogger(ParseGatewayService.class);
      private final ParseGateway parseGateway;
      private final ExceptionGatewayService exceptionGatewayService;
    
      public ParseGatewayService(
    
             final ParseGateway parseGateway,
             final ExceptionGatewayService exceptionGatewayService) {
          this.parseGateway = parseGateway;
          this.exceptionGatewayService = exceptionGatewayService;
      }
    
      public Message<ComplexDataObject> service(Message<String> message) {
    
          Message<?> response;
          try {
              LOG.debug("Entering parse gateway.");
              response = parseGateway.send(message);
          } catch (RuntimeException e) {
              LOG.error("Exception response .. {}, exception: {}", getClass(), e);
              LOG.error("Invoking ... process because: {}.", e.getCause());
              buildExceptionContextAndDispatch(
    
                message, 
    
                ExceptionContext.PARSE_FAILURE,
                ExceptionSubContext.EXCEPTION_GATEWAY_RESPONSE, 
    
                exceptionGatewayService);
              throw e;
          }
    
          if (response != null) {
            if (!(response.getPayload() instanceof ComplexDataObject))
                throw new IllegalStateException(INTERRUPTING_..._EXCEPTION);
          } else {
            LOG.info("Null response received ....", getClass());
            buildExceptionContextAndDispatch(
    
              message, 
    
              ExceptionContext.PARSE_FAILURE,
              ExceptionSubContext.NULL_GATEWAY_RESPONSE, 
    
              exceptionGatewayService);
            throw new GatewayAdapterException(NULL_GATEWAY_RESPONSE_CAUGHT);
          }
    
          Assert.state(response.getPayload() instanceof ComplexDataObject);
          return (Message<ComplexDataObject>) response;
      }
    }

    GatewayService

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:c24="http://schema.c24.biz/spring-integration"
           xmlns:int="http://www.springframework.org/schema/integration"
           xsi:schemaLocation="http://www.springframework.org/schema/integration
     http://www..org/schema/integration/spring-integration.xsd  http://schema.c24.biz/spring-integration
     http://schema.c24.biz/spring-integration.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <int:channel id="parsing-gw-request-channel" 
                     datatype="java.lang.String">
            <int:queue capacity="${gateway.parse.queue.capacity}"/>
        </int:channel>
        
    
        <int:gateway 
    
          id="parseGateway"
          service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
          default-request-channel="parsing-gw-request-channel"
          default-reply-channel="parsing-gw-reply-channel"
          default-reply-timeout="${gateway.parse.timeout}"/>
        
    
        <int:channel id="parsing-gw-reply-channel" 
                     datatype="biz.c24.io.api.data.ComplexDataObject"/>
    
        <int:chain input-channel="parsing-gw-request-channel" 
                   output-channel="parsing-gw-reply-channel">
            <int:poller fixed-delay="50" 
                        task-executor="binding-thread"/>
            <c24:unmarshalling-transformer 
    
              id="c24Mt541UnmarshallingTransformer"
              source-factory-ref="textualSourceFactory"
              model-ref="mt541Model"/>
        </int:chain>
    
    </beans>

     

    Summary

    Независимо от того, какой тип системы интеграции на основе сообщений вы разрабатываете, как только вы выйдете за рамки использования примера кода для создания прототипа технологии и, давайте посмотрим правде в глаза, появятся миллионы строк кода, скопированных из тривиальных примеров, и вам нужно учитывать вызов перед лицом технических исключений, бизнес-исключений и неотвечающих услуг.
    Использование шлюзов Spring Integration для представления входа в подсистемы сообщений означает, что мы должны быть в состоянии справиться со всеми этими типами поведения. Шаблон адаптера шлюза обеспечивает единую центральную точку обработки для таких условий.
    Назначение шлюзового адаптера должно быть ясно из примера конфигурации и предоставленного кода, но его также можно запустить и протестировать с использованием полного источника, расположенного на GitHub.


    . Please leave any feedback or comments as you see fit.