Статьи

AMQP Backed Spring Integration с использованием vFabric RabbitMQ


Для людей, которые спешат,
выполните следующие шаги, чтобы запустить демонстрацию .

Вступление

Объединяя vFabric RabbitMQ и Spring Integration, мы можем создавать слабосвязанные рабочие процессы на основе сообщений класса Enterprise, используя Spring AMQP . Обратитесь к документации Spring Integration, AMQP Backed Channels .

В этой демонстрации мы спроектируем рабочий процесс Spring Integration , в котором сообщение публикуется на  канале p2p-pollable, а канал публикации-подписки прослушивает его, принимает это сообщение и передает его двум различным активаторам службы. для дальнейшей обработки. Я также продемонстрирую, что оба эти канала сообщений сами по себе слушают внешние приложения, и когда внешнее приложение публикует сообщение в эти очереди, они будут обрабатывать их в рабочем процессе. Типичный пример использования этого — в ИТ-среде предприятия, где мы можем создавать надежные рабочие процессы, в которых, если сообщение не обрабатывается на определенном этапе, мы можем отправить данные и обработать их оттуда. С весны AMQPявляется проводным протоколом, внешнее приложение может использоваться любым сетевым протоколом для публикации сообщения в очереди vFabric RabbitMQ, и запускается поток интеграции Spring.

Канал сообщений с поддержкой AMQP в потоке интеграции Spring

Канал сообщений на основе AMQP в Spring Integration

Как всегда в моем блоге, согласно TDD , я сначала напишу тест, как показано ниже,

public class PublisherSubscriberTest {

@Test
public void testIntegration() {
try {
String request = streamToString(getClass().getResourceAsStream(
"/data/payload.xml"));
Message<String> message = MessageBuilder.withPayload(request)
.build();
channel.send(message);
//assert various messages
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

Поток Конфигурации Spring Integration:

<int:bridge input-channel="p2p-pollable-channel"
output-channel="pub-sub-channel" />

<int:service-activator input-channel="pub-sub-channel"
id="serviceActivator1" ref="serviceActivator1Bean" method="logXml" />

<int:service-activator input-channel="pub-sub-channel"
id="serviceActivator2" ref="serviceActivator2Bean" method="logXml" />

<bean id="serviceActivator1Bean"
class="com.goSmarter.amqp.service.Subscriber1ServiceActivator">
</bean>

<bean id="serviceActivator2Bean"
class="com.goSmarter.amqp.service.Subscriber2ServiceActivator">
</bean>

Определение тестового канала:

<int:poller default="true" fixed-rate="1000" />
<int:channel id="p2p-pollable-channel" />
<int:publish-subscribe-channel id="pub-sub-channel" />

Фактическое определение канала, используемого как часть web.xml, можно увидеть здесь

<!-- A reference to the org.springframework.amqp.rabbit.connection.ConnectionFactory -->
<rabbit:connection-factory id="connectionFactory" />
<!-- Creates a org.springframework.amqp.rabbit.core.RabbitAdmin to manage
exchanges, queues and bindings -->
<rabbit:admin connection-factory="connectionFactory" />

<int-amqp:channel id="p2p-pollable-channel" connection-factory="connectionFactory" />
<int-amqp:publish-subscribe-channel id="pub-sub-channel" connection-factory="connectionFactory" />

Когда вы запускаете это приложение в STS как «Запуск на сервере» в первый раз, вы заметите, что в консоли администратора vFabric RabbitMQ будет одна очередь, p2p-pollable-channel и Exchange с именем si.fanout.pub-sub-channel. Вы заметите, что среда интеграции Spring добавила «si.fanout» перед паб-подканалом. Если вы хотите проверить работоспособность сквозного соединения, вы можете опубликовать сообщение в очереди или на сервере Exchange , оно продолжит рабочий процесс с этого момента.

Если вы используете каналы с поддержкой AMQP, вы можете ответить на полезную нагрузку с любого канала, чтобы продолжить рабочий процесс, это будет полезно при обработке ошибок и повторных попытках загрузки.

Вывод

В этом примере мы продемонстрировали, что с одним компонентом, поддерживаемым AMQP, мы можем создать слабо связанный рабочий процесс. В зависимости от контекста может быть 2 различных способа построения рабочего процесса публикации подписчика, один из которых описан выше, а другой — один компонент издателя будет иметь несколько компонентов подписчика.

Я надеюсь, что этот блог помог вам.