Spring 5 (и Boot 2, когда он прибудет через пару недель ) — это революция. Это не революция типа « аннотации через XML » или « классы Java над аннотациями ». Это действительно революционная структура, которая позволяет создавать совершенно новый класс приложений. В последние годы меня это немного пугало. «Spring Cloud — это фреймворк, упрощающий использование Spring Boot, фреймворк, упрощающий использование Spring, являющийся фреймворком, упрощающим развитие предприятия». В файле start.spring.io (также известном как « начало… точечная пружина… точка I… O ») перечислены 120 различных модулей (!), которые вы можете добавить к своему сервису. Весна в эти дни стала огромным зонтичным проектом, и я могу себе представить, почему некоторые люди (все еще!) Предпочитают Java EE (или как это называется в наши дни).
Но весна 5 приносит реактивную революцию. Это больше не просто оболочка для блокировки API сервлетов и различных веб-фреймворков. Spring 5 поверх Project Reactor позволяет создавать высокопроизводительные, чрезвычайно быстрые и масштабируемые серверы, полностью избегая стека сервлетов. Черт, на CLASSPATH нет ни Jetty, ни даже сервлетного API! В основе Spring 5 веб-потока мы найдем Netty , низкоуровневую инфраструктуру для написания асинхронных клиентов и серверов. Наконец, Spring становится первоклассным гражданином в семье реактивных рамок. Разработчики Java могут реализовывать быстрые сервисы, не выходя из зоны комфорта и выбирая https://doc.akka.io/docs/akka-http/current/ или https://www.playframework.com/ . Spring 5 — это полностью реактивный, современный инструмент для создания масштабируемых и устойчивых приложений. Тем не менее, базовые принципы, такие как контроллеры, компоненты, внедрение зависимостей, одинаковы. Более того, путь обновления гладкий, и мы можем постепенно добавлять новые функции, а не изучать совершенно новую, инопланетную среду. Хватит говорить, давайте напишем немного кода.
В этой статье мы напишем простое приложение без заголовка, которое индексирует документы в ElasticSearch в большом объеме. Мы стремимся к тысячам одновременных соединений с несколькими потоками, даже когда сервер работает медленно. Однако, в отличие, например, от Spring Data MongoDB, Spring Data ElasticSearch изначально не поддерживает неблокирующие репозитории. Ну, последний, кажется, даже не поддерживается, с текущей версией, которой 3 года. Многие статьи нацелены на Spring 5 + MongoDB с его репозиториями, возвращающими неблокирующие потоки ( Flux или Flowable из RxJava). Этот будет немного более продвинутым.
Java API ElasticSearch 6 использует интерфейс RESTful и реализован с использованием неблокирующего HTTP-клиента. К сожалению, он использует обратные вызовы, а не что-то вменяемое, как CompletableFuture . Итак, давайте создадим клиентский адаптер сами.
Клиент ElasticSearch, использующий Fluxes и Monos
Исходный код этой статьи доступен по адресу github.com/nurkiewicz/elastic-flux в разделе « reactive-elastic-search
Мы хотели бы создать Java-клиент ElasticSearch, который поддерживает Project Reactor, возвращая Flux или Mono . Конечно, мы получаем наибольшее преимущество, если основной поток полностью асинхронный и не использует потоки. К счастью, Java API просто так. Во-первых, давайте настроим клиент ElasticSearch как бин Spring:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient; @BeanRestHighLevelClient restHighLevelClient() { return new RestHighLevelClient( RestClient .builder(new HttpHost("localhost", 9200)) .setRequestConfigCallback(config -> config .setConnectTimeout(5_000) .setConnectionRequestTimeout(5_000) .setSocketTimeout(5_000) ) .setMaxRetryTimeoutMillis(5_000));} |
В реальной жизни мы бы, очевидно, параметризовали большую часть этого материала. Мы будем индексировать простые документы JSON, на данный момент их содержание не имеет значения:
|
1
2
3
4
5
|
@Valueclass Doc { private final String username; private final String json;} |
Код, который мы напишем, оборачивает RestHighLevelClient и делает его еще более высокоуровневым , возвращая Mono<IndexResponse> . Mono очень похож на CompletableFuture но с двумя исключениями:
- это лениво — до тех пор, пока вы не подпишетесь, вычисления не начнутся
- в отличие от
CompletableFuture,Monoможет завершиться нормально, не выдавая никакого значения
Второе отличие всегда вводило меня в заблуждение. В RxJava 2.x есть два разных типа: Single (всегда завершается значением или ошибкой) и Maybe (как Mono ). Жаль, что Reactor не делает этого различия. Не имеет значения, как выглядит слой адаптера? API простого Elastic выглядит следующим образом:
|
01
02
03
04
05
06
07
08
09
10
11
|
client.indexAsync(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { //got response } @Override public void onFailure(Exception e) { //got error }}); |
Вы можете видеть, куда это идет: ад обратного вызова . Вместо того, чтобы выставлять собственный ActionListener в качестве аргумента этой логике, давайте обернем его в Mono :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import org.elasticsearch.action.ActionListener;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.index.IndexResponse;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType; import reactor.core.publisher.Mono;import reactor.core.publisher.MonoSink; private Mono<IndexResponse> indexDoc(Doc doc) { return Mono.create(sink -> { IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername()); indexRequest.source(doc.getJson(), XContentType.JSON); client.indexAsync(indexRequest, new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { sink.success(indexResponse); } @Override public void onFailure(Exception e) { sink.error(e); } }); });} |
Мы должны создать JSON-документ IndexRequest и отправить его через RESTful API. Но дело не в этом. Мы используем Mono.create() , у него есть некоторые недостатки, но об этом позже. Mono ленив, так что indexDoc() вызова indexDoc() недостаточно, HTTP-запрос к ElasticSearch не был сделан. Однако каждый раз, когда кто-то подписывается на этот одноэлементный источник, логика внутри create() будет выполняться. Решающими являются: sink.success() и sink.error() . Они распространяют результаты из ElasticSearch (исходящие из фонового асинхронного потока) в поток. Как использовать такой метод на практике? Это очень просто!
|
1
2
3
4
5
|
Doc doc = //...indexDoc(doc) .subscribe( indexResponse -> log.info("Got response") ); |
Конечно, истинная сила обработки реактивного потока заключается в составлении нескольких потоков. Но мы сделали первые шаги: преобразовали асинхронный API на основе обратного вызова в общий поток. Если вам (не) повезло использовать MongoDB, он имеет встроенную поддержку реактивных типов, таких как Mono или Flux прямо в репозиториях. То же самое касается Кассандры и Редиса . В следующей статье мы узнаем, как генерировать поддельные данные и одновременно индексировать их.
| Опубликовано на Java Code Geeks с разрешения Томаша Нуркевича, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Spring, Reactor и ElasticSearch: от обратных вызовов до реактивных потоков
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |