Статьи

Apache Camel с весны, маршрутизация с обогащением

Извините за то, что ничего не опубликовал в какое-то время, но у меня было много работы. В любом случае сегодня я продолжу пример с JMS, который я показал вам некоторое время назад.

Идея предыдущего примера состояла в том, чтобы упростить работу, которую мы должны были выполнить вручную — мы создали контейнеры JmsTemplate и Spring listener. Маршрут как таковой, к сожалению, все еще был сделан нами. Для облегчения этого процесса мы можем использовать Apache Camel .

Следующий пример основан на том, что мы видели в этом посте,  Spring JMS, автоматическое преобразование сообщений, шаблон JMS,  но с небольшими изменениями:

CamelRouter.java

package pl.grzejszczak.marcin.camel;

import org.apache.camel.spring.Main;

public class CamelRouter {

 /**
  * @param args
  * @throws Exception
  */
 public static void main(String[] args) throws Exception {
  Main main = new Main();
  main.setApplicationContextUri("/camel/camelContext.xml");
  main.run(args);
 }

}

Здесь мы видим использование класса Main Camel, который вы можете использовать для более легкой загрузки Camel и продолжения его работы до завершения JVM.

Затем у нас есть новый файл camelContext.xml, в котором у нас есть контекст Camel в файле конфигурации Spring.

camelContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsd ">

 <import resource="classpath:/camel/jmsApplicationContext.xml" />

 <camel:camelContext id="camel" xmlns:camel="http://camel.apache.org/schema/spring">
  <camel:dataFormats>
   <camel:jaxb id="jaxb" prettyPrint="true" contextPath="pl.grzejszczak.marcin.camel.jaxb.generated" />
  </camel:dataFormats>

  <camel:route>
   <camel:from uri="activemq:topic:Initial.Topic" />
   <camel:unmarshal ref="jaxb" />
   <camel:bean ref="enrichingService" />
   <camel:marshal ref="jaxb" />
   <camel:to uri="activemq:topic:Routed.Topic" />
  </camel:route>

 </camel:camelContext>
</beans>

Мы определяем в этом файле для создания Camel Route — от темы activemq с именем Initial.Topic до темы с именем Routed.Topic . В то же время мы делаем несколько демаршалинг и маршаллинг с помощью Jaxb.

В jmsApplicationContext мы больше не определяем отправителя последней темы: Routed.Topic .

jmsApplicationContext.java

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
 xmlns:jms="http://www.springframework.org/schema/jms" xmlns:oxm="http://www.springframework.org/schema/oxm"
 xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms-3.0.xsdhttp://www.springframework.org/schema/oxmhttp://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">

 <!-- Spring configuration based on annotations -->
 <context:annotation-config />
 <!-- Show Spring where to search for the beans (in which packages) -->
 <context:component-scan base-package="pl.grzejszczak.marcin.camel" />

 <!-- Show Spring where to search for the properties files -->
 <context:property-placeholder location="classpath:/camel/jms.properties" />

 <!-- The ActiveMQ connection factory with specification of the server URL -->
 <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
 </bean>

 <!-- Spring's jms connection factory -->
 <bean id="cachingConnectionFactory"
  class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="activeMQConnectionFactory" />
  <property name="sessionCacheSize" value="10" />
 </bean>

 <!-- The name of the queue from which we will take the messages -->
 <bean id="origin" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg value="${jms.origin}" />
 </bean>
 <!-- The name of the queue to which we will route the messages -->
 <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
  <constructor-arg value="${jms.destination}" />
 </bean>

 <!-- Configuration of the JmsTemplate together with the connection factory and the message converter -->
 <bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="messageConverter" ref="oxmMessageConverter" />
 </bean>

 <!-- Custom message sender sending messages to the initial queue -->
 <bean id="originPlayerSender" class="pl.grzejszczak.marcin.camel.manual.jms.PlayerDetailsSenderImpl">
  <property name="destination" ref="origin" />
 </bean>

 <!-- Custom message listener - listens to the destination queue  -->
 <bean id="destinationListenerImpl" class="pl.grzejszczak.marcin.camel.manual.jms.FinalListenerImpl"/>


 <!-- Spring's jms message listener container - specified the connection factory, the queue to be listened to and the component that listens to the queue -->
 <bean id="jmsDestinationContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="destination" ref="destination" />
  <property name="messageListener" ref="destinationListenerImpl" />
 </bean>

 <!-- Message converter - automatically marshalls and unmarshalls messages using the provided marshaller / unmarshaller-->
 <bean id="oxmMessageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter">
          <property name="marshaller" ref="marshaller" />
          <property name="unmarshaller" ref="marshaller" />
     </bean>

 <bean id="enrichingService" class="pl.grzejszczak.marcin.camel.service.EnrichingServiceImpl"/>

 <!-- Spring's JAXB implementation of marshaller - provided a class the JAXB generated class -->
     <oxm:jaxb2-marshaller id="marshaller">
          <oxm:class-to-be-bound name="pl.grzejszczak.marcin.camel.jaxb.generated.PlayerDetails" />
     </oxm:jaxb2-marshaller>

</beans>

Как только мы уже инициализировали наш верблюжий контекст, нам нужно отправить сообщение в Initial.Topic . Мы делаем это с помощью нашего модифицированного класса ActiveMQRouter.

ActiveMQRouter.java

package pl.grzejszczak.marcin.camel.manual;

import java.io.File;
import java.util.Scanner;

import javax.jms.JMSException;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import pl.grzejszczak.marcin.camel.jaxb.PlayerDetailsConverter;
import pl.grzejszczak.marcin.camel.jaxb.generated.PlayerDetails;
import pl.grzejszczak.marcin.camel.manual.jms.FinalListenerImpl;
import pl.grzejszczak.marcin.camel.manual.jms.Sender;

public class ActiveMQRouter {

 /**
  * @param args
  * @throws JMSException
  */
 public static void main(String[] args) throws Exception {
  ApplicationContext context = new ClassPathXmlApplicationContext("/camel/jmsApplicationContext.xml");
  @SuppressWarnings("unchecked")
  Sender<PlayerDetails> sender = (Sender<PlayerDetails>) context.getBean("originPlayerSender");

  Resource resource = new ClassPathResource("/camel/RobertLewandowski.xml");

  Scanner scanner = new Scanner(new File(resource.getURI())).useDelimiter("\\Z");
  String contents = scanner.next();

  PlayerDetailsConverter converter = context.getBean(PlayerDetailsConverter.class);

  FinalListenerImpl listener = (FinalListenerImpl) context.getBean("finalListenerImpl");

  sender.sendMessage(converter.unmarshal(contents));
 }
}

Класс читает файл и отправляет его в начальную тему. Мы также инициализируем FinalListenerImpl — класс, который будет прослушивать сообщения, поступающие в последнюю тему — чтобы доказать, что все работает правильно.

Это оно! Теперь давайте проверим логи. Логи CamelRouter:

2012-11-22 22:51:39,429 INFO  [main] org.apache.camel.main.MainSupport:300 Apache Camel 2.9.2 starting
2012-11-22 22:51:40,028 INFO  [main] org.springframework.context.support.ClassPathXmlApplicationContext:495 Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@4c5e176f: startup date [Thu Nov 22 22:51:40 CET 2012]; root of context hierarchy
2012-11-22 22:51:40,213 INFO  [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 Loading XML bean definitions from class path resource [camel/camelContext.xml]
2012-11-22 22:51:40,746 INFO  [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 Loading XML bean definitions from class path resource [camel/jmsApplicationContext.xml]
2012-11-22 22:51:41,120 INFO  [main] org.springframework.context.annotation.ClassPathBeanDefinitionScanner:210 JSR-330 'javax.inject.Named' annotation found and supported for component scanning
2012-11-22 22:51:43,219 INFO  [main] org.springframework.beans.factory.config.PropertyPlaceholderConfigurer:177 Loading properties file from class path resource [camel/jms.properties]
2012-11-22 22:51:43,233 INFO  [main] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor:139 JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2012-11-22 22:51:43,274 INFO  [main] org.springframework.beans.factory.support.DefaultListableBeanFactory:557 Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@19d03a4e: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,AgeEnricher,ClubEnricher,PlayerDetailsConverter,finalListenerImpl,playerDetailsSenderImpl,org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,activeMQConnectionFactory,cachingConnectionFactory,origin,destination,producerTemplate,originPlayerSender,destinationListenerImpl,jmsDestinationContainer,oxmMessageConverter,enrichingService,marshaller,template,consumerTemplate,camel:beanPostProcessor,camel,org.springframework.context.annotation.ConfigurationClassPostProcessor$ImportAwareBeanPostProcessor#0]; root of factory hierarchy
2012-11-22 22:51:43,424 INFO  [main] org.springframework.oxm.jaxb.Jaxb2Marshaller:436 Creating JAXBContext with classes to be bound [class pl.grzejszczak.marcin.camel.jaxb.generated.PlayerDetails]
2012-11-22 22:51:44,521 INFO  [main] org.springframework.context.support.DefaultLifecycleProcessor:334 Starting beans in phase 2147483647
2012-11-22 22:51:45,061 INFO  [main] org.springframework.jms.connection.CachingConnectionFactory:291 Established shared JMS Connection: ActiveMQConnection {id=ID:marcin-SR700-47684-1353621104666-1:1,clientId=null,started=false}
2012-11-22 22:51:45,608 INFO  [main] org.apache.camel.spring.SpringCamelContext:1374 Apache Camel 2.9.2 (CamelContext: camel) is starting
2012-11-22 22:51:45,611 INFO  [main] org.apache.camel.management.ManagementStrategyFactory:38 JMX enabled. Using ManagedManagementStrategy.
2012-11-22 22:51:45,850 INFO  [main] org.apache.camel.management.DefaultManagementLifecycleStrategy:790 StatisticsLevel at All so enabling load performance statistics
2012-11-22 22:51:45,961 INFO  [main] org.apache.camel.impl.converter.AnnotationTypeConverterLoader:119 Found 3 packages with 15 @Converter classes to load
2012-11-22 22:51:45,995 INFO  [main] org.apache.camel.impl.converter.DefaultTypeConverter:405 Loaded 170 core type converters (total 170 type converters)
2012-11-22 22:51:46,002 INFO  [main] org.apache.camel.impl.converter.AnnotationTypeConverterLoader:109 Loaded 2 @Converter classes
2012-11-22 22:51:46,023 INFO  [main] org.apache.camel.impl.converter.AnnotationTypeConverterLoader:119 Found 1 packages with 1 @Converter classes to load
2012-11-22 22:51:46,024 WARN  [main] org.apache.camel.impl.converter.DefaultTypeConverter:257 Overriding type converter from: StaticMethodTypeConverter: public static org.apache.activemq.command.ActiveMQDestination org.apache.camel.component.activemq.ActiveMQConverter.toDestination(java.lang.String) to: StaticMethodTypeConverter: public static org.apache.activemq.command.ActiveMQDestination org.apache.activemq.camel.converter.ActiveMQConverter.toDestination(java.lang.String)
2012-11-22 22:51:46,043 INFO  [main] org.apache.camel.impl.converter.DefaultTypeConverter:431 Loaded additional 3 type converters (total 173 type converters) in 0.041 seconds
2012-11-22 22:51:46,360 INFO  [main] org.apache.camel.converter.jaxb.JaxbDataFormat:277 Creating JAXBContext with contextPath: pl.grzejszczak.marcin.camel.jaxb.generated and ApplicationContextClassLoader: sun.misc.Launcher$AppClassLoader@35a16869
2012-11-22 22:51:46,500 INFO  [main] org.apache.camel.spring.SpringCamelContext:1980 Route: route1 started and consuming from: Endpoint[activemq://topic:Initial.Topic]
2012-11-22 22:51:46,509 INFO  [main] org.apache.camel.spring.SpringCamelContext:1409 Total 1 routes, of which 1 is started.
2012-11-22 22:51:46,510 INFO  [main] org.apache.camel.spring.SpringCamelContext:1410 Apache Camel 2.9.2 (CamelContext: camel) started in 0.901 seconds
2012-11-22 22:51:46,519 INFO  [main] org.springframework.context.support.DefaultLifecycleProcessor:334 Starting beans in phase 2147483647
2012-11-22 22:52:08,375 DEBUG [Camel (camel) thread #1 - JmsConsumer[Initial.Topic]] pl.grzejszczak.marcin.camel.service.EnrichingServiceImpl:21 Enriching player details
2012-11-22 22:52:08,377 DEBUG [Camel (camel) thread #1 - JmsConsumer[Initial.Topic]] pl.grzejszczak.marcin.camel.enricher.AgeEnricher:17 Enriching player [Lewandowski] with age data
2012-11-22 22:52:10,379 DEBUG [Camel (camel) thread #1 - JmsConsumer[Initial.Topic]] pl.grzejszczak.marcin.camel.enricher.ClubEnricher:16 Enriching player [Lewandowski] with club data
2012-11-22 22:52:12,462 DEBUG [jmsDestinationContainer-1] pl.grzejszczak.marcin.camel.manual.jms.FinalListenerImpl:35 Message already enriched! Shutting down the system

Мы можем видеть, что верблюжий контекст был инициализирован, а затем компонент, созданный нами в jmsApplicationContext.xml, который прослушивает конечный пункт назначения, подтверждает, что сообщение было обогащено должным образом.

А как насчет логов ActiveMQRouter.java?

2012-11-22 22:52:06,077 INFO  [main] org.springframework.context.support.ClassPathXmlApplicationContext:495 Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@43462851: startup date [Thu Nov 22 22:52:06 CET 2012]; root of context hierarchy
2012-11-22 22:52:06,153 INFO  [main] org.springframework.beans.factory.xml.XmlBeanDefinitionReader:315 Loading XML bean definitions from class path resource [camel/jmsApplicationContext.xml]
2012-11-22 22:52:06,417 INFO  [main] org.springframework.context.annotation.ClassPathBeanDefinitionScanner:210 JSR-330 'javax.inject.Named' annotation found and supported for component scanning
2012-11-22 22:52:06,721 INFO  [main] org.springframework.beans.factory.config.PropertyPlaceholderConfigurer:177 Loading properties file from class path resource [camel/jms.properties]
2012-11-22 22:52:06,733 INFO  [main] org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor:139 JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2012-11-22 22:52:06,758 INFO  [main] org.springframework.beans.factory.support.DefaultListableBeanFactory:557 Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@362f0d54: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,AgeEnricher,ClubEnricher,PlayerDetailsConverter,finalListenerImpl,playerDetailsSenderImpl,org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,activeMQConnectionFactory,cachingConnectionFactory,origin,destination,producerTemplate,originPlayerSender,destinationListenerImpl,jmsDestinationContainer,oxmMessageConverter,enrichingService,marshaller,org.springframework.context.annotation.ConfigurationClassPostProcessor$ImportAwareBeanPostProcessor#0]; root of factory hierarchy
2012-11-22 22:52:07,224 INFO  [main] org.springframework.oxm.jaxb.Jaxb2Marshaller:436 Creating JAXBContext with classes to be bound [class pl.grzejszczak.marcin.camel.jaxb.generated.PlayerDetails]
2012-11-22 22:52:07,628 INFO  [main] org.springframework.context.support.DefaultLifecycleProcessor:334 Starting beans in phase 2147483647
2012-11-22 22:52:07,883 INFO  [main] org.springframework.jms.connection.CachingConnectionFactory:291 Established shared JMS Connection: ActiveMQConnection {id=ID:marcin-SR700-53586-1353621127755-1:1,clientId=null,started=false}
2012-11-22 22:52:08,093 DEBUG [main] pl.grzejszczak.marcin.camel.manual.jms.PlayerDetailsSenderImpl:26 Sending [pl.grzejszczak.marcin.camel.jaxb.generated.PlayerDetails@3ea86d12] to topic [topic://Initial.Topic]
2012-11-22 22:52:12,463 DEBUG [jmsDestinationContainer-1] pl.grzejszczak.marcin.camel.manual.jms.FinalListenerImpl:35 Message already enriched! Shutting down the system

Сначала мы видим, что наш весенний контекст был инициализирован, а затем мы видим, что сообщение было отправлено в Initial.Topic . В конце обработки мы видим, что слушатель подтверждает, что сообщение было правильно обогащено, поэтому вся работа с верблюдом была выполнена надлежащим образом.

Этот пример показывает, насколько легко и просто можно создать сервис маршрутизации / обогащения с помощью Spring и Camel (интегрированный с Spring).

Источники доступны в репозитории Too Much Coding на bitbucket .