Пожалуйста, ознакомьтесь с моим другим постом о создании простых CXF-сервисов (без Camel) в OSGi на Karaf.
Это базовый урок о том, как
- создать службу CXF REST
- многоадресная (и распараллеливание) входящего запроса с использованием Camel
- исходные данные из двух разных сервисов
- агрегировать ответ и
- наконец, вернуть консолидированный результат как JSON конечному пользователю.
Вы можете скачать всю кодовую базу с GitHub .
Что это приложение делает, простыми словами
Ожидаемый результат от этого сервиса — жестко закодированный ответ, который выглядит следующим образом:
Как видно из изображения, верхняя часть ответа поступает из службы NameEmailService
а вторая часть ответа — из службы AgePhoneService
. Призывы к обогащению данных выполняются одновременно, и заполняется консолидированная сущность результата — ConsolidatedSearchResult
.
Структура проекта выглядит следующим образом:
Для шага 1 есть два маленьких шага.
Шаг 1.a — Создание службы CXF REST
Как вы уже догадались, в этом шаге нет ничего сложного. Просто интерфейс и реализация.
Интерфейс
1
2
3
4
5
6
7
8
9
|
@Path ( "rest" ) public interface RestService { @GET @Path ( "query/{queryString}" ) @Produces (MediaType.APPLICATION_JSON) public String sourceResultsFromTwoSources( @PathParam ( "queryString" ) String queryString); } |
Реализация
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class RestServiceImpl implements RestService { private static Logger logger= LoggerFactory.getLogger(AgePhoneServiceImpl. class ); private NameEmailService nameEmailService; private AgePhoneService agePhoneService; public RestServiceImpl(){ } //Do nothing. Camel intercepts and routes the requests public String sourceResultsFromTwoSources(String queryString) { return null ; } public NameEmailResult getNameEmailResult(String queryString){ logger.info( "Invoking getNameEmailResult from RestServiceImpl" ); return nameEmailService.getNameAndEmail(queryString); } public AgePhoneResult getAgePhoneResult(String queryString){ logger.info( "Invoking getAgePhoneResult from RestServiceImpl" ); return agePhoneService.getAgePhoneResult(queryString); } public NameEmailService getNameEmailService() { return nameEmailService; } public AgePhoneService getAgePhoneService() { return agePhoneService; } public void setNameEmailService(NameEmailService nameEmailService) { this .nameEmailService = nameEmailService; } public void setAgePhoneService(AgePhoneService agePhoneService) { this .agePhoneService = agePhoneService; } } |
Обратите внимание, что реализация метода sourceResultsFromTwoSources
возвращает значение NULL. Правда в том, что этот метод даже не вызывается при вызове REST. Camel перехватывает все запросы к URL-адресу и направляет его к различным конечным точкам (в нашем случае вызывает два метода — getNameEmailResult()
и getAgePhoneResult()
).
Шаг 1.b — Создание реализации сервиса
Детские реализации NameEmailService и AgePhoneService приведены ниже:
NameEmailServiceImpl
1
2
3
4
5
6
7
8
9
|
public class NameEmailServiceImpl implements NameEmailService { public NameEmailResult getNameAndEmail(String queryString){ } } |
AgePhoneServiceImpl
1
2
3
4
5
6
|
public class AgePhoneServiceImpl implements AgePhoneService { public AgePhoneResult getAgePhoneResult(String queryString){ return new AgePhoneResult( 32 , "111-222-333" ); } } |
Шаг 2, 3, 4 и 5
Ну, я солгал, когда сказал, что 2,3,4 и 5 были 4 шагами. Все они выполняются в один этап с использованием маршрутизации Camel и его реализаций Enterprise Integration Pattern.
RestToBeanRouter
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class RestToBeanRouter extends RouteBuilder { @Override public void configure() throws Exception { from ( "cxfrs://bean://rsServer" ) .multicast() .parallelProcessing() .aggregationStrategy( new ResultAggregator()) .beanRef( "restServiceImpl" , "getNameEmailResult" ) .beanRef( "restServiceImpl" , "getAgePhoneResult" ) .end() .marshal().json(JsonLibrary.Jackson) .to( "log://camelLogger?level=DEBUG" ); } } |
Наша маршрутизация объяснила
Проще говоря, наш маршрутизатор делает то, что он
1) from ("cxfrs://bean://rsServer")
Перехватывает все запросы к конечной точке сервера JAX-RS, определенной в rest-blueprint.xml
как
Остальное-blueprint.xml
1
2
3
|
<cxf:rsServer id= "rsServer" address= "/karafcxfcamel" serviceClass= "me.rerun.karafcxfcamel.rest.RestServiceImpl" loggingFeatureEnabled= "true" /> |
2) .multicast()
пересылает исходный запрос без изменений
1
2
|
1 . `getNameEmailResult` & 2 . `getAgePhoneResult` methods in `RestServiceImpl` |
3) .parallelProcessing()
размещает параллельные вызовы методов.
4) .aggregationStrategy(new ResultAggregator())
указывает, как должны быть агрегированы результаты из различных многоадресных источников.
Наш агрегатор выглядит так:
ResultAggregator
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class ResultAggregator implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { ConsolidatedSearchResult consolidatedSearchResult= null ; if (oldExchange== null ){ consolidatedSearchResult= new ConsolidatedSearchResult(); } else { consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult. class ); } NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult. class ); AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult. class ); if (nameEmailResult!= null ){ consolidatedSearchResult.setNameEmailResult(nameEmailResult); } if (agePhoneResult!= null ){ consolidatedSearchResult.setAgePhoneResult(agePhoneResult); } newExchange.getIn().setBody(consolidatedSearchResult); return newExchange; } } |
Наш Агрегатор объяснил
Метод aggregate
в нашем ResultAggregator немного грубоват, но выполняет свою работу.
- Метод
aggregate
вызывается для всех многоадресных конечных точек всякий раз, когда они завершаются. - Итак, в первый раз oldExchange будет нулевым. Мы принимаем это как возможность для создания окончательной консолидированной сущности результата, которую мы хотели ответить пользователю.
- Мы проверяем, является ли входящий newExchange результатом вызова NameEmailService или AgePhoneService, и соответственно заполняем объединенную сущность.
- Наконец, мы возвращаем консолидированный объект — возвращающий выполняет две работы.
- Консолидированная сущность появляется как oldExchange для следующего вызова
aggregate
метода. (больше похоже на цепочку — последний возвращенный объект от объекта является тем, который входит в качестве входящего обмена для следующего вызова) - Возвращает обратно пользователю, если это последний вызов
aggregate
(все вызовы конечных точек многоадресной передачи завершены).
- Консолидированная сущность появляется как oldExchange для следующего вызова