Статьи

Spring Integration Publisher

Рассмотрим гипотетическое требование — у вас есть класс обслуживания в вашем приложении, и вы хотите собрать некоторую информацию о вызовах к этому сервису:

1
2
3
4
5
6
7
8
9
@Service
public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);
 
 public Response call(Request request) {
  logger.info("SampleBean.call invoked");
  return new Response(true);
 }
}

AOP отлично подходит для такого требования, он позволяет аккуратно захватить информацию о вызове метода (pointcut) и выполнить некоторую обработку (рекомендацию) с этой информацией:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public class AuditAspect {
 private static final Logger logger = LoggerFactory.getLogger(AuditAspect.class);
 @Pointcut("execution( * core.SampleBean.call(model.Request)) && args(r)")
 public void beanCalls(Request r){}
 
 @Around("beanCalls(r)")
 public Object auditCalls(ProceedingJoinPoint pjp, Request r) {
     logger.info("Capturing request: " + r);
  try{
   Object response = pjp.proceed();
   logger.info("Capturing response: " + response);
   return response;
  }catch(Throwable e) {
   throw new RuntimeException(e);
  }
 }
}

Это кажется достаточно хорошим. Теперь, что если бы я хотел немедленно вернуть ответ клиенту, но продолжил обрабатывать контекст вызова метода — мы можем поместить логику Advice в отдельный поток, используя ThreadPool. Позвольте мне добавить еще один уровень сложности сейчас: что если мы хотим абсолютно гарантировать, что контекст не потерян — хороший способ сделать это — сохранить контекст вызова метода вне JVM, обычно провайдеры обмена сообщениями, такие как RabbitMQ и ActiveMQ, будут вписывается очень хорошо.

Принимая во внимание эти дополнительные требования, более простым решением, особенно с учетом сценариев обмена сообщениями, будет использование Spring Integration. Давайте начнем с определения нового контекста приложения Spring Integration:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
 xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 
    <annotation-config/>
 
    <channel id="tobeprocessedlater"/>
 
    <logging-channel-adapter channel="tobeprocessedlater" log-full-message="true"/>
 
</beans:beans>

У него просто есть определение канала и исходящий адаптер, который читает этот канал и регистрирует полное сообщение. Чтобы захватить контекст вызова к SampleBean, аннотация Publisher может быть добавлена ​​к соответствующим методам SampleBean, которые будут направлять «вещи» в канал, который добавляется в аннотацию.

01
02
03
04
05
06
07
08
09
10
@Service
public class SampleBean {
 private static final Logger logger = LoggerFactory.getLogger(SampleBean.class);
 
 @Publisher(channel = "tobeprocessedlater")
 public Response call(@Header("request") Request request) {
  logger.info("SampleBean.call invoked");
  return new Response(true);
 }
}

то, что «вещество» отправляется на этот канал «tobeprocessedlater», указывается в дополнительных аннотациях — по умолчанию возвращаемое значение метода отправляется в канал, дополнительно я пометил запрос также аннотацией @Header, это сделает запрос быть отправленным в качестве заголовка к ответному сообщению. Просто для полноты, у контекста интеграции есть тег <annotation-config />, этот тег регистрирует соответствующие компоненты, которые ищут аннотацию @Publisher и включаются в дополнительное действие, которое будет выполнено, если он его найдет.

Если этот код выполняется сейчас, вывод будет выглядеть следующим образом:

1
2
core.SampleBean - SampleBean.call invoked
o.s.integration.handler.LoggingHandler - [Payload=Response{success=true}][Headers={request=RequestType1{attr='null'}, id=52997b10-dc8e-e0eb-a82a-88c1df68fca5, timestamp=1389268390212}]

Теперь, чтобы наложить первое требование, обработать совет (в данном случае ведение журнала) в отдельном потоке выполнения:

Это можно сделать, просто изменив конфигурацию! — вместо публикации сообщения на прямом канале, публикации его на канале, который может буферизовать сообщения или использовать исполнителя для отправки сообщений, я решил использовать канал исполнителя в этом примере:

1
2
3
<channel id="tobeprocessedlater">
        <dispatcher task-executor="taskExecutor"/>
    </channel>

Теперь, чтобы добавить требование сделать асинхронную обработку сообщений немного более надежной, опубликовав ее внешнему провайдеру обмена сообщениями (и обработав сообщения позже), позвольте мне продемонстрировать это, опубликовав сообщения в RabbitMQ, изменение кода снова является чистой конфигурацией и ничего в коде не меняется!

1
2
3
<channel id="tobeprocessedlater"/>
 
    <int-amqp:outbound-channel-adapter amqp-template="amqpTemplate" channel="tobeprocessedlater"  />

Приемник сообщений мог быть чем угодно — база данных, файловая система, ActiveMQ, и изменение, которое потребовалось бы, — это чистая конфигурация.

Ссылка: Spring Integration Publisher от нашего партнера JCG Биджу Кунджуммен в блоге all and sundry.