обзор
Этот пример проекта демонстрирует, как создавать потоковые приложения в реальном времени, используя управляемую событиями архитектуру , Spring Boot , Spring Cloud Stream, Apache Kafka и Lombok .
К концу этого урока вы получите простой микросервис Greetings на основе Spring Boot, который работает
- принимает сообщение от REST API
- пишет это в тему Кафки
- читает это из темы
- выводит его на консоль
Давайте начнем!
Кстати, вы найдете исходный код здесь .
Что такое Spring Cloud Streaming?
Spring Cloud Stream — это платформа, основанная на Spring Boot для создания управляемых сообщениями микросервисов.
Что такое Кафка?
Kafka — популярная высокопроизводительная и горизонтально масштабируемая платформа обмена сообщениями, изначально разработанная LinkedIn.
Установка Кафки
Загрузите Кафку отсюда и распакуйте ее:
| 1 2 | > tar -xzf kafka_2.11-1.0.0.tgz> cd kafka_2.11-1.0.0 | 
Запустите Zookeeper и Kafka
В Windows:
| 1 2 | > bin\windows\zookeeper-server-start.bat config\zookeeper.properties> bin\windows\kafka-server-start.bat config\server.properties | 
В Linux или Mac:
| 1 2 | > bin/zookeeper-server-start.sh config/zookeeper.properties> bin/kafka-server-start.sh config/server.properties | 
 Если Kafka не работает и не запускается после выхода компьютера из спящего режима, удалите <TMP_DIR>/kafka-logs а затем снова запустите Kafka. 
Что такое Ломбок?
Lombok — это Java-фреймворк, который автоматически генерирует в коде геттеры, сеттеры, toString (), компоновщики, регистраторы и т. Д.
Maven зависимости
Перейдите на https://start.spring.io, чтобы создать проект maven:
-   Добавьте необходимые зависимости: Spring Cloud Stream,Kafka,Devtools(для горячихDevtoolsво время разработки, необязательно),Actuator(для приложения мониторинга, необязательно),Lombok(убедитесь, что в вашей IDE также установлен плагин Lombok)
- Нажмите кнопку «Создать проект», чтобы загрузить проект в виде zip-файла.
- Извлеките zip-файл и импортируйте проект maven в вашу любимую IDE
  Обратите внимание на зависимости maven в файле pom.xml : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-actuator</artifactId>  </dependency>  <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-stream</artifactId>  </dependency>  <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-starter-stream-kafka</artifactId>  </dependency>  <!-- Also install the Lombok plugin in your IDE -->  <dependency>      <groupId>org.projectlombok</groupId>      <artifactId>lombok</artifactId>      <optional>true</optional>  </dependency>  <!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running -->  <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-devtools</artifactId>      <optional>true</optional>  </dependency> | 
  … Также раздел <dependencyManagement> : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 | <dependencyManagement>  <dependencies>    <dependency>      <!-- Import dependency management from Spring Boot -->      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-dependencies</artifactId>      <version>${spring-boot.version}</version>      <type>pom</type>      <scope>import</scope>    </dependency>    <dependency>      <groupId>org.springframework.cloud</groupId>      <artifactId>spring-cloud-stream-dependencies</artifactId>      <version>${spring-cloud-stream.version}</version>      <type>pom</type>      <scope>import</scope>    </dependency>  </dependencies>    </dependencyManagement> | 
  … и раздел <repository> : 
| 1 2 3 4 5 6 7 8 | <repository>  <id>spring-milestones</id>  <name>Spring Milestones</name>  <snapshots>    <enabled>false</enabled>  </snapshots></repository> | 
Определите потоки Кафки
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | packagecom.kaviddiss.streamkafka.stream;importorg.springframework.cloud.stream.annotation.Input;importorg.springframework.cloud.stream.annotation.Output;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.SubscribableChannel;  publicinterfaceGreetingsStreams {    String INPUT = "greetings-in";    String OUTPUT = "greetings-out";    @Input(INPUT)    SubscribableChannel inboundGreetings();    @Output(OUTPUT)    MessageChannel outboundGreetings();} | 
Чтобы наше приложение могло взаимодействовать с Kafka, нам нужно определить исходящий поток для записи сообщений в тему Kafka и входящий поток для чтения сообщений из темы Kafka.
Spring Cloud предоставляет удобный способ сделать это, просто создав интерфейс, который определяет отдельный метод для каждого потока.
  Метод inboundGreetings() определяет входящий поток для чтения из Kafka, а метод outboundGreetings() определяет исходящий поток для записи в Kafka. 
  Во время выполнения Spring создаст реализацию интерфейса GreetingsStreams основе прокси-сервера Java, которую можно внедрить в виде Spring Bean в любом месте кода для доступа к нашим двум потокам. 
Настроить Spring Cloud Stream
  Наш следующий шаг — настроить Spring Cloud Stream для привязки к нашим потокам в интерфейсе GreetingsStreams .  Это можно сделать, создав класс com.kaviddiss.streamkafka.config.StreamsConfig с кодом ниже: 
| 1 2 3 4 5 6 7 8 | packagecom.kaviddiss.streamkafka.config;importcom.kaviddiss.streamkafka.stream.GreetingsStreams;importorg.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class)publicclassStreamsConfig {} | 
  Связывание потоков выполняется с @EnableBinding аннотации @EnableBinding куда GreatingsService интерфейс GreatingsService . 
Конфигурационные свойства для Кафки
  По умолчанию свойства конфигурации хранятся в файле src/main/resources/application.properties . 
Однако я предпочитаю использовать формат YAML, так как он менее многословен и позволяет хранить как общие, так и специфичные для среды свойства в одном файле.
  А пока давайте переименуем application.properties в application.yaml и вставим приведенный ниже фрагмент конфигурации в файл: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 | spring:  cloud:    stream:      kafka:        binder:          brokers: localhost:9092      bindings:        greetings-in:          destination: greetings          contentType: application/json        greetings-out:          destination: greetings          contentType: application/json | 
Приведенные выше свойства конфигурации настраивают адрес сервера Kafka для подключения и тему Kafka, которую мы используем для нашего входящего и исходящего потоков в нашем коде. Они оба должны использовать одну и ту же тему Кафки!
  Свойства contentType сообщают Spring Cloud Stream отправлять / получать объекты наших сообщений в виде String в потоках. 
Создать объект сообщения
  Создайте простой класс com.kaviddiss.streamkafka.model.Greetings с кодом ниже, который будет представлять объект сообщения, из которого мы читаем и пишем в тему greetings Kafka: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 | packagecom.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):importlombok.Builder;importlombok.Getter;importlombok.Setter;importlombok.ToString;@Getter@Setter@ToString@BuilderpublicclassGreetings {    privatelongtimestamp;    privateString message;} | 
  Обратите внимание, что в классе нет геттеров и сеттеров благодаря аннотациям Lombok.  @ToString сгенерирует метод toString() с использованием полей класса, а аннотация @Builder позволит нам создавать объекты Greetings с помощью Fluent Builder (см. Ниже). 
Создать сервисный слой для записи в Kafka
  Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsService с приведенным ниже кодом, который будет записывать объект Greetings в тему greetings Kafka: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | packagecom.kaviddiss.streamkafka.service;importcom.kaviddiss.streamkafka.model.Greetings;importcom.kaviddiss.streamkafka.stream.GreetingsStreams;importlombok.extern.slf4j.Slf4j;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.MessageHeaders;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Service;importorg.springframework.util.MimeTypeUtils;@Service@Slf4jpublicclassGreetingsService {    privatefinalGreetingsStreams greetingsStreams;    publicGreetingsService(GreetingsStreams greetingsStreams) {        this.greetingsStreams = greetingsStreams;    }    publicvoidsendGreeting(finalGreetings greetings) {        log.info("Sending greetings {}", greetings);        MessageChannel messageChannel = greetingsStreams.outboundGreetings();        messageChannel.send(MessageBuilder                .withPayload(greetings)                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)                .build());    } | 
  Аннотация @Service настроит этот класс как Spring Bean и внедрит зависимость GreetingsService через конструктор. 
  Аннотация @Slf4j сгенерирует поле регистратора SLF4J, которое мы можем использовать для регистрации. 
  В sendGreeting() мы используем sendGreeting() объект GreetingsStream для отправки сообщения, представленного объектом Greetings . 
Создать REST API
  Теперь мы создадим конечную точку API REST, которая будет инициировать отправку сообщения в Kafka с помощью Spring Bean GreetingsService : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | packagecom.kaviddiss.streamkafka.web;importcom.kaviddiss.streamkafka.model.Greetings;importcom.kaviddiss.streamkafka.service.GreetingsService;importorg.springframework.http.HttpStatus;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.ResponseStatus;importorg.springframework.web.bind.annotation.RestController; @RestControllerpublicclassGreetingsController {    privatefinalGreetingsService greetingsService;    publicGreetingsController(GreetingsService greetingsService) {        this.greetingsService = greetingsService;    }    @GetMapping("/greetings")    @ResponseStatus(HttpStatus.ACCEPTED)    publicvoidgreetings(@RequestParam("message") String message) {        Greetings greetings = Greetings.builder()            .message(message)            .timestamp(System.currentTimeMillis())            .build();        greetingsService.sendGreeting(greetings);    }} | 
  Аннотация @RestController сообщает Spring, что это bean-компонент Controller (C из MVC).  Метод greetings() определяет конечную точку HTTP GET /greetings которая принимает параметр запроса message и передает его sendGreeting() в GreetingsService . 
Прослушивание приветствия Кафки в теме
  Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsListener который будет прослушивать сообщения по теме greetings Kafka и регистрировать их в консоли: 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | packagecom.kaviddiss.streamkafka.service;importcom.kaviddiss.streamkafka.model.Greetings;importcom.kaviddiss.streamkafka.stream.GreetingsStreams;importlombok.extern.slf4j.Slf4j;importorg.springframework.cloud.stream.annotation.StreamListener;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Component;@Component@Slf4jpublicclassGreetingsListener {    @StreamListener(GreetingsStreams.INPUT)    publicvoidhandleGreetings(@PayloadGreetings greetings) {        log.info("Received greetings: {}", greetings);    }} | 
  Аннотация @Component аналогично @Service и @RestController определяет Spring Bean. 
  GreetingsListener имеет единственный метод handleGreetings() который будет вызываться Spring Cloud Stream с каждым новым объектом сообщения Greetings в теме greetings Kafka.  Это благодаря аннотации @StreamListener настроенной для handleGreetings() . 
Запуск приложения
  Последним фрагментом головоломки является класс com.kaviddiss.streamkafka.StreamKafkaApplication который был автоматически создан Spring Initializer: 
| 01 02 03 04 05 06 07 08 09 10 11 12 | packagecom.kaviddiss.streamkafka;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublicclassStreamKafkaApplication {    publicstaticvoidmain(String[] args) {        SpringApplication.run(StreamKafkaApplication.class, args);    }} | 
Нет необходимости вносить какие-либо изменения здесь. Вы можете запустить этот класс как приложение Java из вашей IDE или запустить приложение из командной строки с помощью подключаемого модуля Spring Boot maven:
| 1 | > mvn spring-boot:run | 
После запуска приложения перейдите по адресу http: // localhost: 8080 / greetings? Message = hello в браузере и проверьте свою консоль.
Резюме
Я надеюсь, вам понравился этот урок. Не стесняйтесь задавать любые вопросы и оставляйте свои отзывы.
| Опубликовано на Java Code Geeks с разрешения Дэвида Кисс, партнера нашей программы JCG .  Смотреть оригинальную статью здесь: Spring Cloud Stream с Кафкой Мнения, высказанные участниками Java Code Geeks, являются их собственными. | 
