Статьи

Создание сервиса Camel-CXF REST в OSGi для Карафа — многоадресная рассылка и агрегация

Пожалуйста, ознакомьтесь с моим другим постом о создании простых CXF-сервисов (без Camel) в OSGi на Karaf.

Это базовый урок о том, как

  1. создать службу CXF REST
  2. многоадресная (и распараллеливание) входящего запроса с использованием Camel
  3. исходные данные из двух разных сервисов
  4. агрегировать ответ и
  5. наконец, вернуть консолидированный результат как JSON конечному пользователю.

Вы можете скачать всю кодовую базу с GitHub .

Что это приложение делает, простыми словами

Ожидаемый результат от этого сервиса — жестко закодированный ответ, который выглядит следующим образом:

FinalOutput

Как видно из изображения, верхняя часть ответа поступает из службы NameEmailService а вторая часть ответа — из службы AgePhoneService . Призывы к обогащению данных выполняются одновременно, и заполняется консолидированная сущность результата — ConsolidatedSearchResult .

Структура проекта выглядит следующим образом:

ProjectStructure2

Для шага 1 есть два маленьких шага.

Шаг 1.a — Создание службы CXF REST

Как вы уже догадались, в этом шаге нет ничего сложного. Просто интерфейс и реализация.

RESTClassDiagram2

Интерфейс

1
2
3
4
5
6
7
8
9
@Path("rest")
public interface RestService {
 
    @GET
    @Path("query/{queryString}")
    @Produces(MediaType.APPLICATION_JSON)
    public String sourceResultsFromTwoSources(@PathParam("queryString") String queryString);
 
}

Реализация

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class RestServiceImpl implements RestService {
 
    private static Logger logger= LoggerFactory.getLogger(AgePhoneServiceImpl.class);
 
    private NameEmailService nameEmailService;
    private AgePhoneService agePhoneService;
 
    public RestServiceImpl(){
    }
 
    //Do nothing. Camel intercepts and routes the requests
    public String sourceResultsFromTwoSources(String queryString) {
        return null;
    }
 
    public NameEmailResult getNameEmailResult(String queryString){
        logger.info("Invoking getNameEmailResult from RestServiceImpl");
        return nameEmailService.getNameAndEmail(queryString);
    }
 
    public AgePhoneResult getAgePhoneResult(String queryString){
        logger.info("Invoking getAgePhoneResult from RestServiceImpl");
        return agePhoneService.getAgePhoneResult(queryString);
    }
 
    public NameEmailService getNameEmailService() {
        return nameEmailService;
    }
 
    public AgePhoneService getAgePhoneService() {
        return agePhoneService;
    }
 
    public void setNameEmailService(NameEmailService nameEmailService) {
        this.nameEmailService = nameEmailService;
    }
 
    public void setAgePhoneService(AgePhoneService agePhoneService) {
        this.agePhoneService = agePhoneService;
    }
}

Обратите внимание, что реализация метода sourceResultsFromTwoSources возвращает значение NULL. Правда в том, что этот метод даже не вызывается при вызове REST. Camel перехватывает все запросы к URL-адресу и направляет его к различным конечным точкам (в нашем случае вызывает два метода — getNameEmailResult() и getAgePhoneResult() ).

Шаг 1.b — Создание реализации сервиса

Детские реализации NameEmailService и AgePhoneService приведены ниже:

ServiceClassDiagram2

NameEmailServiceImpl

1
2
3
4
5
6
7
8
9
public class NameEmailServiceImpl implements NameEmailService {
 
    public NameEmailResult getNameAndEmail(String queryString){
 
        return new NameEmailResult("Arun", "[email protected]");
 
    }
 
}

AgePhoneServiceImpl

1
2
3
4
5
6
public class AgePhoneServiceImpl implements AgePhoneService {
 
    public AgePhoneResult getAgePhoneResult(String queryString){
        return new AgePhoneResult(32, "111-222-333");
    }
}

Шаг 2, 3, 4 и 5

Ну, я солгал, когда сказал, что 2,3,4 и 5 были 4 шагами. Все они выполняются в один этап с использованием маршрутизации Camel и его реализаций Enterprise Integration Pattern.

RestToBeanRouter

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class RestToBeanRouter extends RouteBuilder {
 
    @Override
    public void configure() throws Exception {
 
        from ("cxfrs://bean://rsServer")
                .multicast()
                .parallelProcessing()
                .aggregationStrategy(new ResultAggregator())
                .beanRef("restServiceImpl", "getNameEmailResult")
                .beanRef("restServiceImpl", "getAgePhoneResult")
                .end()
                .marshal().json(JsonLibrary.Jackson)
                .to("log://camelLogger?level=DEBUG");
    }
}

Наша маршрутизация объяснила

Проще говоря, наш маршрутизатор делает то, что он

1) from ("cxfrs://bean://rsServer") Перехватывает все запросы к конечной точке сервера JAX-RS, определенной в rest-blueprint.xml как

Остальное-blueprint.xml

1
2
3
<cxf:rsServer id="rsServer" address="/karafcxfcamel"
                  serviceClass="me.rerun.karafcxfcamel.rest.RestServiceImpl"
                  loggingFeatureEnabled="true" />

2) .multicast() пересылает исходный запрос без изменений

1
2
1. `getNameEmailResult`  &
2. `getAgePhoneResult` methods in `RestServiceImpl`

3) .parallelProcessing() размещает параллельные вызовы методов.

4) .aggregationStrategy(new ResultAggregator()) указывает, как должны быть агрегированы результаты из различных многоадресных источников.

Наш агрегатор выглядит так:

ResultAggregator

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ResultAggregator implements AggregationStrategy {
 
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
 
        ConsolidatedSearchResult consolidatedSearchResult=null;
 
        if (oldExchange==null){
            consolidatedSearchResult=new ConsolidatedSearchResult();
        }
        else{
            consolidatedSearchResult=oldExchange.getIn().getBody(ConsolidatedSearchResult.class);
        }
 
        NameEmailResult nameEmailResult=newExchange.getIn().getBody(NameEmailResult.class);
        AgePhoneResult agePhoneResult=newExchange.getIn().getBody(AgePhoneResult.class);
 
        if (nameEmailResult!=null){
            consolidatedSearchResult.setNameEmailResult(nameEmailResult);
        }
 
        if (agePhoneResult!=null){
            consolidatedSearchResult.setAgePhoneResult(agePhoneResult);
        }
 
        newExchange.getIn().setBody(consolidatedSearchResult);
 
        return newExchange;
    }
}

Наш Агрегатор объяснил

Метод aggregate в нашем ResultAggregator немного грубоват, но выполняет свою работу.

  1. Метод aggregate вызывается для всех многоадресных конечных точек всякий раз, когда они завершаются.
  2. Итак, в первый раз oldExchange будет нулевым. Мы принимаем это как возможность для создания окончательной консолидированной сущности результата, которую мы хотели ответить пользователю.
  3. Мы проверяем, является ли входящий newExchange результатом вызова NameEmailService или AgePhoneService, и соответственно заполняем объединенную сущность.
  4. Наконец, мы возвращаем консолидированный объект — возвращающий выполняет две работы.
    1. Консолидированная сущность появляется как oldExchange для следующего вызова aggregate метода. (больше похоже на цепочку — последний возвращенный объект от объекта является тем, который входит в качестве входящего обмена для следующего вызова)
    2. Возвращает обратно пользователю, если это последний вызов aggregate (все вызовы конечных точек многоадресной передачи завершены).