Мы представили OpenHub Framework в предыдущей части этой серии. Эта часть демонстрирует одну из самых мощных функций фреймворка — модель асинхронного обмена сообщениями .
Асинхронная связь между системами используется, когда исходная система не может дождаться ответа целевой системы. Есть несколько причин:
- исходная система должна максимально реагировать и не подвергаться влиянию внешних воздействий (медленная связь, нестабильная целевая система и т. д.)
- обработка в целевой системе занимает много времени
- асинхронная связь положительно влияет на производительность и трафик
Асинхронные сценарии
Когда вы решаете общаться асинхронно, вам нужно продумать возможные сценарии:
- Целевая система должна подтвердить, что входящее сообщение было успешно сохранено и подготовлено для дальнейшей обработки. Должна ли исходная система быть уведомлена об окончательном результате асинхронного процесса?
- Что делать в случае сбоя асинхронной обработки? Попробуйте несколько раз, если есть временная техническая ошибка (например, сбой связи с другой системой) или остановите дальнейшую обработку из-за бизнес-ошибки (например, входные данные недействительны).
- Другие системы вызываются во время асинхронной обработки — что делать, если вызов первой системы в порядке, но вызов второй системы завершился неудачно? Асинхронная обработка должна быть идемпотентной, пропустить первый успешный вызов и повторить только второй вызов.
- Асинхронный процесс может быть сложным, и тогда было бы хорошо разделить один большой процесс (родительский) на более мелкие (дочерние) процессы . Если потомки обрабатываются, то и родительский процесс будет завершен.
- Иногда вам нужно гарантировать порядок входящих запросов (запросы не должны поступать в том же порядке, в котором они были отправлены) и обрабатывать их в точном порядке.
- Это асинхронная обработка, и вам нужно отслеживать ее или получать автоматические уведомления, если происходит что-то неожиданное , например, происходит сбой асинхронного процесса.
- Иногда вам нужно сохранять данные или текущее состояние асинхронного процесса между попытками завершить его успешно, например, результатом первого вызова внешней системы является вход для второго вызова.
Когда вы начинаете думать обо всех этих сценариях, вы обнаруживаете, что реализовать его с нуля не так просто. Существует платформа OpenHub со встроенной поддержкой обработки асинхронных сообщений. Это простой в использовании, но надежный и гибкий одновременно. А также настраивается, например, сколько раз процесс должен быть запущен снова? В какие промежутки времени?
Реализация асинхронного маршрута
Реализация маршрута с каркасом OpenHub имеет два под-маршрута:
- один для обработки входящего сообщения (
RouteIn
) - один для реализации асинхронного процесса (
RouteOut
)
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
/** * Route definition for asynchronous operation "translate" via web services. */ @CamelConfiguration (value = AsyncTranslateWsRoute.ROUTE_BEAN) public class AsyncTranslateWsRoute extends AbstractBasicRoute { static final String ROUTE_BEAN = "asyncTranslateWsRouteBean" ; private static final String OPERATION_NAME = "asyncTranslateWs" ; static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME); static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME); static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT; @Override protected void doConfigure() throws Exception { // asyncTranslate - input asynch message createAsyncRouteIn(); // asyncTranslate - process delivery (=asynchronous execution) createAsyncRouteOut(); } /** * Route for asynchronous <strong>asyncTranslate</strong> input operation. * <p/> * Prerequisite: none * <p/> * Output: {@link AsyncTranslateResponse} */ private void createAsyncRouteIn() { Namespaces ns = new Namespaces( "h" , TranslateWebServiceConfig.TRANSLATE_SERVICE_NS); // note: mandatory parameters are set already in XSD, this validation is extra XPathValidator validator = new XPathValidator( "/h:asyncTranslateRequest" , ns, "h:inputText" ); AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME, getInWsUri( new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest" )), new AsynchResponseProcessor() { @Override protected Object setCallbackResponse(CallbackResponse callbackResponse) { AsyncTranslateResponse res = new AsyncTranslateResponse(); res.setConfirmAsyncTranslate(callbackResponse); return res; } }, jaxb(AsyncTranslateResponse. class )) .withValidator(validator) .build( this ); } /** * Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution). * Only input text is logged in this case. * <p/> * Prerequisite: none */ private void createAsyncRouteOut() { final String URI_LOG_INPUT_PARAMS = "direct:logInputParams" ; from(URI_ASYNC_OUT) .routeId(ROUTE_ID_ASYNC_OUT) // xml -> AsyncTranslateRequest .unmarshal(jaxb(AsyncTranslateRequest. class )) .to( "extcall:message:" + URI_LOG_INPUT_PARAMS); from(URI_LOG_INPUT_PARAMS) .validate(body().isInstanceOf(AsyncTranslateRequest. class )) .log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})" ); } } |
RouteIn
использует AsynchRouteBuilder
для легкой настройки со следующими функциями:
- определяет, какой входящий запрос веб-службы должен запустить этот маршрут
- определяет ответ подтверждения для исходной системы. Когда входной маршрут успешно выполнен, возвращается синхронный ответ исходной системе.
- определяет валидатор, который проверяет, есть ли элемент inputText во входящем запросе
RouteOut
определяет сам асинхронный процесс. Входной запрос ( AsyncTranslateRequest
) регистрируется только в этом случае.
И это все. Все вокруг реализуется фреймворком OpenHub.
Внешние звонки
Ваши реализации маршрутов часто будут вызывать внешние системы или другие маршруты. Если вы реализуете асинхронный процесс, тогда вы должны придерживаться правил идемпотентности — каждая часть вашего процесса может вызываться более одного раза, и вы должны обеспечивать одинаковое поведение во всех вызовах. Иногда внешняя система / маршрут сама по себе идемпотентна, и тогда вы можете вызывать ее столько раз, сколько захотите. Если нет, то вы должны контролировать это в своей реализации. Поэтому мы сделали компонент Camel extcall .
excall
компонента в приведенном выше примере гарантирует, что маршрут с URI_LOG_INPUT_PARAMS
будет вызван ровно один раз, даже если весь асинхронный процесс выполняется больше раз.
Описание:
- во время асинхронной обработки сообщений вызываются две внешние системы
- есть две остановки extcall, к которым мы можем вернуться во время обработки
- если ошибка возникла до первого запроса к внешней системе 1, то следующая попытка обработки начнется с начала, так же, как приходит новое сообщение
- во время связи с внешней системой 2 следующая попытка обработки начнется с extcall1
- после успешного ответа от внешней системы 2 следующая попытка обработки начнется с extcall2
Компоненты воронки и дросселирования
Другими мощными компонентами являются funnel
и throttling
.
Компонент « Funnel
предназначен для фильтрации одновременных сообщений в определенной точке интеграции. Эта фильтрация гарантирует, что в этом месте будет обрабатываться только одно сообщение определенного типа или с определенной информацией в один момент даже в гарантированном порядке (необязательный выбор). Это может быть полезно для связи с внешними системами, которые могут принимать только одно входное сообщение для конкретного объекта (например, одного конкретного клиента в системе заказов) одновременно.
throttling
второго компонента позволяет вам гарантировать, что конкретная конечная точка не будет перегружена или что мы не превысим согласованный SLA с какой-либо внешней службой. Компонент регулирования также может использоваться для синхронных сообщений.
Все компоненты поддерживают кластер.
Детали реализации
Все, что нужно сохранить OpenHub, сохраняется в базе данных — ограничений по типу нет. Нет необходимости адаптировать систему JMS / MQ для поддержки асинхронного обмена сообщениями. Тогда вы можете использовать любые инструменты, которые вам нравятся для вашей повседневной работы — модель данных проста, понятна и хорошо документирована. Существует гораздо больше инструментов для баз данных, чем для систем JMS / MQ.
Иногда мы слышим, что использование базы данных в этом случае запрещено, в некоторых случаях это может быть узким местом с точки зрения производительности. Это зависит от вариантов использования интеграции из реальных проектов, но мы еще не приняли ограничения производительности в наших реальных проектах, где обрабатываются сотни тысяч одновременных запросов. Мы готовы добавить реализацию JMS / MQ, но пока она не нужна.
Нет необходимости запускать асинхронный процесс только по входящему запросу — вы также можете использовать планирование заданий для запуска маршрута в любое время и затем оставить его на платформе OpenHub.
Все примеры можно найти в справочной реализации на GitHub — см. Https://github.com/OpenWiseSolutions/openhub-ri