Spring 5 (и Boot 2, когда он прибудет через пару недель ) — это революция. Это не революция типа « аннотации через XML » или « классы Java над аннотациями ». Это действительно революционная структура, которая позволяет создавать совершенно новый класс приложений. В последние годы меня это немного пугало. «Spring Cloud — это фреймворк, упрощающий использование Spring Boot, фреймворк, упрощающий использование Spring, являющийся фреймворком, упрощающим развитие предприятия». В файле start.spring.io (также известном как « начало… точечная пружина… точка I… O ») перечислены 120 различных модулей (!), которые вы можете добавить к своему сервису. Весна в эти дни стала огромным зонтичным проектом, и я могу себе представить, почему некоторые люди (все еще!) Предпочитают Java EE (или как это называется в наши дни).
Но весна 5 приносит реактивную революцию. Это больше не просто оболочка для блокировки API сервлетов и различных веб-фреймворков. Spring 5 поверх Project Reactor позволяет создавать высокопроизводительные, чрезвычайно быстрые и масштабируемые серверы, полностью избегая стека сервлетов. Черт, на CLASSPATH нет ни Jetty, ни даже сервлетного API! В основе Spring 5 веб-потока мы найдем Netty , низкоуровневую инфраструктуру для написания асинхронных клиентов и серверов. Наконец, Spring становится первоклассным гражданином в семье реактивных рамок. Разработчики Java могут реализовывать быстрые сервисы, не выходя из зоны комфорта и выбирая https://doc.akka.io/docs/akka-http/current/ или https://www.playframework.com/ . Spring 5 — это полностью реактивный, современный инструмент для создания масштабируемых и устойчивых приложений. Тем не менее, базовые принципы, такие как контроллеры, компоненты, внедрение зависимостей, одинаковы. Более того, путь обновления гладкий, и мы можем постепенно добавлять новые функции, а не изучать совершенно новую, инопланетную среду. Хватит говорить, давайте напишем немного кода.
В этой статье мы напишем простое приложение без заголовка, которое индексирует документы в ElasticSearch в большом объеме. Мы стремимся к тысячам одновременных соединений с несколькими потоками, даже когда сервер работает медленно. Однако, в отличие, например, от Spring Data MongoDB, Spring Data ElasticSearch изначально не поддерживает неблокирующие репозитории. Ну, последний, кажется, даже не поддерживается, с текущей версией, которой 3 года. Многие статьи нацелены на Spring 5 + MongoDB с его репозиториями, возвращающими неблокирующие потоки ( Flux
или Flowable из RxJava). Этот будет немного более продвинутым.
Java API ElasticSearch 6 использует интерфейс RESTful и реализован с использованием неблокирующего HTTP-клиента. К сожалению, он использует обратные вызовы, а не что-то вменяемое, как CompletableFuture
. Итак, давайте создадим клиентский адаптер сами.
Клиент ElasticSearch, использующий Fluxes и Monos
Исходный код этой статьи доступен по адресу github.com/nurkiewicz/elastic-flux в разделе « reactive-elastic-search
Мы хотели бы создать Java-клиент ElasticSearch, который поддерживает Project Reactor, возвращая Flux
или Mono
. Конечно, мы получаем наибольшее преимущество, если основной поток полностью асинхронный и не использует потоки. К счастью, Java API просто так. Во-первых, давайте настроим клиент ElasticSearch как бин Spring:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; @Bean RestHighLevelClient restHighLevelClient() { return new RestHighLevelClient( RestClient .builder( new HttpHost( "localhost" , 9200 )) .setRequestConfigCallback(config -> config .setConnectTimeout(5_000) .setConnectionRequestTimeout(5_000) .setSocketTimeout(5_000) ) .setMaxRetryTimeoutMillis(5_000)); } |
В реальной жизни мы бы, очевидно, параметризовали большую часть этого материала. Мы будем индексировать простые документы JSON, на данный момент их содержание не имеет значения:
1
2
3
4
5
|
@Value class Doc { private final String username; private final String json; } |
Код, который мы напишем, оборачивает RestHighLevelClient
и делает его еще более высокоуровневым , возвращая Mono<IndexResponse>
. Mono
очень похож на CompletableFuture
но с двумя исключениями:
- это лениво — до тех пор, пока вы не подпишетесь, вычисления не начнутся
- в отличие от
CompletableFuture
,Mono
может завершиться нормально, не выдавая никакого значения
Второе отличие всегда вводило меня в заблуждение. В RxJava 2.x есть два разных типа: Single
(всегда завершается значением или ошибкой) и Maybe
(как Mono
). Жаль, что Reactor не делает этого различия. Не имеет значения, как выглядит слой адаптера? API простого Elastic выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
|
client.indexAsync(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { //got response } @Override public void onFailure(Exception e) { //got error } }); |
Вы можете видеть, куда это идет: ад обратного вызова . Вместо того, чтобы выставлять собственный ActionListener
в качестве аргумента этой логике, давайте обернем его в Mono
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; private Mono<IndexResponse> indexDoc(Doc doc) { return Mono.create(sink -> { IndexRequest indexRequest = new IndexRequest( "people" , "person" , doc.getUsername()); indexRequest.source(doc.getJson(), XContentType.JSON); client.indexAsync(indexRequest, new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { sink.success(indexResponse); } @Override public void onFailure(Exception e) { sink.error(e); } }); }); } |
Мы должны создать JSON-документ IndexRequest
и отправить его через RESTful API. Но дело не в этом. Мы используем Mono.create()
, у него есть некоторые недостатки, но об этом позже. Mono
ленив, так что indexDoc()
вызова indexDoc()
недостаточно, HTTP-запрос к ElasticSearch не был сделан. Однако каждый раз, когда кто-то подписывается на этот одноэлементный источник, логика внутри create()
будет выполняться. Решающими являются: sink.success()
и sink.error()
. Они распространяют результаты из ElasticSearch (исходящие из фонового асинхронного потока) в поток. Как использовать такой метод на практике? Это очень просто!
1
2
3
4
5
|
Doc doc = //... indexDoc(doc) .subscribe( indexResponse -> log.info( "Got response" ) ); |
Конечно, истинная сила обработки реактивного потока заключается в составлении нескольких потоков. Но мы сделали первые шаги: преобразовали асинхронный API на основе обратного вызова в общий поток. Если вам (не) повезло использовать MongoDB, он имеет встроенную поддержку реактивных типов, таких как Mono
или Flux
прямо в репозиториях. То же самое касается Кассандры и Редиса . В следующей статье мы узнаем, как генерировать поддельные данные и одновременно индексировать их.
Опубликовано на Java Code Geeks с разрешения Томаша Нуркевича, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Spring, Reactor и ElasticSearch: от обратных вызовов до реактивных потоков
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |