Эта статья является частью нашего курса Академии под названием Spring Integration для EAI .
В этом курсе вы познакомитесь с шаблонами интеграции корпоративных приложений и с тем, как Spring Integration обращается к ним. Далее вы углубитесь в основы Spring Integration, такие как каналы, преобразователи и адаптеры. Проверьте это здесь !
Содержание
1. Введение
Поэкспериментировав с основными компонентами Spring Integration и увидев, как он хорошо интегрируется с другими системами, такими как JMS-очереди или веб-службы, эта глава заканчивает курс, демонстрируя различные механизмы мониторинга или сбора дополнительной информации о том, что происходит в системе обмена сообщениями. система.
Некоторые из этих механизмов состоят из управления или мониторинга приложения через MBeans, которые являются частью спецификации JMX. Мы также узнаем, как отслеживать сообщения, чтобы видеть, какие компоненты были задействованы во время обмена сообщениями, и как сохранять сообщения для компонентов, которые имеют возможность буферизовать сообщения.
Другой механизм, обсуждаемый в этой главе, заключается в том, как мы реализуем шаблон идемпотентного приемника EIP с использованием хранилища метаданных.
Наконец, последний описанный механизм — это управляющая шина. Это позволит нам отправлять сообщения, которые будут запускать операции над компонентами в контексте приложения.
2. Публикация и получение уведомлений JMX
Спецификация JMX определяет механизм, который позволяет MBeans публиковать уведомления, которые будут отправлены другим MBeans или приложению управления. Документация Oracle объясняет, как реализовать этот механизм.
Spring Integration поддерживает эту функцию, предоставляя адаптеры каналов, которые могут публиковать и получать уведомления JMX. Мы рассмотрим пример, в котором используются оба канальных адаптера:
- Адаптер канала прослушивания уведомлений
- Адаптер канала публикации уведомлений
2.1. Публикация уведомления JMX
В первой части примера система обмена сообщениями получает сообщение String (сообщение с полезной нагрузкой типа String) через свой входной шлюз. Затем он использует активатор службы (обработчик уведомлений) для создания javax.management.Notification
и отправляет его адаптеру канала публикации уведомлений, который опубликует уведомление JMX.
Ход этой первой части показан ниже:
Конфигурация xml эквивалентна предыдущему графику:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
< context:component-scan base-package = "xpadro.spring.integration.jmx.notification" /> < context:mbean-export /> < context:mbean-server /> <!-- Sending Notifications --> < int:gateway service-interface = "xpadro.spring.integration.jmx.notification.JmxNotificationGateway" default-request-channel = "entryChannel" /> < int:channel id = "entryChannel" /> < int:service-activator input-channel = "entryChannel" output-channel = "sendNotificationChannel" ref = "notificationHandler" method = "buildNotification" /> < int:channel id = "sendNotificationChannel" /> < int-jmx:notification-publishing-channel-adapter channel = "sendNotificationChannel" object-name = "xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean" /> |
Шлюз так же прост, как и в предыдущих примерах. Помните, что аннотация @Gateway
не нужна, если у вас есть только один метод:
1
2
3
4
|
public interface JmxNotificationGateway { public void send(String type); } |
Message
достигнет активатора службы, который создаст сообщение с уведомлением JMX:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
@Component ( "notificationHandler" ) public class NotificationHandler { private Logger logger = LoggerFactory.getLogger( this .getClass()); private static final String NOTIFICATION_TYPE_HEADER = "jmx_notificationType" ; public void receive(Message<Notification> msg) { logger.info( "Notification received: {}" , msg.getPayload().getType()); } public Message<Notification> buildNotification(Message<String> msg) { Notification notification = new Notification(msg.getPayload(), this , 0 ); return MessageBuilder.withPayload(notification) .copyHeadersIfAbsent(msg.getHeaders()).setHeader(NOTIFICATION_TYPE_HEADER, "myJmxNotification" ).build(); } } |
Обратите внимание, что мы установили новый заголовок. Это необходимо для предоставления типа уведомления, иначе адаптер JMX сгенерирует IllegalArgumentException
с сообщением «Нет доступного заголовка типа уведомления и не предоставлено значение по умолчанию».
Наконец, нам просто нужно вернуть сообщение, чтобы отправить его в адаптер публикации. Остальное обрабатывается Spring Integration.
2.2. Получение уведомления JMX
Вторая часть потока состоит в адаптере канала прослушивания уведомлений, который получит наше ранее опубликованное уведомление.
Конфигурация xml:
1
2
3
4
5
6
7
8
|
<!-- Receiving Notifications --> < int-jmx:notification-listening-channel-adapter channel = "receiveNotificationChannel" object-name = "xpadro.spring.integration.jmx.adapter:type=integrationMBean,name=integrationMbean" /> < int:channel id = "receiveNotificationChannel" /> < int:service-activator input-channel = "receiveNotificationChannel" ref = "notificationHandler" method = "receive" /> |
Мы просто получим уведомление и зарегистрируем его:
1
2
3
|
public void receive(Message<Notification> msg) { logger.info( "Notification received: {}" , msg.getPayload().getType()); } |
Приложение, которое запускает пример:
01
02
03
04
05
06
07
08
09
10
|
public class NotificationApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/jmx/config/int-notification-config.xml" ); JmxNotificationGateway gateway = context.getBean(JmxNotificationGateway. class ); gateway.send( "gatewayNotification" ); Thread.sleep( 1000 ); context.close(); } } |
3. Опрос управляемых атрибутов из MBean
Представьте, что у нас есть MBean
который отслеживает некоторые функции. С помощью адаптера канала опроса атрибутов ваше приложение сможет опрашивать MBean
и получать обновленные данные.
Я реализовал MBean
который генерирует случайное число при каждом запросе. Не самая важная особенность, но послужит нам примером:
01
02
03
04
05
06
07
08
09
10
11
|
@Component ( "pollingMbean" ) @ManagedResource public class JmxPollingMBean { @ManagedAttribute public int getNumber() { Random rnd = new Random(); int randomNum = rnd.nextInt( 100 ); return randomNum; } } |
Поток не может быть проще; нам нужен адаптер канала опроса атрибутов, определяющий тип и имя нашего MBean
. Адаптер MBean
и поместит результат в канал результатов. Каждый результат опроса будет отображаться на консоли через адаптер канала потока stdout:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
< context:component-scan base-package = "xpadro.spring.integration.jmx.polling" /> < context:mbean-export /> < context:mbean-server /> <!-- Polling --> < int-jmx:attribute-polling-channel-adapter channel = "resultChannel" object-name = "xpadro.spring.integration.jmx.polling:type=JmxPollingMBean,name=pollingMbean" attribute-name = "Number" > < int:poller max-messages-per-poll = "1" fixed-delay = "1000" /> </ int-jmx:attribute-polling-channel-adapter > < int:channel id = "resultChannel" /> < int-stream:stdout-channel-adapter channel = "resultChannel" append-newline = "true" /> |
Приложение, которое запускает пример:
1
2
3
4
5
6
7
8
|
public class PollingApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/jmx/config/int-polling-config.xml" ); context.registerShutdownHook(); Thread.sleep( 5000 ); context.close(); } } |
И вывод консоли:
1
2
3
4
5
6
7
|
2014-04-16 16:23:43,867|AbstractEndpoint|started org.springframework.integration.config.ConsumerEndpointFactoryBean #0 82 72 20 47 21 2014-04-16 16:23:48,878|AbstractApplicationContext|Closing org.springframework.context.support.ClassPathXmlApplicationContext@7283922 |
4. Вызов операций MBean
Следующий механизм позволяет нам вызывать операцию MBean
. Мы собираемся реализовать еще один bean-компонент, содержащий единственную операцию, наш старый привет-мир:
1
2
3
4
5
6
7
8
9
|
@Component ( "operationMbean" ) @ManagedResource public class JmxOperationMBean { @ManagedOperation public String hello(String name) { return "Hello " + name; } } |
Теперь мы можем использовать адаптер канала, если операция не возвращает результат, или шлюз, если это так. В следующей конфигурации XML мы экспортируем MBean
и используем шлюз для вызова операции и ожидания результата:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
< context:component-scan base-package = "xpadro.spring.integration.jmx.operation" /> < context:mbean-export /> < context:mbean-server /> < int:gateway service-interface = "xpadro.spring.integration.jmx.operation.JmxOperationGateway" default-request-channel = "entryChannel" /> < int-jmx:operation-invoking-outbound-gateway request-channel = "entryChannel" reply-channel = "replyChannel" object-name = "xpadro.spring.integration.jmx.operation:type=JmxOperationMBean,name=operationMbean" operation-name = "hello" /> < int:channel id = "replyChannel" /> < int-stream:stdout-channel-adapter channel = "replyChannel" append-newline = "true" /> |
Чтобы работать, нам нужно указать тип и имя MBean
и операцию, которую мы хотим вызвать. Результат будет отправлен адаптеру потокового канала для отображения на консоли.
Приложение, которое запускает пример:
01
02
03
04
05
06
07
08
09
10
|
public class OperationApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/jmx/config/int-operation-config.xml" ); JmxOperationGateway gateway = context.getBean(JmxOperationGateway. class ); gateway.hello( "World" ); Thread.sleep( 1000 ); context.close(); } } |
5. Экспорт компонентов в MBeans
Этот компонент используется для экспорта каналов сообщений, обработчиков сообщений и конечных точек сообщений как MBean, чтобы вы могли отслеживать их.
Вам необходимо добавить следующую конфигурацию в ваше приложение:
1
2
3
4
5
6
|
< int-jmx:mbean-export id = "integrationMBeanExporter" default-domain = "xpadro.integration.exporter" server = "mbeanServer" /> < bean id = "mbeanServer" class = "org.springframework.jmx.support.MBeanServerFactoryBean" > < property name = "locateExistingServerIfPossible" value = "true" /> </ bean > |
И установите следующие аргументы VM, как описано в документации Spring:
1
2
3
4
|
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6969 -Dcom.sun.management.jmxremote.ssl= false -Dcom.sun.management.jmxremote.authenticate= false |
Приложение, которое запускает пример, отправляет три сообщения:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
public class ExporterApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/jmx/config/int-exporter-config.xml" ); context.registerShutdownHook(); JmxExporterGateway gateway = context.getBean(JmxExporterGateway. class ); gateway.sendMessage( "message 1" ); Thread.sleep( 500 ); gateway.sendMessage( "message 2" ); Thread.sleep( 500 ); gateway.sendMessage( "message 3" ); } } |
После запуска приложения вы можете увидеть информацию о компонентах. Следующий снимок экрана сделан на JConsole:
Вы можете заметить, что атрибут sendCount
канала входа имеет значение 3, потому что в нашем примере мы отправили три сообщения.
6. Конечный путь сообщения
В системе обмена сообщениями компоненты слабо связаны. Это означает, что отправляющему компоненту не нужно знать, кто получит сообщение. И наоборот, получатель просто интересуется полученным сообщением, а не тем, кто его отправил. Это преимущество может быть не очень хорошим, когда нам нужно отладить приложение.
История сообщения заключается в прикреплении к сообщению списка всех компонентов, через которые прошло сообщение.
Следующее приложение протестирует эту функцию, отправив сообщение через несколько компонентов:
Ключевой элемент конфигурации не виден на предыдущем рисунке: элемент message-history
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
< context:component-scan base-package = "xpadro.spring.integration.msg.history" /> < int:message-history /> < int:gateway id = "historyGateway" service-interface = "xpadro.spring.integration.msg.history.HistoryGateway" default-request-channel = "entryChannel" /> < int:channel id = "entryChannel" /> < int:transformer id = "msgTransformer" input-channel = "entryChannel" expression = "payload + 'transformed'" output-channel = "transformedChannel" /> < int:channel id = "transformedChannel" /> < int:service-activator input-channel = "transformedChannel" ref = "historyActivator" /> |
С этой установленной конфигурацией активатор службы в конце потока сообщений сможет получить список посещенных компонентов, посмотрев на заголовок сообщения:
01
02
03
04
05
06
07
08
09
10
11
|
@Component ( "historyActivator" ) public class HistoryActivator { private Logger logger = LoggerFactory.getLogger( this .getClass()); public void handle(Message<String> msg) { MessageHistory msgHistory = msg.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory. class ); if (msgHistory != null ) { logger.info( "Components visited: {}" , msgHistory.toString()); } } } |
Приложение, выполняющее этот пример:
01
02
03
04
05
06
07
08
09
10
|
public class MsgHistoryApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/msg/history/config/int-msg-history-config.xml" ); HistoryGateway gateway = context.getBean(HistoryGateway. class ); gateway.send( "myTest" ); Thread.sleep( 1000 ); context.close(); } } |
Результат будет показан на консоли:
1
|
2014-04-16 17:34:52,551|HistoryActivator|Components visited: historyGateway,entryChannel,msgTransformer,transformedChannel |
7. Сохранение буферизованных сообщений
Некоторые компоненты Spring Integration могут буферизовать сообщения. Например, канал очереди будет буферизировать сообщения до тех пор, пока потребители не получат их из него. Другой пример — конечная точка агрегатора; как видно из второго руководства, эта конечная точка будет собирать сообщения до тех пор, пока группа не завершит свое решение, основываясь на стратегии выпуска.
Эти шаблоны интеграции подразумевают, что в случае сбоя буферизованные сообщения могут быть потеряны. Чтобы предотвратить это, мы можем сохранить эти сообщения, например, сохранить их в базе данных. По умолчанию Spring Integration сохраняет эти сообщения в памяти. Мы собираемся изменить это, используя хранилище сообщений .
В нашем примере мы будем хранить эти сообщения в базе данных MongoDB . Для этого нам просто необходима следующая конфигурация:
01
02
03
04
05
06
07
08
09
10
|
< bean id = "mongoDbFactory" class = "org.springframework.data.mongodb.core.SimpleMongoDbFactory" > < constructor-arg > < bean class = "com.mongodb.Mongo" /> </ constructor-arg > < constructor-arg value = "jcgdb" /> </ bean > < bean id = "mongoDbMessageStore" class = "org.springframework.integration.mongodb.store.ConfigurableMongoDbMessageStore" > < constructor-arg ref = "mongoDbFactory" /> </ bean > |
Теперь мы собираемся создать приложение для тестирования этой функции. Я реализовал поток, который получает через шлюз сообщение с полезной нагрузкой String. Это сообщение отправляется шлюзом в канал очереди, который буферизует сообщения до тех пор, пока активатор службы msgStoreActivator
его из очереди. Активатор службы будет опрашивать сообщения каждые пять секунд:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
< context:component-scan base-package = "xpadro.spring.integration.msg.store" /> < import resource = "mongodb-config.xml" /> < int:gateway id = "storeGateway" service-interface = "xpadro.spring.integration.msg.store.MsgStoreGateway" default-request-channel = "entryChannel" /> < int:channel id = "entryChannel" > < int:queue message-store = "myMessageStore" /> </ int:channel > < int:service-activator input-channel = "entryChannel" ref = "msgStoreActivator" > < int:poller fixed-rate = "5000" /> </ int:service-activator > |
Возможно, вы заметили myMessageStore
компонент myMessageStore
. Чтобы увидеть, как работает механизм постоянных сообщений, я расширил класс ConfigurableMongoDBMessageStore
чтобы в него помещались журналы и отлаживался результат. Если вы хотите попробовать это, вы можете удалить bean-компонент MongoDB messageStore в mongodb-config.xml
так как мы его больше не используем.
Я переписал два метода:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Component ( "myMessageStore" ) public class MyMessageStore extends ConfigurableMongoDbMessageStore { private Logger logger = LoggerFactory.getLogger( this .getClass()); private static final String STORE_COLLECTION_NAME = "messageStoreCollection" ; @Autowired public MyMessageStore(MongoDbFactory mongoDbFactory) { super (mongoDbFactory, STORE_COLLECTION_NAME); logger.info( "Creating message store '{}'" , STORE_COLLECTION_NAME); } @Override public MessageGroup addMessageToGroup(Object groupId, Message<?> message) { logger.info( "Adding message '{}' to group '{}'" , message.getPayload(), groupId); return super .addMessageToGroup(groupId, message); } @Override public Message<?> pollMessageFromGroup(Object groupId) { Message<?> msg = super .pollMessageFromGroup(groupId); if (msg != null ) { logger.info( "polling message '{}' from group '{}'" , msg.getPayload(), groupId); } else { logger.info( "Polling null message from group {}" , groupId); } return msg; } } |
Этот механизм работает следующим образом:
- Когда сообщение достигает канала очереди, для которого настроено наше хранилище сообщений, оно вызывает метод addMessageToGroup. Этот метод вставит документ с полезной нагрузкой в коллекцию MongoDB, указанную в конструкторе. Это делается с помощью
MongoTemplate
. - Когда потребитель опрашивает сообщение, будет вызываться
pollMessageFromGroup
, получая документ из коллекции.
Давайте посмотрим, как это работает путем отладки кода. Мы остановимся непосредственно перед опросом сообщения, чтобы увидеть, как оно хранится в базе данных:
На данный момент мы можем взглянуть на базу данных:
После возобновления сообщение опрашивается из коллекции:
Приложение, которое запускает пример:
01
02
03
04
05
06
07
08
09
10
11
|
public class MsgStoreApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/msg/store/config/int-msg-store-config.xml" ); MsgStoreGateway gateway = context.getBean(MsgStoreGateway. class ); gateway.send( "myMessage" ); Thread.sleep( 30000 ); context.close(); } } |
8. Реализация идемпотентных компонентов
Если нашему приложению необходимо избежать дублирования сообщений, Spring Integration предоставляет этот механизм путем реализации шаблона идемпотентного приемника . Ответственность за обнаружение дублирующихся сообщений несет компонент хранилища метаданных. Этот компонент состоит в хранении пар ключ-значение. Фреймворк предоставляет две реализации интерфейса MetadataStore
:
- SimpleMetadataStore : реализация по умолчанию. Он хранит информацию, используя карту в памяти.
- PropertiesPersistingMetadataStore : полезно, если вам нужно сохранить данные. Он использует файл свойств. Мы собираемся использовать эту реализацию в нашем примере.
Хорошо, давайте начнем с файла конфигурации:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
< context:component-scan base-package = "xpadro.spring.integration.msg.metadata" /> < bean id = "metadataStore" class = "org.springframework.integration.metadata.PropertiesPersistingMetadataStore" /> < int:gateway id = "metadataGateway" service-interface = "xpadro.spring.integration.msg.metadata.MetadataGateway" default-request-channel = "entryChannel" /> < int:channel id = "entryChannel" /> < int:filter input-channel = "entryChannel" output-channel = "processChannel" discard-channel = "discardChannel" expression = "@metadataStore.get(headers.messageId) == null" /> <!-- Process message --> < int:publish-subscribe-channel id = "processChannel" /> < int:outbound-channel-adapter channel = "processChannel" expression = "@metadataStore.put(headers.messageId, '')" /> < int:service-activator input-channel = "processChannel" ref = "metadataActivator" method = "process" /> <!-- Duplicated message - discard it --> < int:channel id = "discardChannel" /> < int:service-activator input-channel = "discardChannel" ref = "metadataActivator" method = "discard" /> |
Мы определили «хранилище метаданных», чтобы использовать хранилище метаданных наших свойств вместо реализации по умолчанию в памяти.
Поток объясняется здесь:
- Сообщение отправляется на шлюз.
- Фильтр отправит сообщение в канал процесса, поскольку оно отправляется впервые.
- Канал процесса имеет два подписчика: активатор службы, который обрабатывает сообщение, и адаптер исходящего канала. Адаптер канала отправит значение заголовка сообщения
messagId
в хранилище метаданных. - Хранилище метаданных сохраняет значение в файле свойств.
- В следующий раз то же сообщение отправлено; фильтр найдет значение и откажется от сообщения.
Хранилище метаданных создает файл свойств в файловой системе. Если вы используете Windows, вы увидите файл metadata-store.properties в папке ‘C: \ Users \ username \ AppData \ Local \ Temp \ spring-интеграции’
В примере используется активатор службы для регистрации, если сообщение было обработано:
01
02
03
04
05
06
07
08
09
10
11
12
|
@Component ( "metadataActivator" ) public class MetadataActivator { private Logger logger = LoggerFactory.getLogger( this .getClass()); public void process(Message<String> msg) { logger.info( "Message processed: {}" , msg.getPayload()); } public void discard(Message<String> msg) { logger.info( "Message discarded: {}" , msg.getPayload()); } } |
Приложение запустит пример:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class MetadataApp { private static final String MESSAGE_STORE_HEADER = "messageId" ; public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/msg/metadata/config/int-msg-metadata-config.xml" ); MetadataGateway gateway = context.getBean(MetadataGateway. class ); Map<String,String> headers = new HashMap<>(); headers.put(MESSAGE_STORE_HEADER, "msg1" ); Message<String> msg1 = MessageBuilder.withPayload( "msg1" ).copyHeaders(headers).build(); headers = new HashMap<>(); headers.put(MESSAGE_STORE_HEADER, "msg2" ); Message<String> msg2 = MessageBuilder.withPayload( "msg2" ).copyHeaders(headers).build(); gateway.sendMessage(msg1); Thread.sleep( 500 ); gateway.sendMessage(msg1); Thread.sleep( 500 ); gateway.sendMessage(msg2); Thread.sleep( 3000 ); context.close(); } } |
Первый вызов приведет к следующему выводу на консоль:
1
2
3
|
2014-04-17 13:00:08,223|MetadataActivator|Message processed: msg1 2014-04-17 13:00:08,726|MetadataActivator|Message discarded: msg1 2014-04-17 13:00:09,229|MetadataActivator|Message processed: msg2 |
Теперь помните, что PropertiesPersistingMetadataStore хранит данные в файле свойств. Это означает, что данные переживут перезапуск ApplicationContext. Итак, если мы не удалим файл свойств и снова запустим пример, результат будет другим:
1
2
3
|
2014-04-17 13:02:27,117|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:27,620|MetadataActivator|Message discarded: msg1 2014-04-17 13:02:28,123|MetadataActivator|Message discarded: msg2 |
9. Отправка запросов вызова операции
Последним механизмом, обсуждаемым в этом руководстве, является шина управления . Шина управления позволит вам управлять системой так же, как это делает приложение. Сообщение будет выполнено как язык выражений Spring. Чтобы быть исполняемым с шины управления, метод должен использовать аннотацию @ManagedAttribute или @ManagedOperation.
Пример этого раздела использует управляющую шину для вызова метода в бине:
1
2
3
4
5
6
7
8
9
|
< context:component-scan base-package = "xpadro.spring.integration.control.bus" /> < int:channel id = "entryChannel" /> < int:control-bus input-channel = "entryChannel" output-channel = "resultChannel" /> < int:channel id = "resultChannel" /> < int:service-activator input-channel = "resultChannel" ref = "controlbusActivator" /> |
Операция, которая будет вызвана следующим образом:
1
2
3
4
5
6
7
8
|
@Component ( "controlbusBean" ) public class ControlBusBean { @ManagedOperation public String greet(String name) { return "Hello " + name; } } |
Приложение, которое запускает пример, отправляет сообщение с выражением для выполнения:
01
02
03
04
05
06
07
08
09
10
11
12
|
public class ControlBusApp { public static void main(String[] args) throws InterruptedException { AbstractApplicationContext context = new ClassPathXmlApplicationContext( "classpath:xpadro/spring/integration/control/bus/config/int-control-bus-config.xml" ); MessageChannel channel = context.getBean( "entryChannel" , MessageChannel. class ); Message<String> msg = MessageBuilder.withPayload( "@controlbusBean.greet('World!')" ).build(); channel.send(msg); Thread.sleep( 3000 ); context.close(); } } |
Результат показан на консоли:
1
|
2014-04-17 13:21:42,910|ControlBusActivator|Message received: Hello World! |