Статьи

Spring, Reactor и ElasticSearch: от обратных вызовов до реактивных потоков

Spring 5 (и Boot 2, когда он прибудет через пару недель ) — это революция. Это не революция типа « аннотации через XML » или « классы Java над аннотациями ». Это действительно революционная структура, которая позволяет создавать совершенно новый класс приложений. В последние годы меня это немного пугало. «Spring Cloud — это фреймворк, упрощающий использование Spring Boot, фреймворк, упрощающий использование Spring, являющийся фреймворком, упрощающим развитие предприятия». В файле start.spring.io (также известном как « начало… точечная пружина… точка I… O ») перечислены 120 различных модулей (!), которые вы можете добавить к своему сервису. Весна в эти дни стала огромным зонтичным проектом, и я могу себе представить, почему некоторые люди (все еще!) Предпочитают Java EE (или как это называется в наши дни).

Но весна 5 приносит реактивную революцию. Это больше не просто оболочка для блокировки API сервлетов и различных веб-фреймворков. Spring 5 поверх Project Reactor позволяет создавать высокопроизводительные, чрезвычайно быстрые и масштабируемые серверы, полностью избегая стека сервлетов. Черт, на CLASSPATH нет ни Jetty, ни даже сервлетного API! В основе Spring 5 веб-потока мы найдем Netty , низкоуровневую инфраструктуру для написания асинхронных клиентов и серверов. Наконец, Spring становится первоклассным гражданином в семье реактивных рамок. Разработчики Java могут реализовывать быстрые сервисы, не выходя из зоны комфорта и выбирая https://doc.akka.io/docs/akka-http/current/ или https://www.playframework.com/ . Spring 5 — это полностью реактивный, современный инструмент для создания масштабируемых и устойчивых приложений. Тем не менее, базовые принципы, такие как контроллеры, компоненты, внедрение зависимостей, одинаковы. Более того, путь обновления гладкий, и мы можем постепенно добавлять новые функции, а не изучать совершенно новую, инопланетную среду. Хватит говорить, давайте напишем немного кода.

В этой статье мы напишем простое приложение без заголовка, которое индексирует документы в ElasticSearch в большом объеме. Мы стремимся к тысячам одновременных соединений с несколькими потоками, даже когда сервер работает медленно. Однако, в отличие, например, от Spring Data MongoDB, Spring Data ElasticSearch изначально не поддерживает неблокирующие репозитории. Ну, последний, кажется, даже не поддерживается, с текущей версией, которой 3 года. Многие статьи нацелены на Spring 5 + MongoDB с его репозиториями, возвращающими неблокирующие потоки ( Flux или Flowable из RxJava). Этот будет немного более продвинутым.

Java API ElasticSearch 6 использует интерфейс RESTful и реализован с использованием неблокирующего HTTP-клиента. К сожалению, он использует обратные вызовы, а не что-то вменяемое, как CompletableFuture . Итак, давайте создадим клиентский адаптер сами.

Клиент ElasticSearch, использующий Fluxes и Monos

Исходный код этой статьи доступен по адресу github.com/nurkiewicz/elastic-flux в разделе « reactive-elastic-search

Мы хотели бы создать Java-клиент ElasticSearch, который поддерживает Project Reactor, возвращая Flux или Mono . Конечно, мы получаем наибольшее преимущество, если основной поток полностью асинхронный и не использует потоки. К счастью, Java API просто так. Во-первых, давайте настроим клиент ElasticSearch как бин Spring:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
  
@Bean
RestHighLevelClient restHighLevelClient() {
    return new RestHighLevelClient(
            RestClient
                    .builder(new HttpHost("localhost", 9200))
                    .setRequestConfigCallback(config -> config
                            .setConnectTimeout(5_000)
                            .setConnectionRequestTimeout(5_000)
                            .setSocketTimeout(5_000)
                    )
                    .setMaxRetryTimeoutMillis(5_000));
}

В реальной жизни мы бы, очевидно, параметризовали большую часть этого материала. Мы будем индексировать простые документы JSON, на данный момент их содержание не имеет значения:

1
2
3
4
5
@Value
class Doc {
    private final String username;
    private final String json;
}

Код, который мы напишем, оборачивает RestHighLevelClient и делает его еще более высокоуровневым , возвращая Mono<IndexResponse> . Mono очень похож на CompletableFuture но с двумя исключениями:

  • это лениво — до тех пор, пока вы не подпишетесь, вычисления не начнутся
  • в отличие от CompletableFuture , Mono может завершиться нормально, не выдавая никакого значения

Второе отличие всегда вводило меня в заблуждение. В RxJava 2.x есть два разных типа: Single (всегда завершается значением или ошибкой) и Maybe (как Mono ). Жаль, что Reactor не делает этого различия. Не имеет значения, как выглядит слой адаптера? API простого Elastic выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
client.indexAsync(indexRequest, new ActionListener() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //got response
    }
  
    @Override
    public void onFailure(Exception e) {
        //got error
    }
});

Вы можете видеть, куда это идет: ад обратного вызова . Вместо того, чтобы выставлять собственный ActionListener в качестве аргумента этой логике, давайте обернем его в Mono :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
  
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
  
private Mono<IndexResponse> indexDoc(Doc doc) {
    return Mono.create(sink -> {
        IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        indexRequest.source(doc.getJson(), XContentType.JSON);
        client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                sink.success(indexResponse);
            }
  
            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        });
    });
}

Мы должны создать JSON-документ IndexRequest и отправить его через RESTful API. Но дело не в этом. Мы используем Mono.create() , у него есть некоторые недостатки, но об этом позже. Mono ленив, так что indexDoc() вызова indexDoc() недостаточно, HTTP-запрос к ElasticSearch не был сделан. Однако каждый раз, когда кто-то подписывается на этот одноэлементный источник, логика внутри create() будет выполняться. Решающими являются: sink.success() и sink.error() . Они распространяют результаты из ElasticSearch (исходящие из фонового асинхронного потока) в поток. Как использовать такой метод на практике? Это очень просто!

1
2
3
4
5
Doc doc = //...
indexDoc(doc)
        .subscribe(
                indexResponse -> log.info("Got response")
        );

Конечно, истинная сила обработки реактивного потока заключается в составлении нескольких потоков. Но мы сделали первые шаги: преобразовали асинхронный API на основе обратного вызова в общий поток. Если вам (не) повезло использовать MongoDB, он имеет встроенную поддержку реактивных типов, таких как Mono или Flux прямо в репозиториях. То же самое касается Кассандры и Редиса . В следующей статье мы узнаем, как генерировать поддельные данные и одновременно индексировать их.

Опубликовано на Java Code Geeks с разрешения Томаша Нуркевича, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Spring, Reactor и ElasticSearch: от обратных вызовов до реактивных потоков

Мнения, высказанные участниками Java Code Geeks, являются их собственными.