Статьи

Ускорьте сервисы с помощью Reactive API в Java EE 8

Службы часто можно оптимизировать с помощью асинхронной обработки, даже не меняя их поведение по отношению к внешнему миру.

Причина, по которой некоторые службы неэффективны, заключается в том, что им нужно ждать, пока другие службы не дадут результата, чтобы продолжить работу.

Давайте посмотрим, как вызывать внешние службы REST, не ожидая их, а также независимо выполнять несколько параллельных вызовов, а затем объединить их результаты с реактивным конвейером в Java EE 8.

Если наш сервис вызывает несколько микросервисов и ожидает завершения каждого вызова и возвращает результаты, прежде чем делать другой вызов, это хороший кандидат на рефакторинг с использованием реактивного API. Чтобы сделать службу более эффективной, она может выполнять все вызовы внешних служб параллельно, если они не зависят друг от друга. Это уменьшит время ожидания и, следовательно, ускорит микросервис.

Для параллельного вызова служб REST мы будем использовать новый клиентский API с реактивным интерфейсом в JAX-RS. Мы объединим его с библиотекой RxJava, чтобы объединить их результаты, когда они будут доступны. Эта комбинация позволит нам писать чистый и эффективный код. И с дополнительным преимуществом, что текущий поток может быть освобожден для дальнейшей обработки, ожидая результатов от удаленных вызовов.

Мы создадим конвейер, который обрабатывает результаты по мере их поступления и, наконец, объединяет их в один ответ. Первая часть конвейера будет вызывать каждый удаленный сервис. Вместо того, чтобы ждать результатов, мы укажем, что делать с каждым полученным результатом, и продолжим звонить в другие службы. Использование метода rx () в построителе клиентских запросов JAX-RS позволяет нам вызывать версию метода get() , которая немедленно возвращается, а не ждет результата. Чтобы обработать результаты, когда они поступят, мы можем связать обработчики методов в CompletionStage, возвращенном из rx-версии метода get()

1
2
3
4
5
CompletionStage stage = temperatureServiceTarget
  .request()
  .rx()
  .get(Temperature.class)
  .thenApply(temperature -> new Forecast(temperature));

Приведенный выше код вызовет температурную службу, а затем зарегистрирует лямбда-выражение для обработки результирующей температуры при ее поступлении. Это отображает температуру на прогнозируемый объект, к которому можно обратиться с помощью переменной stage позже.

Однако мы хотим использовать другой вариант метода get() вместе с RxJava Flowable Invoker из проекта Джерси, чтобы получить Flowable из RxJava вместо CompletionStage . Интерфейс Flowable упрощает объединение нескольких асинхронных результатов с гораздо более простым кодом, чем CompletionStage, а также более эффективно.

С помощью следующего кода мы будем вызывать внешнюю службу и возвращать Flowable:

1
2
3
4
5
6
Flowable flowable = temperatureServiceTarget
  .register(RxFlowableInvokerProvider.class)
  .request()
  .rx(RxFlowableInvoker.class)
  .get(Temperature.class)
  .map(temperature -> new Forecast(temperature);

Мы регистрируем дополнительный RxFlowableInvokerProvider , который позволяет запросить RxFlowableInvoker позже. Затем этот вызывающий объект возвращает нам тип возвращаемого значения Flowable из RxJava. Эти классы отсутствуют в JAX-RS API, и мы должны добавить их с помощью библиотеки Jersey RxJava2:

1
2
3
4
5
<dependency>
  <groupId>org.glassfish.jersey.ext.rx</groupId>
  <artifactId>jersey-rx-client-rxjava2</artifactId>
  <version>2.26</version>
</dependency>

На первый взгляд кажется, что мы сделали код более сложным, делая то же самое. Но экземпляр Flowable позволяет нам легко комбинировать несколько вызовов:

01
02
03
04
05
06
07
08
09
10
Flowable.concat(flowable1, flowable2)
  .doOnNext(forecast -> {
    forecasts.add(forecast);
  })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}

Для каждого прогноза, полученного от любого текущего, мы добавляем его в список прогнозов. Наконец, мы отправляем список прогнозов в качестве ответа или отправляем сообщение об ошибке. Последний вызов subscribe() необходим для регистрации слушателей, иначе они будут проигнорированы.

Возможно, вы также заметили переменную asyncResponse используемую для отправки окончательного ответа или сообщения об ошибке. Это экземпляр асинхронного ответа JAX-RS, который используется для завершения ответа REST в более позднее время, когда данные доступны, без блокировки исходного потока обработки. Использование асинхронного ответа помогает нам экономить ресурсы потоков, ожидая результатов от внешних служб REST. Чтобы включить асинхронную обработку в нашей конечной точке REST, мы добавим javax.ws.rs.container.AsyncResponse в качестве аргумента метода REST вместе с аннотацией @Suspended . Мы также изменим тип возвращаемого значения на void, потому что ответ будет создаваться с использованием экземпляра AsyncResponse:

1
2
3
4
5
6
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  ...here come some asynchronous calls to REST services...
  asyncResponse.resume(...)
}

Финальный пример кода

Следующий код будет:

  • включить асинхронную обработку запросов REST в методе getForecasts
  • установить 5-минутный тайм-аут на асинхронный ответ
  • выполнить температурный режим дважды, для Лондона и Пекина, не дожидаясь результатов
  • объединить результаты в последовательность прогнозов
  • добавить каждый прогноз в последовательности в список
  • отправить полный список, когда все результаты обработаны
  • отправить ошибку в случае исключения
  • зарегистрировать обработчики с помощью метода подписки
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private Flowable getTemperature(String location) {
  return temperatureTarget
    .register(RxFlowableInvokerProvider.class)
    .resolveTemplate("city", location)
    .request()
    .rx(RxFlowableInvoker.class)
    .get(Temperature.class)
    .map(temperature -> new Forecast(location, temperature));
}
  
@GET
@Produces(MediaType.APPLICATION_JSON)
public void getForecasts(@Suspended AsyncResponse asyncResponse) {
  List forecasts = new ArrayList<>();
  asyncResponse.setTimeout(5, TimeUnit.MINUTES);
  Flowable.concat(getTemperature("London"), getTemperature("Beijing"))
    .doOnNext(forecast -> {
      forecasts.add(forecast);
    })
  .doOnComplete(() -> {
    asyncResponse.resume(Response.ok(forecasts).build());
  })
  .doOnError(asyncResponse::resume)
  .subscribe();
}
Опубликовано на Java Code Geeks с разрешения Ондрея Михали, партнера нашей программы JCG . Смотрите оригинальную статью здесь: Ускорьте сервисы с помощью Reactive API в Java EE 8

Мнения, высказанные участниками Java Code Geeks, являются их собственными.