Эта статья является частью нашего курса Академии под названием Spring Integration для EAI .
В этом курсе вы познакомитесь с шаблонами интеграции корпоративных приложений и с тем, как Spring Integration обращается к ним. Далее вы углубитесь в основы Spring Integration, такие как каналы, преобразователи и адаптеры. Проверьте это здесь !
Содержание
1. Введение
В этом руководстве вы увидите первый пример приложения, улучшенного с помощью Spring Integration. Для этого в этом примере основное внимание будет уделено интеграции с внешними веб-службами.
Сначала я объясню, какие необходимые адаптеры позволят нам вызывать веб-сервис из Spring Integration. Далее мы кратко расскажем о проекте Spring Web Services, который будет внешней веб-службой, которая будет вызываться из нашего приложения. Заканчивая основную часть руководства, мы реализуем приложение, которое будет вызывать веб-сервис.
В завершение учебного курса мы дополним наше приложение некоторыми функциями, предоставляемыми Spring Integration, такими как добавление тайм-аутов, использование перехватчиков и обучение повторению неудачного вызова.
Этот урок состоит из следующих разделов:
- Вступление
- Объяснение адаптеров канала веб-сервиса
- Создание проекта Spring Web Services
- Реализация потока интеграции Spring
- Добавление времени ожидания клиента
- Использование перехватчиков
- Повторные операции веб-службы
2. Объяснение адаптеров канала веб-сервиса
Связь с внешними веб-сервисами осуществляется Spring Integration со шлюзами. Как объяснялось в предыдущем руководстве, вы можете найти два типа шлюзов: входящие и исходящие. В этом руководстве мы будем использовать специальный тип шлюза: шлюз исходящего веб-сервиса . В этом разделе мы собираемся сосредоточиться на этом типе.
Чтобы использовать шлюз веб-службы, вам необходимо указать новое пространство имен:
С новым набором пространства имен мы можем теперь использовать шлюз веб-службы:
1
2
3
4
|
< int-ws:outbound-gateway id = "aGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" /> |
Итак, каково поведение этого шлюза? Выполнение потока будет следующим:
- Сообщение отправляется на канал
requestChannel
. - Это сообщение затем отправляется на шлюз веб-службы, который подписывается на канал, устанавливая его атрибут
request-channel
. - Шлюз отправляет запрос на внешний веб-сервис, объясненный в следующем разделе. Атрибут
uri
указывает место назначения. - Шлюз ожидает внешнего веб-сервиса, пока не вернет ответ.
- Ответ возвращается и распределяется указанным маршаллером.
- Ответ оборачивается в сообщение и отправляется в канал
responseChannel
, указанный атрибутомresponseChannel
reply-channel
.
Как видите, вам просто нужно определить поток (каналы запроса и ответа) и куда звонить. Детали инфраструктуры, необходимые для отправки сообщения, обрабатываются Spring Integration.
2.1. Дополнительные атрибуты
Есть еще несколько атрибутов, которые доступны для настройки вызова со шлюзом. Ниже приведено краткое описание основных атрибутов:
-
Destination provider:
его можно использовать вместо предоставления атрибута «uri». Таким образом, вы можете реализовать свой собственный класс, который будет динамически определять, какая конечная точка вызывается. Вы должны предоставить компоненту с этим интерфейсом: -
Message sender:
позволяет нам определитьWebServiceMessageSender
. Мы будем использовать это, чтобы определить время ожидания клиента позже в этом руководстве. -
Interceptor/Interceptors:
вы можете определить клиентские перехватчики. Это также будет объяснено в следующем разделе этого руководства.
1
2
3
4
5
6
|
public class MyDestinationProvider implements DestinationProvider { @Override public URI getDestination() { //resolve destination } } |
В определении шлюза мы можем использовать этого провайдера вместо предоставления URI напрямую:
1
2
3
|
< int-ws:outbound-gateway id = "aGateway" request-channel = "requestChannel" reply-channel = "responseChannel" destination-provider = "myDestinationProvider" marshaller = "marshaller" unmarshaller = "marshaller" /> |
2.2. Входящий шлюз веб-службы
Этот раздел является просто краткой ссылкой на входящий сервисный шлюз, чтобы узнать, как он обычно работает, так как мы не будем использовать его в этом руководстве.
Этот шлюз будет получать запрос от внешней службы, упаковывать запрос в сообщение и отправлять его в нашу систему обмена сообщениями. Когда мы обработаем запрос, сообщение будет отправлено обратно на шлюз, чтобы доставить ответ, который ожидает веб-служба.
Синтаксис аналогичен шлюзу исходящей веб-службы:
1
2
|
< int-ws:inbound-gateway id = "anotherGateway" request-channel = "requestChannel" marshaller = "marshaller" unmarshaller = "marshaller" /> |
Как вы помните из предыдущего урока, ответ достигнет шлюза через канал временного сообщения. Не указывайте явно канал, если он не нужен.
3. Создание проекта Spring Web Services
В этом разделе описывается проект, который будет предоставлять веб-сервис, который будет использоваться нашим приложением. Он состоит из веб-приложения, реализованного с использованием проекта Spring Web Services .
Приложение довольно простое. Он состоит из сервисного интерфейса, который позволяет пользователю заказать билеты в кинотеатре. Когда заказ запрашивается, служба обработает его и TicketConfirmation
.
Диаграмма ниже показывает, как она структурирована:
Мы объясним это снизу вверх.
3.1. Интерфейс службы тикетов
Вот интерфейс и реализация сервиса:
1
2
3
4
|
public interface TicketService { public TicketConfirmation order(String filmId, Date sessionDate, int quantity); } |
Реализация строит экземпляр TicketConfirmation
из предоставленных данных.
01
02
03
04
05
06
07
08
09
10
11
|
@Service public class TicketServiceimpl implements TicketService { @Override public TicketConfirmation order(String filmId, Date sessionDate, int quantity) { float amount = 5 .95f * quantity; TicketConfirmation confirmation = new TicketConfirmation(filmId, sessionDate, quantity, amount); return confirmation; } } |
Объект TicketConfirmation
является неизменным классом, который будет использоваться для чтения данных подтверждения:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
public final class TicketConfirmation { private String confirmationId; private String filmId; private int quantity; private Date sessionDate; private float amount; public TicketConfirmation(String filmId, Date sessionDate, int quantity, float amount) { this .confirmationId = UUID.randomUUID().toString(); this .filmId = filmId; this .sessionDate = new Date(sessionDate.getTime()); this .quantity = quantity; this .amount = amount; } public String getConfirmationId() { return confirmationId; } public String getFilmId() { return filmId; } public int getQuantity() { return quantity; } public Date getSessionDate() { return new Date(sessionDate.getTime()); } public float getAmount() { return amount; } } |
3.2. Конечная точка билета
Конечная точка отвечает за получение запросов и делегирование обработки заказа службе Ticket
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Endpoint public class TicketEndpoint { @Autowired private TicketService ticketService; public @ResponsePayload TicketResponse order( @RequestPayload TicketRequest ticketRequest) throws InterruptedException { Calendar sessionDate = Calendar.getInstance(); sessionDate.set( 2013 , 9 , 26 ); TicketConfirmation confirmation = ticketService.order( ticketRequest.getFilmId(), DateUtils.toDate(ticketRequest.getSessionDate()), ticketRequest.getQuantity().intValue()); return buildResponse(confirmation); } private TicketResponse buildResponse(TicketConfirmation confirmation) { TicketResponse response = new TicketResponse(); response.setConfirmationId(confirmation.getConfirmationId()); response.setFilmId(confirmation.getFilmId()); response.setSessionDate(DateUtils.convertDate(confirmation.getSessionDate())); BigInteger quantity = new BigInteger(Integer.toString(confirmation.getQuantity())); response.setQuantity(quantity); BigDecimal amount = new BigDecimal(Float.toString(confirmation.getAmount())); response.setAmount(amount); return response; } } |
Служба будет получать запросы, отправленные с пространством имен "http://www.xpadro.spring.samples.com/tickets"
и с ticketRequest
запроса ticketRequest
.
3.3. Конфигурация сервиса
В весенней конфигурации мы определяем компоненты веб-сервиса:
01
02
03
04
05
06
07
08
09
10
|
<!-- Detects @Endpoint since it is a specialization of @Component --> < context:component-scan base-package = "xpadro.spring.ws" /> <!-- detects @PayloadRoot --> < ws:annotation-driven /> < ws:dynamic-wsdl id = "ticketDefinition" portTypeName = "Tickets" < ws:xsd location = "/WEB-INF/schemas/xsd/ticket-service.xsd" /> </ ws:dynamic-wsdl > |
Файл web.xml
предоставляет MessageDispatcherServlet:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
< context-param > < param-name >contextConfigLocation</ param-name > < param-value >classpath:xpadro/spring/ws/config/root-config.xml</ param-value > </ context-param > < listener > < listener-class >org.springframework.web.context.ContextLoaderListener</ listener-class > </ listener > < servlet > < servlet-name >Ticket Servlet</ servlet-name > < servlet-class >org.springframework.ws.transport.http.MessageDispatcherServlet</ servlet-class > < init-param > < param-name >contextConfigLocation</ param-name > < param-value >classpath:xpadro/spring/ws/config/servlet-config.xml</ param-value > </ init-param > < load-on-startup >1</ load-on-startup > </ servlet > < servlet-mapping > < servlet-name >Ticket Servlet</ servlet-name > < url-pattern >/tickets/*</ url-pattern > </ servlet-mapping > |
Теперь нам просто нужно развернуть его на сервере, и он будет готов обслуживать запросы на заказ билетов.
4. Реализация потока интеграции Spring
Наше приложение Spring Integration начинается с простого процесса.
Сообщение с запросом поступит через шлюз входа в систему. Затем сообщение будет доставлено на исходящий шлюз веб-службы, который отправит его на конечную точку и ожидает ответа. После получения он отправит ответ через канал ответа и обратно в шлюз входа в систему, который затем доставит его клиенту.
Клиентское приложение отправляет TicketRequest
в интерфейс TicketService
. Этот интерфейс перехвачен шлюзом. Таким образом, объект TicketRequest
включается в сообщение интеграции Spring и отправляется в систему обмена сообщениями.
01
02
03
04
05
06
07
08
09
10
11
|
public interface TicketService { /** * Entry to the messaging system. * All invocations to this method will be * intercepted and sent to the SI "system entry" gateway * * @param request */ @Gateway public TicketResponse invoke(TicketRequest request); } |
Глядя на конфигурацию шлюза, мы видим, что мы связали его с интерфейсом TicketService
:
1
2
3
|
< int:gateway id = "systemEntry" default-request-channel = "requestChannel" default-reply-channel = "responseChannel" service-interface = "xpadro.spring.integration.ws.gateway.TicketService" /> |
Мы также определили каналы запроса и ответа.
Сообщение запроса будет отправлено на канал requestChannel
где подписан исходящий шлюз веб-службы:
1
2
3
4
|
< int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" /> |
responseChannel
настроен как его канал ответа, где шлюз входа в систему подписан. Таким образом, клиент получит ответ.
Полный поток настраивается с использованием прямых каналов. Это означает, что поток является синхронным; клиент заблокирует ожидание ответа веб-службы:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
< context:component-scan base-package = "xpadro.spring.integration" /> <!-- Entry to the messaging system --> < int:gateway id = "systemEntry" default-request-channel = "requestChannel" default-reply-channel = "responseChannel" service-interface = "xpadro.spring.integration.ws.gateway.TicketService" /> < int:channel id = "requestChannel" /> < int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" /> < oxm:jaxb2-marshaller id = "marshaller" contextPath = "xpadro.spring.integration.ws.types" /> < int:channel id = "responseChannel" /> |
Система настроена; нам не нужно было реализовывать какой-либо класс Java. Все настраивается через конфигурацию.
Закончив с примером, давайте посмотрим тест, который выполняет этот поток:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@ContextConfiguration ({ "classpath:xpadro/spring/integration/ws/test/config/int-ws-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestInvocation { @Autowired private TicketService service; @Test public void testInvocation() throws InterruptedException, ExecutionException { TicketRequest request = new TicketRequest(); request.setFilmId( "aFilm" ); request.setQuantity( new BigInteger( "3" )); request.setSessionDate(DateUtils.convertDate( new Date())); TicketResponse response = service.invoke(request); assertNotNull(response); assertEquals( "aFilm" , response.getFilmId()); assertEquals( new BigInteger( "3" ), response.getQuantity()); } } |
В следующих разделах мы добавим некоторые функции в этот пример приложения.
5. Добавление времени ожидания клиента
Проверив пространство имен шлюза, мы видим, что нет конфигурации для установки тайм-аута вызова. Независимо от этого, мы можем использовать отправителя сообщения.
Отправитель сообщения является реализацией WebServiceMessageSender
. Одной из интересных реализаций, предоставляемых проектом Spring Web Services, является класс HttpComponentsMessageSender
. Этот класс позволит нам добавить аутентификацию или пул соединений к вызову, внутренне используя Apache HttpClient
. Более того, мы также сможем определить таймауты чтения и подключения.
Следуя примеру, давайте добавим время ожидания.
Во-первых, нам нужно определить bean-компонент с указанным выше классом. Это будет наш отправитель сообщения:
1
2
3
4
|
< bean id = "messageSender" class = "org.springframework.ws.transport.http.HttpComponentsMessageSender" > < property name = "connectionTimeout" value = "5000" /> < property name = "readTimeout" value = "10000" /> </ bean > |
Далее мы настроим отправителя сообщения в нашем шлюзе веб-службы:
1
2
3
4
|
< int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" message-sender = "messageSender" /> |
Вот и все. Теперь WebServiceIOException
будет WebServiceIOException
, если истечет время ожидания.
6. Использование перехватчиков
Еще одна функция, включенная в пространство имен шлюза веб-службы, — это возможность настройки клиентских перехватчиков. Эти клиентские перехватчики являются функцией проекта Spring Web Services и относятся к перехватчикам конечных точек на стороне клиента. Реализация ClientInterceptor
имеет следующие методы:
1
2
3
4
5
6
7
8
|
public interface ClientInterceptor { boolean handleRequest(MessageContext messageContext) throws WebServiceClientException; boolean handleResponse(MessageContext messageContext) throws WebServiceClientException; boolean handleFault(MessageContext messageContext) throws WebServiceClientException; } |
-
handleRequest
: этот метод вызывается до вызова конечной точки. -
handleResponse
: этот метод вызывается после успешного возврата конечной точки. -
handleFault
: если конечная точка выдает ошибку, этот метод вызывается.
Обратите внимание, что эти методы могут манипулировать MessageContext
, который содержит запрос и ответ.
Давайте посмотрим на это на примере. Мы собираемся реализовать наш собственный клиентский перехватчик для перехвата вызова перед вызовом конечной точки, и мы собираемся изменить значение запроса.
Перехватчик реализует интерфейс ClientInterceptor
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class MyInterceptor implements ClientInterceptor { private Logger logger = LoggerFactory.getLogger( this .getClass()); @Override public boolean handleRequest(MessageContext messageContext) throws WebServiceClientException { WebServiceMessage message = messageContext.getRequest(); DOMSource source = (DOMSource) message.getPayloadSource(); Node quantityNode = source.getNode().getAttributes().getNamedItem( "quantity" ); String oldValue = quantityNode.getNodeValue(); quantityNode.setNodeValue( "5" ); logger.info( "Before endpoint invocation. Changed quantity old value {} for {}" , oldValue, 5 ); return true ; } @Override public boolean handleResponse(MessageContext messageContext) throws WebServiceClientException { logger.info( "endpoint invocation succeeds" ); return true ; } @Override public boolean handleFault(MessageContext messageContext) throws WebServiceClientException { logger.info( "endpoint returned a fault" ); return true ; } } |
Теперь нам нужно добавить наш перехватчик в конфигурацию шлюза:
1
2
3
4
5
6
|
< int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" message-sender = "messageSender" interceptor = "myInterceptor" /> < bean id = "myInterceptor" class = "xpadro.spring.integration.ws.interceptor.MyInterceptor" /> |
Пространство имен шлюза веб-службы также позволяет нам определять атрибут interceptors
. Это позволяет нам настроить список клиентских перехватчиков.
Тест подтвердит, что значение запроса было изменено:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@ContextConfiguration ({ "classpath:xpadro/spring/integration/ws/test/config/int-ws-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestInvocation { @Autowired private TicketService service; @Test public void testInvocation() throws InterruptedException, ExecutionException { TicketRequest request = new TicketRequest(); request.setFilmId( "aFilm" ); request.setQuantity( new BigInteger( "3" )); request.setSessionDate(DateUtils.convertDate( new Date())); TicketResponse response = service.invoke(request); assertNotNull(response); assertEquals( "aFilm" , response.getFilmId()); assertEquals( new BigInteger( "5" ), response.getQuantity()); } } |
Перед реализацией пользовательского перехватчика учтите, что проект Spring Web Services предоставляет несколько реализаций:
-
PayloadValidatingInterceptor
:PayloadValidatingInterceptor
полезную нагрузку сообщения веб-службы с помощью схемы. Если проверка не пройдена, обработка будет отменена. -
Wss4jSecurityInterceptor
: перехватчик конечной точки безопасности веб-службы на основе Apache WSS4J. -
XwsSecurityInterceptor
: перехватчик конечной точки безопасности веб-службы на основе пакета Sun для XML и безопасности веб-служб.
7. Повторите операции веб-службы
Иногда нам может потребоваться вызвать службу, и она временно не работает, или, возможно, служба работает только в определенные дни. Если это произойдет, мы можем повторить вызов позже. Spring Integration предлагает возможность повторять попытку вызова службы до тех пор, пока не будет выполнено условие. Это может быть связано с тем, что служба наконец-то ответила или мы достигли максимального количества попыток. Для этой функции Spring Integration предлагает совет по повторной попытке. Этот совет поддерживается проектом Spring Retry .
Рекомендации по повторным попыткам включены в исходящий шлюз веб-службы. Таким образом, шлюз делегирует вызов веб-службы рекомендации по повторной попытке. В случае сбоя вызова службы, совет будет продолжать повторять операцию, основываясь на ее конфигурации.
7.1. Определение совета повторения
Мы должны определить новый компонент с RequestHandlerRetryAdvice
класса RequestHandlerRetryAdvice
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
< bean id = "retryAdvice" class = "org.springframework.integration.handler.advice.RequestHandlerRetryAdvice" > < property name = "retryTemplate" > < bean class = "org.springframework.retry.support.RetryTemplate" > < property name = "backOffPolicy" > < bean class = "org.springframework.retry.backoff.FixedBackOffPolicy" > < property name = "backOffPeriod" value = "5000" /> </ bean > </ property > < property name = "retryPolicy" > < bean class = "org.springframework.retry.policy.SimpleRetryPolicy" > < property name = "maxAttempts" value = "5" /> </ bean > </ property > </ bean > </ property > </ bean > |
Мы определили рекомендацию, что в случае неудачного вызова он будет повторять попытки каждые пять секунд до тех пор, пока служба не ответит или не попытается пять раз. Позже мы увидим, как эти политики определены в рекомендации.
7.2. Добавление совета в шлюз
Как только совет определен, нам нужно включить его в шлюз. Пространство имен Spring Integration Web Services уже предлагает элемент для этого:
1
2
3
4
5
6
7
8
9
|
< int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" message-sender = "messageSender" interceptor = "myInterceptor" > < int-ws:request-handler-advice-chain > < ref bean = "retryAdvice" /> </ int-ws:request-handler-advice-chain > </ int-ws:outbound-gateway > |
Мы включили совет в шлюз. Теперь давайте изменим наш пример, чтобы увидеть, как это работает.
7.3. Изменение конечной точки веб-службы
Мы собираемся изменить нашу конечную точку, чтобы не давать сбой, пока не будет предпринято указанное количество попыток. В этом случае два раза, пока не вернется ответ.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
public @ResponsePayload TicketResponse order( @RequestPayload TicketRequest ticketRequest) throws InterruptedException { Calendar sessionDate = Calendar.getInstance(); sessionDate.set( 2013 , 9 , 26 ); TicketConfirmation confirmation = ticketService.order( ticketRequest.getFilmId(), DateUtils.toDate(ticketRequest.getSessionDate()), ticketRequest.getQuantity().intValue()); TicketResponse response = buildResponse(confirmation); retries++; if (retries < 3 ) { throw new RuntimeException( "not enough retries" ); } else { retries = 0 ; } return response; } |
Теперь мы запустим тест и используем ранее определенный перехватчик, чтобы посмотреть, как он регистрирует попытки:
1
2
3
4
5
6
7
|
2014-03-26 08:24:50,535|AbstractEndpoint|started org.springframework.integration.endpoint.EventDrivenConsumer@392044a1 2014-03-26 08:24:50,626|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5 2014-03-26 08:24:51,224|MyInterceptor|endpoint returned a fault 2014-03-26 08:24:56,236|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5 2014-03-26 08:24:56,282|MyInterceptor|endpoint returned a fault 2014-03-26 08:25:01,285|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5 2014-03-26 08:25:01,377|MyInterceptor|endpoint invocation succeeds |
Шлюз продолжал пробовать вызов до тех пор, пока служба не ответит, так как совет повторения имеет большее количество повторов (пять).
7.4. Повторите рекомендации политики
Совет по повторной попытке Spring Integration поддерживается политиками проекта Spring Retry. Эти правила описаны ниже:
Отменить политику
Он устанавливает промежуток времени между повторными попытками или до первоначальной повторной попытки. Интерфейс BackOffPolicy определяет два метода:
BackOffContext start(RetryContext context);
void backOff (BackOffContext backOffContext) выдает BackOffInterruptedException;
Метод start
позволяет определить начальное поведение. Например, начальная задержка.
Метод backoff
позволяет определить паузу между попытками.
Проект Spring Retry предоставляет несколько реализаций политики отката:
- Политика отмены безгражданства: не поддерживать состояние между вызовами.
-
FixedBackOffPolicy:
делает паузу на определенное время между повторными попытками. Начальная задержка не установлена. -
NoBackOffPolicy:
выполняются без паузы между ними.
-
ExponentialBackOffPolicy:
начиная с указанного промежутка времени, он будет умножаться при каждом вызове. По умолчанию это удваивает время. Вы можете изменить множитель. -
ExponentialRandomBackOffPolicy:
расширяетExponentialBackOffPolicy
. Множитель устанавливается случайным образом.
Повторить политику
Это позволяет определить, сколько раз совет повторения выполнит вызов веб-службы, прежде чем сдаться. Интерфейс RetryPolicy
определяет несколько методов:
boolean canRetry(RetryContext context);
RetryContext open (родитель RetryContext);
void close (контекст RetryContext);
void registerThrowable (контекст RetryContext, Throwable throwable);
Метод canRetry
возвращает, если операция может быть повторена. Это может произойти, например, если мы не достигли максимального числа повторных попыток.
Метод open
используется для получения всех необходимых ресурсов, для отслеживания количества попыток или возникновения исключения во время предыдущей попытки.
Метод registerThrowable
вызывается после каждого неудачного вызова.
Проект Spring Retry предоставляет несколько реализаций политики повторов:
-
SimpleRetryPolicy:
вызов до достижения максимального количества попыток. -
TimeoutRetryPolicy:
он будетTimeoutRetryPolicy:
попытки доTimeoutRetryPolicy:
ожидания. Тайм-аут начинается во время открытого метода. -
NeverRetryPolicy:
он просто пробует вызов один раз. -
AlwaysRetryPolicy:
метод canRetry всегда возвращает true. Он будет повторять попытки до тех пор, пока служба не ответит. -
ExceptionClassifierRetryPolicy:
определяет различное максимальное количество попыток в зависимости от выданного исключения. -
CompositeRetryPolicy:
содержит список политик повторов, которые будут выполняться по порядку.
7,5. Повторите операции, используя поллер
Доступные политики повторов реализуются с использованием задержек по времени, что подходит для большинства ситуаций, но в этом разделе мы собираемся реализовать пользовательское решение, которое позволит нам использовать программу опроса, которая будет настроена с использованием выражения Cron .
Поскольку вызов может завершиться неудачно, шлюз не вернет результат. Мы сделаем поток асинхронным, чтобы клиент мог отправить запрос на обслуживание и продолжить. Таким образом, поток будет повторять попытки из другого потока, пока результат не будет обработан активатором службы или не будет достигнут предел повторных попыток.
Шлюз выглядит следующим образом:
1
2
3
4
|
public interface AsyncTicketService { @Gateway public void invoke(TicketRequest request); } |
Шлюз не определяет канал ответа, так как ответ не будет отправлен. Поскольку это асинхронный запрос, канал запроса содержит очередь. Это позволит его потребителю активно опрашивать сообщение из другого потока:
1
2
3
4
5
6
|
< int:gateway id = "systemEntry" default-request-channel = "requestChannel" service-interface = "xpadro.spring.integration.ws.gateway.AsyncTicketService" /> < int:channel id = "requestChannel" > < int:queue /> </ int:channel > |
Мы включили опрос в шлюз веб-службы, так как теперь он будет опрашивать сообщения:
1
2
3
4
5
6
7
|
< int-ws:outbound-gateway id = "marshallingGateway" request-channel = "requestChannel" reply-channel = "responseChannel" unmarshaller = "marshaller" interceptor = "myInterceptor" > < int:poller fixed-rate = "500" /> </ int-ws:outbound-gateway > |
Предыдущий вызов может привести к трем различным результатам: правильный вызов, неудачный вызов, который необходимо повторить, и окончательный неудачный вызов, который необходимо зарегистрировать.
Сервисный вызов правильно вызван
У нас есть активатор службы подписки на канал ответа. Это простой пример, поэтому он просто запишет результат:
1
2
3
|
<!-- Service is running - Response received --> < int:channel id = "responseChannel" /> < int:service-activator ref = "clientServiceActivator" method = "handleServiceResult" input-channel = "responseChannel" /> |
Сбой вызова службы. Повторите операцию
Если что-то пошло не так, и поскольку это асинхронный запрос, исключение будет помещено в исключение MessageHandlingException
и отправлено в канал ошибок, который по умолчанию настроен Spring Integration.
На данный момент у нас есть маршрутизатор, подписанный на канал ошибок. Этот маршрутизатор обрабатывает количество повторных попыток и, основываясь на этом, перенаправляет сообщение об ошибке на соответствующий канал. Если операция должна быть повторена, она отправит сообщение на канал повторной попытки:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Component ( "serviceRouter" ) public class ServiceRouter { private Logger logger = LoggerFactory.getLogger( this .getClass()); private int maxRetries = 3 ; private int currentRetries; public String handleServiceError(Message<?> msg) { logger.info( "Handling service failure" ); if (maxRetries > 0 ) { currentRetries++; if (currentRetries > maxRetries) { logger.info( "Max retries [{}] reached" , maxRetries); return "failedChannel" ; } } logger.info( "Retry number {} of {}" , currentRetries, maxRetries); return "retryChannel" ; } } |
Конфигурация маршрутизатора показана ниже:
1
2
3
4
|
<!-- Service invocation failed --> < int:router ref = "serviceRouter" method = "handleServiceError" input-channel = "errorChannel" /> < int:channel id = "retryChannel" /> < int:channel id = "failedChannel" /> |
Далее у нас есть эти конечные точки, которые объясняются ниже:
01
02
03
04
05
06
07
08
09
10
11
|
<!-- Retry --> < int:service-activator ref = "clientServiceActivator" method = "retryFailedInvocation" input-channel = "retryChannel" /> < int:inbound-channel-adapter id = "retryAdapter" channel = "requestChannel" ref = "clientServiceActivator" method = "retryInvocation" auto-startup = "false" > < int:poller cron = "0/5 * * * * *" /> </ int:inbound-channel-adapter > <!-- Log failed invocation --> < int:service-activator ref = "clientServiceActivator" method = "handleFailedInvocation" input-channel = "failedChannel" /> |
retryAdapter
входящего канала retryAdapter
будет продолжать опрашивать канал запроса, но обратите внимание, что атрибут auto-startup
имеет значение false. Это означает, что этот адаптер отключен, пока его не активируют. Нам нужно сделать это, иначе он начнет опрос с самого начала, и мы хотим активировать его только в случае неудачного вызова.
Активатор службы запускает или останавливает адаптер в зависимости от результата вызова службы. Когда это терпит неудачу, это запускает адаптер, чтобы начать повторную попытку. Если достигнуто максимальное количество повторных попыток, маршрутизатор перенаправит сообщение на неисправный канал, где активатор службы отключит адаптер, чтобы остановить его опрос. Если вызов, наконец, завершается успешно, он регистрирует сообщение и останавливает адаптер.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@Component ( "clientServiceActivator" ) public class ClientServiceActivator { private Logger logger = LoggerFactory.getLogger( this .getClass()); @Autowired @Qualifier ( "retryAdapter" ) private AbstractEndpoint retryAdapter; private Message<?> message; public void handleServiceResult(Message<?> msg) { logger.info( "service successfully invoked. Finishing flow" ); retryAdapter.stop(); } public void retryFailedInvocation(Message<?> msg) { logger.info( "Service invocation failed. Activating retry trigger..." ); MessageHandlingException exc = (MessageHandlingException) msg.getPayload(); this .message = exc.getFailedMessage(); retryAdapter.start(); } public Message<?> retryInvocation() { logger.info( "Retrying service invocation..." ); return message; } public void handleFailedInvocation(MessageHandlingException exception) { logger.info( "Maximum number of retries reached. Finishing flow." ); retryAdapter.stop(); } } |
Тестовый класс был изменен, чтобы не ожидать результата:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@ContextConfiguration ({ "classpath:xpadro/spring/integration/ws/test/config/int-ws-async-config.xml" }) @RunWith (SpringJUnit4ClassRunner. class ) public class TestAsyncInvocation { @Autowired private AsyncTicketService service; @Test public void testInvocation() throws InterruptedException, ExecutionException { TicketRequest request = new TicketRequest(); request.setFilmId( "aFilm" ); request.setQuantity( new BigInteger( "3" )); request.setSessionDate(DateUtils.convertDate( new Date())); service.invoke(request); Thread.sleep( 80000 ); } } |
Вот и все. Очевидно, что нет необходимости реализовывать весь этот поток, зная, что мы можем использовать рекомендации повторных попыток проекта Spring Retry, но цель этого примера — получить больше знаний о том, как создавать более сложные потоки, используя активацию и деактивацию адаптеров, перенаправления маршрутизатора и другие функции для удовлетворения ваших потребностей.
8. Скачать исходный код
Вы можете скачать исходный код, касающийся интеграции Spring и веб-сервисов, здесь: Spring_Integration_Sample.zip и Spring_WS_Sample.zip