Пожалуйста, ознакомьтесь с моим другим постом о создании простых CXF-сервисов (без Camel) в OSGi на Karaf.
Это базовый урок о том, как
- создать службу CXF REST
- многоадресная (и распараллеливание) входящего запроса с использованием Camel
- исходные данные из двух разных сервисов
- агрегировать ответ и
- наконец, вернуть консолидированный результат как JSON конечному пользователю.
Вы можете скачать всю кодовую базу с GitHub .
Что это приложение делает, простыми словами
Ожидаемый результат от этого сервиса — жестко закодированный ответ, который выглядит следующим образом:
Как видно из изображения, верхняя часть ответа поступает из службы NameEmailService а вторая часть ответа — из службы AgePhoneService . Призывы к обогащению данных выполняются одновременно, и заполняется консолидированная сущность результата — ConsolidatedSearchResult .
Структура проекта выглядит следующим образом:
Для шага 1 есть два маленьких шага.
Шаг 1.a — Создание службы CXF REST
Как вы уже догадались, в этом шаге нет ничего сложного. Просто интерфейс и реализация.
Интерфейс
|
1
2
3
4
5
6
7
8
9
|
@Path("rest")public interface RestService { @GET @Path("query/{queryString}") @Produces(MediaType.APPLICATION_JSON) public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);} |
Реализация
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class RestServiceImpl implements RestService { private static Logger logger= LoggerFactory.getLogger(AgePhoneServiceImpl.class); private NameEmailService nameEmailService; private AgePhoneService agePhoneService; public RestServiceImpl(){ } //Do nothing. Camel intercepts and routes the requests public String sourceResultsFromTwoSources(String queryString) { return null; } public NameEmailResult getNameEmailResult(String queryString){ logger.info("Invoking getNameEmailResult from RestServiceImpl"); return nameEmailService.getNameAndEmail(queryString); } public AgePhoneResult getAgePhoneResult(String queryString){ logger.info("Invoking getAgePhoneResult from RestServiceImpl"); return agePhoneService.getAgePhoneResult(queryString); } public NameEmailService getNameEmailService() { return nameEmailService; } public AgePhoneService getAgePhoneService() { return agePhoneService; } public void setNameEmailService(NameEmailService nameEmailService) { this.nameEmailService = nameEmailService; } public void setAgePhoneService(AgePhoneService agePhoneService) { this.agePhoneService = agePhoneService; }} |
Обратите внимание, что реализация метода sourceResultsFromTwoSources возвращает значение NULL. Правда в том, что этот метод даже не вызывается при вызове REST. Camel перехватывает все запросы к URL-адресу и направляет его к различным конечным точкам (в нашем случае вызывает два метода — getNameEmailResult() и getAgePhoneResult() ).
Шаг 1.b — Создание реализации сервиса
Детские реализации NameEmailService и AgePhoneService приведены ниже:
NameEmailServiceImpl
|
1
2
3
4
5
6
7
8
9
|
public class NameEmailServiceImpl implements NameEmailService { public NameEmailResult getNameAndEmail(String queryString){ return new NameEmailResult("Arun", "arun@arunma.com"); }} |
AgePhoneServiceImpl
|
1
2
3
4
5
6
|
public class AgePhoneServiceImpl implements AgePhoneService { public AgePhoneResult getAgePhoneResult(String queryString){ return new AgePhoneResult(32, "111-222-333"); }} |
Шаг 2, 3, 4 и 5
Ну, я солгал, когда сказал, что 2,3,4 и 5 были 4 шагами. Все они выполняются в один этап с использованием маршрутизации Camel и его реализаций Enterprise Integration Pattern.
RestToBeanRouter
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
public class RestToBeanRouter extends RouteBuilder { @Override public void configure() throws Exception { from ("cxfrs://bean://rsServer") .multicast() .parallelProcessing() .aggregationStrategy(new ResultAggregator()) .beanRef("restServiceImpl", "getNameEmailResult") .beanRef("restServiceImpl", "getAgePhoneResult") .end() .marshal().json(JsonLibrary.Jackson) .to("log://camelLogger?level=DEBUG"); }} |
Наша маршрутизация объяснила
Проще говоря, наш маршрутизатор делает то, что он
1) from ("cxfrs://bean://rsServer") Перехватывает все запросы к конечной точке сервера JAX-RS, определенной в rest-blueprint.xml как
Остальное-blueprint.xml
|
1
2
3
|
<cxf:rsServer id="rsServer" address="/karafcxfcamel" serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl" loggingFeatureEnabled="true" /> |
2) .multicast() пересылает исходный запрос без изменений
|
1
2
|
1. `getNameEmailResult` &2. `getAgePhoneResult` methods in `RestServiceImpl` |
3) .parallelProcessing() размещает параллельные вызовы методов.
4) .aggregationStrategy(new ResultAggregator()) указывает, как должны быть агрегированы результаты из различных многоадресных источников.
Наш агрегатор выглядит так:
ResultAggregator
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class ResultAggregator implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { ConsolidatedSearchResult consolidatedSearchResult=null; if (oldExchange==null){ consolidatedSearchResult=new ConsolidatedSearchResult(); } else{ consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class); } NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class); AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class); if (nameEmailResult!=null){ consolidatedSearchResult.setNameEmailResult(nameEmailResult); } if (agePhoneResult!=null){ consolidatedSearchResult.setAgePhoneResult(agePhoneResult); } newExchange.getIn().setBody(consolidatedSearchResult); return newExchange; }} |
Наш Агрегатор объяснил
Метод aggregate в нашем ResultAggregator немного грубоват, но выполняет свою работу.
- Метод
aggregateвызывается для всех многоадресных конечных точек всякий раз, когда они завершаются. - Итак, в первый раз oldExchange будет нулевым. Мы принимаем это как возможность для создания окончательной консолидированной сущности результата, которую мы хотели ответить пользователю.
- Мы проверяем, является ли входящий newExchange результатом вызова NameEmailService или AgePhoneService, и соответственно заполняем объединенную сущность.
- Наконец, мы возвращаем консолидированный объект — возвращающий выполняет две работы.
- Консолидированная сущность появляется как oldExchange для следующего вызова
aggregateметода. (больше похоже на цепочку — последний возвращенный объект от объекта является тем, который входит в качестве входящего обмена для следующего вызова) - Возвращает обратно пользователю, если это последний вызов
aggregate(все вызовы конечных точек многоадресной передачи завершены).
- Консолидированная сущность появляется как oldExchange для следующего вызова



