Статьи

Обработка ошибок Spring Integration с помощью маршрутизатора, ErrorChannel и Transformer

В этой статье объясняется, как обрабатываются ошибки при использовании системы обмена сообщениями с Spring Integration и как обрабатывать маршрут и перенаправлять на определенный канал.

Диаграмма высокого уровня

весна-mockrunner.xml

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:c="http://www.springframework.org/schema/c"
xmlns:context="http://www.springframework.org/schema/context"
default-autowire="default"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.1.xsd">


  <bean id="destinationManager" class="com.mockrunner.jms.DestinationManager"/>

      <bean id="inBoundQueue" factory-bean="destinationManager" factory-method="createQueue">
      <constructor-arg index="0" value="MOCKRUNNER-IN-QUEUE" />
      </bean>

      <bean id="configurationManager" class="com.mockrunner.jms.ConfigurationManager"/>

      <bean id="jmsQueueConnectionFactory" class="com.mockrunner.mock.jms.MockQueueConnectionFactory">
      <constructor-arg index="0" ref="destinationManager" />
      <constructor-arg index="1" ref="configurationManager" />
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
      </bean>     
</beans>

ApplicationContext-JMS-int.xml

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:c="http://www.springframework.org/schema/c"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integrationhttp://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jmshttp://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.1.xsd">

<import resource="spring-mockrunner.xml"/> 

      <int:poller default="true" fixed-delay="50"/>

      <int:channel id="inputChannel">
      <int:queue  capacity="5"/>
      </int:channel>

       <int:channel id="outputChannel">
       <int:queue  capacity="5"/>
      </int:channel>


<!--  chain, inbound message, convert msg to file -->    

 <int-jms:inbound-channel-adapter connection-factory="jmsQueueConnectionFactory" destination="inBoundQueue"  channel="inputChannel" >
 <int:poller fixed-rate="1000"></int:poller>
 </int-jms:inbound-channel-adapter>

  <int:transformer input-channel="inputChannel" ref="xmlMsgToCustomerPojoTransformer" output-channel="outputChannel">
  </int:transformer>

<int:payload-type-router  input-channel="outputChannel">
  <int:mapping type="org.springframework.oxm.UnmarshallingFailureException" channel="errorChannel"/>
  <int:mapping type="com.spijb.domain.Customer" channel="outputChannel" />
</int:payload-type-router>

<int:service-activator input-channel="errorChannel" ref="errorLogger" method="logError"/>

<int:service-activator input-channel="outputChannel" ref="customerActivator"  output-channel="nullChannel" method="handleCustomer"/>

<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.spijb.domain.Customer</value>
</list>
</property>
</bean>

   <bean id="xmlMsgToCustomerPojoTransformer" class="com.spijb.transformers.CustomerMessageTransformer">
   <property name="unmarshaller" ref="marshaller" />
   </bean>



<bean id="customerActivator" class="com.spijb.serviceactivator.CustomerServiceActivator"/>

<bean id="errorLogger" class="com.spijb.seriveactivator.ErrorLogger" />
  </beans>

Маршрутизатор.   Компонент маршрутизатора можно использовать для рассылки сообщений по нескольким адресатам. Например, требование состоит в том, что когда исключение повышения в отображении должно быть отправлено в канал ошибок, в то время как в другом сценарии, например, при успешном разборе xml, его следует отправить в выходной канал. Компонент Router позаботится об этой задаче. 

PayloadTypeRouter : PayloadTypeRouter определяет маршрутизацию сообщений на разные каналы в зависимости 
от типа полезной нагрузки. Конечная точка маршрутизатора, подключенная к входящему каналу, будет оценивать тип и, соответственно, рассылать (направлять) сообщения другим каналам, ожидающим этот конкретный тип. Элемент payload-type-router используется для подключения этого типа логики маршрутизации
payload-type-router  
к входящему каналу, все-в-канале, который принимает все типы сообщений. Затем используйте атрибут mapping, чтобы установить ожидаемый тип и соответствующий ему канал. 

Все сообщения с типом полезной нагрузки 
UnmarshallingFailureException  будут отправляться в errorChannel  , а сообщения типа Customer отправляются в  outputChannel  . Это простая категоризация, 
основанная на типе полезной нагрузки.

Customer.java 

package com.spijb.domain;


import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
public class Customer {

   String name;
   int age;
   int id;

   public String getName() {
      return name;
   }

   @XmlElement
   public void setName(String name) {
      this.name = name;
   }

   public int getAge() {
      return age;
   }

   @XmlElement
   public void setAge(int age) {
      this.age = age;
   }

   public int getId() {
      return id;
   }

   @XmlAttribute
   public void setId(int id) {
      this.id = id;
   }
   @Override
   public String toString()
   {
      return "Name "+this.name+"\n"+"Id: "+this.id+"\n"+" Age "+this.age;
   }
}

CustomerMessageTransformer.java

package com.spijb.transformers;

import java.io.IOException;
import java.io.StringReader;

import javax.xml.transform.stream.StreamSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.oxm.Unmarshaller;
import org.springframework.oxm.XmlMappingException;

import com.spijb.domain.Customer;


public class CustomerMessageTransformer {

   private static final Logger logger = LoggerFactory.getLogger(CustomerMessageTransformer.class); 
   private Unmarshaller unmarshaller;

   public void setUnmarshaller(Unmarshaller unmarshaller) {
      this.unmarshaller = unmarshaller;
   }

   @SuppressWarnings("rawtypes")
   public Message transform(String message)
   {
      logger.info("Message Received \r\n"+message);
      try {
         Customer customer = (Customer)unmarshaller.unmarshal(new StreamSource(new StringReader(message)));

         return MessageBuilder.withPayload(customer).build();
      } catch (XmlMappingException e) {
         return MessageBuilder.withPayload(e).build();
      } catch (IOException e){
         return MessageBuilder.withPayload(e).build();
        }
        catch(Exception e){
           return MessageBuilder.withPayload(e).build();
        }
   }
}

CustomerMessageTransformer: этот преобразователь, в основном, генерирует Exception, когда он возникает, и переносится в виде передачи сообщения на маршрутизатор полезной нагрузки, а затем маршрутизатор полезной нагрузки решает, какой канал должен переслать это исключение. В этом классе, если какой-либо тип не связанных с маршалом исключений, таких как XMLMappingException, IOException или Exception, генерирует исключение, он переносит его как Message и возвращает его, в противном случае он возвращает объект Customer. Здесь вы можете настроить или определить собственное исключение, обернуть его и вернуть, чтобы у вас был только один тип карты в маршрутизаторе типа полезной нагрузки.

ErrorLogger.java

package com.spijb.serviceactivator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageDeliveryException;

public class ErrorLogger {

 private static final Logger logger = LoggerFactory.getLogger(ErrorLogger.class);   
   public void logError(Message<Exception> message)
   {
      Exception msgex=message.getPayload();
      logger.error("Exception "+msgex);
   }

}

ErrrorLogger: класс ErrorLoagger фактически обрабатывает исключения и входит в систему регистрации.

CustomerServiceActivator.java

package com.spijb.seriveactivator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.spijb.domain.Customer;


public class CustomerServiceActivator {

   private static final Logger logger = LoggerFactory.getLogger(CustomerServiceActivator.class); 
   public void handleCustomer(Customer customer)
   {
      logger.info("Customer information received and invoke some services here....\r\n"+customer);
   }

}

CustomerServiceActivator:  этот класс является сценарием успеха, я имею в виду, что если при преобразовании не возникает исключение, то маршрутизатор payloadtype направляет  сообщение клиента в outputChannel, затем будет вызываться этот метод класса handleCustomer. В этом классе нет реализации, просто ведение журнала информации о клиентах.

ErrorHandlingTest.java

package com.spijb.invoker;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.mockrunner.mock.jms.MockQueue;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:applicationContext-file-jms.xml"})
public class ErrorHandlingTest {


   @Autowired
   private JmsTemplate jmsTemplate;

   @Autowired
   private MockQueue inBoundQueue;


   @Test
   public void shouldSendCustomerToWebServiceActivator() throws InterruptedException
   {

      final StringBuffer sb = new StringBuffer();
      sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>")
       .append("<customer id=\"10001\">")
       .append("<age>35</age>")
       .append("<name>Upender</name>")
       .append("</customer>");
      jmsTemplate.send(inBoundQueue,new MessageCreator() {

      @Override
      public Message createMessage(Session session) throws JMSException {
         TextMessage textMessage = session.createTextMessage();
         textMessage.setText(sb.toString());
         return textMessage;
      }
   });
      Thread.sleep(3000);
   }

   @Test
   public void shouldSendMessageErrorLogActivator() throws InterruptedException
   {

      final StringBuffer sb = new StringBuffer();
      sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>")
       .append("<customer id=\"10001\">")
       .append("<age>a3<age>")
       .append("<name>Upender</name>")
       .append("</customer>");
      jmsTemplate.send(inBoundQueue,new MessageCreator() {

      @Override
      public Message createMessage(Session session) throws JMSException {
         TextMessage textMessage = session.createTextMessage();
         textMessage.setText(sb.toString());
         return textMessage;
      }
   });
     Thread.sleep(3000);
   }
}

ErrorHandlerTest: у тестового интеграционного класса Spring есть два метода: один метод shouldSendCustomerToWebServiceActivator, который отправит полный сценарий успеха, а другой метод shouldSendMessageErrorLogActivator   отправит недопустимое сообщение xml и потерпит неудачу при немаршальном xml объекту java.

Неудачный вывод:

12:58:51.798 [task-scheduler-4] ERROR c.spijb.seriveactivator.ErrorLogger - Exception org.springframework.integration.MessageHandlingException: org.springframework.expression.spel.SpelEvaluationException: EL1004E:(pos 8): Method call: Method handleCustomer(org.springframework.oxm.UnmarshallingFailureException) cannot be found on com.spijb.seriveactivator.CustomerServiceActivator type

Результат на выходе:

13:00:47.245 [task-scheduler-5] INFO  c.s.t.CustomerMessageTransformer - Message Received <?xml version="1.0" encoding="UTF-8" standalone="yes"?><customer id="10001"><age>35</age><name>Upender</name></customer>
13:00:47.277 [task-scheduler-6] INFO  c.s.s.CustomerServiceActivator - Customer information received and invoke some services here....
Name Upender
Id: 10001
Age 35

Прикрепленный файл maven pom.xml для всех зависимостей.