В этой статье объясняется, как обрабатываются ошибки при использовании системы обмена сообщениями с Spring Integration и как обрабатывать маршрут и перенаправлять на определенный канал.
Диаграмма высокого уровня
весна-mockrunner.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:c="http://www.springframework.org/schema/c"
xmlns:context="http://www.springframework.org/schema/context"
default-autowire="default"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.1.xsd">
<bean id="destinationManager" class="com.mockrunner.jms.DestinationManager"/>
<bean id="inBoundQueue" factory-bean="destinationManager" factory-method="createQueue">
<constructor-arg index="0" value="MOCKRUNNER-IN-QUEUE" />
</bean>
<bean id="configurationManager" class="com.mockrunner.jms.ConfigurationManager"/>
<bean id="jmsQueueConnectionFactory" class="com.mockrunner.mock.jms.MockQueueConnectionFactory">
<constructor-arg index="0" ref="destinationManager" />
<constructor-arg index="1" ref="configurationManager" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
</bean>
</beans>
ApplicationContext-JMS-int.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integrationhttp://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jmshttp://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="spring-mockrunner.xml"/>
<int:poller default="true" fixed-delay="50"/>
<int:channel id="inputChannel">
<int:queue capacity="5"/>
</int:channel>
<int:channel id="outputChannel">
<int:queue capacity="5"/>
</int:channel>
<!-- chain, inbound message, convert msg to file -->
<int-jms:inbound-channel-adapter connection-factory="jmsQueueConnectionFactory" destination="inBoundQueue" channel="inputChannel" >
<int:poller fixed-rate="1000"></int:poller>
</int-jms:inbound-channel-adapter>
<int:transformer input-channel="inputChannel" ref="xmlMsgToCustomerPojoTransformer" output-channel="outputChannel">
</int:transformer>
<int:payload-type-router input-channel="outputChannel">
<int:mapping type="org.springframework.oxm.UnmarshallingFailureException" channel="errorChannel"/>
<int:mapping type="com.spijb.domain.Customer" channel="outputChannel" />
</int:payload-type-router>
<int:service-activator input-channel="errorChannel" ref="errorLogger" method="logError"/>
<int:service-activator input-channel="outputChannel" ref="customerActivator" output-channel="nullChannel" method="handleCustomer"/>
<bean id="marshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.spijb.domain.Customer</value>
</list>
</property>
</bean>
<bean id="xmlMsgToCustomerPojoTransformer" class="com.spijb.transformers.CustomerMessageTransformer">
<property name="unmarshaller" ref="marshaller" />
</bean>
<bean id="customerActivator" class="com.spijb.serviceactivator.CustomerServiceActivator"/>
<bean id="errorLogger" class="com.spijb.seriveactivator.ErrorLogger" />
</beans>
Маршрутизатор. Компонент маршрутизатора можно использовать для рассылки сообщений по нескольким адресатам. Например, требование состоит в том, что когда исключение повышения в отображении должно быть отправлено в канал ошибок, в то время как в другом сценарии, например, при успешном разборе xml, его следует отправить в выходной канал. Компонент Router позаботится об этой задаче.
от типа полезной нагрузки. Конечная точка маршрутизатора, подключенная к входящему каналу, будет оценивать тип и, соответственно, рассылать (направлять) сообщения другим каналам, ожидающим этот конкретный тип. Элемент payload-type-router используется для подключения этого типа логики маршрутизации
к входящему каналу, все-в-канале, который принимает все типы сообщений. Затем используйте атрибут mapping, чтобы установить ожидаемый тип и соответствующий ему канал.
Все сообщения с типом полезной нагрузки
UnmarshallingFailureException будут отправляться в errorChannel , а сообщения типа Customer отправляются в outputChannel . Это простая категоризация,
основанная на типе полезной нагрузки.
Customer.java
package com.spijb.domain;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement
public class Customer {
String name;
int age;
int id;
public String getName() {
return name;
}
@XmlElement
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
@XmlElement
public void setAge(int age) {
this.age = age;
}
public int getId() {
return id;
}
@XmlAttribute
public void setId(int id) {
this.id = id;
}
@Override
public String toString()
{
return "Name "+this.name+"\n"+"Id: "+this.id+"\n"+" Age "+this.age;
}
}
CustomerMessageTransformer.java
package com.spijb.transformers;
import java.io.IOException;
import java.io.StringReader;
import javax.xml.transform.stream.StreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.oxm.Unmarshaller;
import org.springframework.oxm.XmlMappingException;
import com.spijb.domain.Customer;
public class CustomerMessageTransformer {
private static final Logger logger = LoggerFactory.getLogger(CustomerMessageTransformer.class);
private Unmarshaller unmarshaller;
public void setUnmarshaller(Unmarshaller unmarshaller) {
this.unmarshaller = unmarshaller;
}
@SuppressWarnings("rawtypes")
public Message transform(String message)
{
logger.info("Message Received \r\n"+message);
try {
Customer customer = (Customer)unmarshaller.unmarshal(new StreamSource(new StringReader(message)));
return MessageBuilder.withPayload(customer).build();
} catch (XmlMappingException e) {
return MessageBuilder.withPayload(e).build();
} catch (IOException e){
return MessageBuilder.withPayload(e).build();
}
catch(Exception e){
return MessageBuilder.withPayload(e).build();
}
}
}
CustomerMessageTransformer: этот преобразователь, в основном, генерирует Exception, когда он возникает, и переносится в виде передачи сообщения на маршрутизатор полезной нагрузки, а затем маршрутизатор полезной нагрузки решает, какой канал должен переслать это исключение. В этом классе, если какой-либо тип не связанных с маршалом исключений, таких как XMLMappingException, IOException или Exception, генерирует исключение, он переносит его как Message и возвращает его, в противном случае он возвращает объект Customer. Здесь вы можете настроить или определить собственное исключение, обернуть его и вернуть, чтобы у вас был только один тип карты в маршрутизаторе типа полезной нагрузки.
ErrorLogger.java
package com.spijb.serviceactivator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.integration.MessageDeliveryException;
public class ErrorLogger {
private static final Logger logger = LoggerFactory.getLogger(ErrorLogger.class);
public void logError(Message<Exception> message)
{
Exception msgex=message.getPayload();
logger.error("Exception "+msgex);
}
}
ErrrorLogger: класс ErrorLoagger фактически обрабатывает исключения и входит в систему регистрации.
CustomerServiceActivator.java
package com.spijb.seriveactivator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.spijb.domain.Customer;
public class CustomerServiceActivator {
private static final Logger logger = LoggerFactory.getLogger(CustomerServiceActivator.class);
public void handleCustomer(Customer customer)
{
logger.info("Customer information received and invoke some services here....\r\n"+customer);
}
}
CustomerServiceActivator: этот класс является сценарием успеха, я имею в виду, что если при преобразовании не возникает исключение, то маршрутизатор payloadtype направляет сообщение клиента в outputChannel, затем будет вызываться этот метод класса handleCustomer. В этом классе нет реализации, просто ведение журнала информации о клиентах.
ErrorHandlingTest.java
package com.spijb.invoker;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.mockrunner.mock.jms.MockQueue;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:applicationContext-file-jms.xml"})
public class ErrorHandlingTest {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private MockQueue inBoundQueue;
@Test
public void shouldSendCustomerToWebServiceActivator() throws InterruptedException
{
final StringBuffer sb = new StringBuffer();
sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>")
.append("<customer id=\"10001\">")
.append("<age>35</age>")
.append("<name>Upender</name>")
.append("</customer>");
jmsTemplate.send(inBoundQueue,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(sb.toString());
return textMessage;
}
});
Thread.sleep(3000);
}
@Test
public void shouldSendMessageErrorLogActivator() throws InterruptedException
{
final StringBuffer sb = new StringBuffer();
sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>")
.append("<customer id=\"10001\">")
.append("<age>a3<age>")
.append("<name>Upender</name>")
.append("</customer>");
jmsTemplate.send(inBoundQueue,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(sb.toString());
return textMessage;
}
});
Thread.sleep(3000);
}
}
ErrorHandlerTest: у тестового интеграционного класса Spring есть два метода: один метод shouldSendCustomerToWebServiceActivator, который отправит полный сценарий успеха, а другой метод shouldSendMessageErrorLogActivator отправит недопустимое сообщение xml и потерпит неудачу при немаршальном xml объекту java.
Неудачный вывод:
12:58:51.798 [task-scheduler-4] ERROR c.spijb.seriveactivator.ErrorLogger - Exception org.springframework.integration.MessageHandlingException: org.springframework.expression.spel.SpelEvaluationException: EL1004E:(pos 8): Method call: Method handleCustomer(org.springframework.oxm.UnmarshallingFailureException) cannot be found on com.spijb.seriveactivator.CustomerServiceActivator type
Результат на выходе:
13:00:47.245 [task-scheduler-5] INFO c.s.t.CustomerMessageTransformer - Message Received <?xml version="1.0" encoding="UTF-8" standalone="yes"?><customer id="10001"><age>35</age><name>Upender</name></customer>
13:00:47.277 [task-scheduler-6] INFO c.s.s.CustomerServiceActivator - Customer information received and invoke some services here....
Name Upender
Id: 10001
Age 35
Прикрепленный файл maven pom.xml для всех зависимостей.