Quarkus предоставляет несколько различных возможностей реагирования на сообщения. Проект open-source cloud-native-starter использует некоторые из этих возможностей в примере приложения, которое описано в этой статье.
Существует несколько простых руководств Quarkus по теме «Реактивные сообщения». Учебники — это здорово, но для меня лучший способ освоить новые технологии — это использовать их в простых приложениях. Вот почему я придумал простой сценарий.
Вам также может понравиться: Как писать реактивные приложения с MicroProfile
Образец заявки
Пример поставляется с веб-приложением, которое отображает ссылки на статьи с информацией об авторе в простом веб-приложении. Веб-приложение вызывает службу веб-API, которая реализует шаблон backend-for-frontend и вызывает микросервисы статей и авторов. Служба статей хранит данные в базе данных Postgres . Сообщения отправляются между микросервисами через Kafka. Эта диаграмма описывает архитектуру высокого уровня:
Одним из преимуществ реактивных моделей является возможность обновления веб-приложений путем отправки сообщений, а не получения обновлений. Это более эффективно и улучшает пользовательский опыт.
Следующее видео показывает, как статьи могут быть созданы с помощью REST API. Веб-приложение получает уведомления и добавляет новые статьи на страницу.
Следующая диаграмма объясняет процесс, который я расскажу более подробно в этой статье.
- Клиент API «отправки» вызывает конечную точку REST службы «статьи» для создания новой статьи.
- После создания статьи сообщение отправляется в Kafka.
- Служба web-API подписалась на сообщения Kafka, так что вызывается прослушиватель.
- При создании новых статей события передаются в веб-приложение.
Давайте внимательнее посмотрим на используемые технологии.
Отправка сообщений в памяти через шину событий Vert.X
«Articles» и сервис «web-API» были реализованы на Java с Quarkus. В обоих случаях я использовал подход с чистой архитектурой, когда код микросервиса организован в три пакета. Эти пакеты довольно независимы друг от друга и могут быть заменены другими реализациями.
- API : содержит конечные точки REST и обрабатывает входящие и исходящие сообщения.
- Бизнес : Содержит бизнес-логику микросервисных и бизнес-структур.
- Данные : содержит код для доступа к базам данных или другим микросервисам.
После сохранения новой статьи в базе данных Postgres сообщение отправляется в Kafka. Это инициируется бизнес-логикой, но фактический код находится на уровне API. Вот почему бизнес-уровень должен сначала отправить сообщение на уровень API.
Quarkus предоставляет механизм взаимодействия бинов через асинхронные сообщения, обеспечивая слабую связь. Посмотрите руководство Асинхронный обмен сообщениями между компонентами . Эта функциональность предоставляется через Eclipse Vert.x, который поставляется с Quarkus.
Вот код, который отправляет событие в памяти на уровень API:
Джава
1
import io.vertx.axle.core.eventbus.EventBus; ... EventBus bus; ...
2
private void sendMessageToKafka(Article article) {
3
bus.publish("com.ibm.articles.apis.NewArticleCreatedListener", article.id); }
На уровне API событие может быть использовано (см. Код ):
Джава
xxxxxxxxxx
1
import io.quarkus.vertx.ConsumeEvent; ... public void
2
sendMessageToKafka(String articleId) { ... }
Eclipse MicroProfile поддерживает еще один механизм для сообщений памяти. Причина, по которой я не использовал его, в данном случае, заключалась в том, что я не заставил его работать. Для меня аннотация @Outgoing работала только с методами, которые запускаются входящим событием или платформой (например, @PostConstruct). В моем случае я должен вызвать эту функциональность из бизнес-логики. Я не уверен, является ли это отсутствующей функцией в MircoProfile, дефектом или ошибкой пользователя. Я пытаюсь это выяснить.
В документации упоминается еще одна причина, по которой вам следует использовать шину событий Vert.x: «Функция асинхронной передачи сообщений позволяет отвечать на сообщения, которые не поддерживаются в MicroProfile Reactive Messaging. Однако она ограничена поведением с одним событием (без потока) и местные сообщения. «
Отправка сообщений Kafka через API Kafka
Затем, API-уровень сервиса ‘web-API’ должен отправить сообщение в Kafka. Чтобы настроить Kafka в Kubernetes, следуйте инструкциям из моей предыдущей статьи Доступ к Apache Kafka из Quarkus .
Eclipse MicroProfile Reactive Messaging предоставляет ту же аннотацию @Outgoing для этого, но я не смог заставить ее работать, так как мне пришлось запускать эту функцию вручную. Я хочу выяснить причину этого.
В качестве обходного пути я использовал вместо этого Kafka API . Использование довольно простое. К сожалению, похоже, что конфигурация не читается из того же файла application.properties, который использует MicroProfile. Вместо этого я должен был сделать это в коде :
Джава
xxxxxxxxxx
1
import io.vertx.core.Vertx; import
2
io.vertx.kafka.client.producer.KafkaProducer; ... Vertx vertx;
3
name = "kafka.bootstrap.servers") String kafkaBootstrapServer; (
4
KafkaProducer<String, String> producer; ... void
5
initKafkaClient() { Map<String, String> config = new HashMap<>();
6
config.put("bootstrap.servers", kafkaBootstrapServer);
7
config.put("key.serializer",
8
"org.apache.kafka.common.serialization.StringSerializer");
9
config.put("value.serializer",
10
"org.apache.kafka.common.serialization.StringSerializer"); producer =
11
KafkaProducer.create(vertx, config); } ... public void
12
sendMessageToKafka(String articleId) { try { KafkaProducerRecord<String,
13
String> record = KafkaProducerRecord.create("new-article-created", articleId);
14
producer.write(record, done -> System.out.println("Kafka message sent: new-
15
article-created - " + articleId)); } catch (Exception e) { // allow to run this
16
functionality if Kafka hasn't been set up } } Sending and receiving messages
17
via MicroProfile
Далее, сервис ‘web-API’ должен получить это сообщение от Kafka. Эта часть может быть легко реализована с помощью MicroProfile Reactive Messaging с помощью аннотации @Incoming. Вот код :
Джава
x
1
import org.eclipse.microprofile.reactive.messaging.Incoming; import
2
org.eclipse.microprofile.reactive.messaging.Outgoing; import
3
io.smallrye.reactive.messaging.annotations.Broadcast; ...
4
"new-article-created") (
5
"stream-new-article") public String process(String articleId) (
6
{ System.out.println("Kafka message received: new-article-created - " + articleId); return articleId; }
На следующем этапе уведомления о новых статьях должны быть переданы в веб-приложение. Для этого событие перенаправляется в конечную точку потоковой передачи через аннотации MicroProfile @Outgoing и @Broadcast.
Ознакомьтесь с руководством Использование Apache Kafka с реактивными сообщениями для получения дополнительной информации об этих аннотациях. По опыту разработчиков, это так просто, как только можно. Мне особенно нравится, что такие же аннотации могут использоваться для каналов Kafka, а также для обмена сообщениями в памяти.
Отправка событий в веб-приложения через отправленные сервером события
Последний шаг — потоковая передача событий в веб-приложения. Это делается с помощью событий, отправляемых сервером, и Quarkus очень прост в реализации. Конечная точка потоковой передачи получает сообщения через @Channel и пересылает их через @Produces (MediaType.SERVER_SENT_EVENTS) и @SseElementType (см. Код ):
Джава
x
1
import org.reactivestreams.Publisher; import
2
io.smallrye.reactive.messaging.annotations.Channel; import
3
org.jboss.resteasy.annotations.SseElementType; ... public class
4
NewArticlesStream { ("stream-new-article")
5
Publisher<String> newArticles;
6
"/server-sent-events") (
7
MediaType.SERVER_SENT_EVENTS) (
8
"text/plain") public Publisher<String> (
9
stream() { return newArticles; } }
В веб-приложении события могут использоваться через EventSource. В моем случае я отправляю только идентификатор статьи и обновляю список отображаемых статей (см. Код ). Кроме того, я также мог бы отправить полную информацию о статье в случае.
Джава
xxxxxxxxxx
1
let source = new EventSource(this.$store.state.endpoints.api + "server-sent-events"); let that = this; source.onmessage = function (event) { that.readArticles(); }; Next Steps
Если вы хотите узнать больше о реактивном программировании и реактивном обмене сообщениями, попробуйте код самостоятельно.