обзор
Этот пример проекта демонстрирует, как создавать потоковые приложения в реальном времени, используя управляемую событиями архитектуру , Spring Boot , Spring Cloud Stream, Apache Kafka и Lombok .
К концу этого урока вы получите простой микросервис Greetings на основе Spring Boot, который работает
- принимает сообщение от REST API
- пишет это в тему Кафки
- читает это из темы
- выводит его на консоль
Давайте начнем!
Кстати, вы найдете исходный код здесь .
Что такое Spring Cloud Streaming?
Spring Cloud Stream — это платформа, основанная на Spring Boot для создания управляемых сообщениями микросервисов.
Что такое Кафка?
Kafka — популярная высокопроизводительная и горизонтально масштабируемая платформа обмена сообщениями, изначально разработанная LinkedIn.
Установка Кафки
Загрузите Кафку отсюда и распакуйте ее:
1
2
|
> tar -xzf kafka_2. 11 - 1.0 . 0 .tgz > cd kafka_2. 11 - 1.0 . 0 |
Запустите Zookeeper и Kafka
В Windows:
1
2
|
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties > bin\windows\kafka-server-start.bat config\server.properties |
В Linux или Mac:
1
2
|
> bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties |
Если Kafka не работает и не запускается после выхода компьютера из спящего режима, удалите <TMP_DIR>/kafka-logs
а затем снова запустите Kafka.
Что такое Ломбок?
Lombok — это Java-фреймворк, который автоматически генерирует в коде геттеры, сеттеры, toString (), компоновщики, регистраторы и т. Д.
Maven зависимости
Перейдите на https://start.spring.io, чтобы создать проект maven:
- Добавьте необходимые зависимости:
Spring Cloud Stream
,Kafka
,Devtools
(для горячихDevtools
во время разработки, необязательно),Actuator
(для приложения мониторинга, необязательно),Lombok
(убедитесь, что в вашей IDE также установлен плагин Lombok) - Нажмите кнопку «Создать проект», чтобы загрузить проект в виде zip-файла.
- Извлеките zip-файл и импортируйте проект maven в вашу любимую IDE
Обратите внимание на зависимости maven в файле pom.xml
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
< dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-actuator</ artifactId > </ dependency > < dependency > < groupId >org.springframework.cloud</ groupId > < artifactId >spring-cloud-stream</ artifactId > </ dependency > < dependency > < groupId >org.springframework.cloud</ groupId > < artifactId >spring-cloud-starter-stream-kafka</ artifactId > </ dependency > <!-- Also install the Lombok plugin in your IDE --> < dependency > < groupId >org.projectlombok</ groupId > < artifactId >lombok</ artifactId > < optional >true</ optional > </ dependency > <!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --> < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-devtools</ artifactId > < optional >true</ optional > </ dependency > |
… Также раздел <dependencyManagement>
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
< dependencyManagement > < dependencies > < dependency > <!-- Import dependency management from Spring Boot --> < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-dependencies</ artifactId > < version >${spring-boot.version}</ version > < type >pom</ type > < scope >import</ scope > </ dependency > < dependency > < groupId >org.springframework.cloud</ groupId > < artifactId >spring-cloud-stream-dependencies</ artifactId > < version >${spring-cloud-stream.version}</ version > < type >pom</ type > < scope >import</ scope > </ dependency > </ dependencies > </ dependencyManagement > |
… и раздел <repository>
:
1
2
3
4
5
6
7
8
|
< repository > < id >spring-milestones</ id > < name >Spring Milestones</ name > < snapshots > < enabled >false</ enabled > </ snapshots > </ repository > |
Определите потоки Кафки
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
package com.kaviddiss.streamkafka.stream; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface GreetingsStreams { String INPUT = "greetings-in" ; String OUTPUT = "greetings-out" ; @Input (INPUT) SubscribableChannel inboundGreetings(); @Output (OUTPUT) MessageChannel outboundGreetings(); } |
Чтобы наше приложение могло взаимодействовать с Kafka, нам нужно определить исходящий поток для записи сообщений в тему Kafka и входящий поток для чтения сообщений из темы Kafka.
Spring Cloud предоставляет удобный способ сделать это, просто создав интерфейс, который определяет отдельный метод для каждого потока.
Метод inboundGreetings()
определяет входящий поток для чтения из Kafka, а метод outboundGreetings()
определяет исходящий поток для записи в Kafka.
Во время выполнения Spring создаст реализацию интерфейса GreetingsStreams
основе прокси-сервера Java, которую можно внедрить в виде Spring Bean в любом месте кода для доступа к нашим двум потокам.
Настроить Spring Cloud Stream
Наш следующий шаг — настроить Spring Cloud Stream для привязки к нашим потокам в интерфейсе GreetingsStreams
. Это можно сделать, создав класс com.kaviddiss.streamkafka.config.StreamsConfig
с кодом ниже:
1
2
3
4
5
6
7
8
|
package com.kaviddiss.streamkafka.config; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import org.springframework.cloud.stream.annotation.EnableBinding; @EnableBinding (GreetingsStreams. class ) public class StreamsConfig { } |
Связывание потоков выполняется с @EnableBinding
аннотации @EnableBinding
куда GreatingsService
интерфейс GreatingsService
.
Конфигурационные свойства для Кафки
По умолчанию свойства конфигурации хранятся в файле src/main/resources/application.properties
.
Однако я предпочитаю использовать формат YAML, так как он менее многословен и позволяет хранить как общие, так и специфичные для среды свойства в одном файле.
А пока давайте переименуем application.properties
в application.yaml
и вставим приведенный ниже фрагмент конфигурации в файл:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
spring: cloud: stream: kafka: binder: brokers: localhost: 9092 bindings: greetings-in: destination: greetings contentType: application/json greetings-out: destination: greetings contentType: application/json |
Приведенные выше свойства конфигурации настраивают адрес сервера Kafka для подключения и тему Kafka, которую мы используем для нашего входящего и исходящего потоков в нашем коде. Они оба должны использовать одну и ту же тему Кафки!
Свойства contentType
сообщают Spring Cloud Stream отправлять / получать объекты наших сообщений в виде String
в потоках.
Создать объект сообщения
Создайте простой класс com.kaviddiss.streamkafka.model.Greetings
с кодом ниже, который будет представлять объект сообщения, из которого мы читаем и пишем в тему greetings
Kafka:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
package com.kaviddiss.streamkafka.model; // lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/): import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.ToString; @Getter @Setter @ToString @Builder public class Greetings { private long timestamp; private String message; } |
Обратите внимание, что в классе нет геттеров и сеттеров благодаря аннотациям Lombok. @ToString
сгенерирует метод toString()
с использованием полей класса, а аннотация @Builder
позволит нам создавать объекты Greetings
с помощью Fluent Builder (см. Ниже).
Создать сервисный слой для записи в Kafka
Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsService
с приведенным ниже кодом, который будет записывать объект Greetings
в тему greetings
Kafka:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.kaviddiss.streamkafka.service; import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils; @Service @Slf4j public class GreetingsService { private final GreetingsStreams greetingsStreams; public GreetingsService(GreetingsStreams greetingsStreams) { this .greetingsStreams = greetingsStreams; } public void sendGreeting( final Greetings greetings) { log.info( "Sending greetings {}" , greetings); MessageChannel messageChannel = greetingsStreams.outboundGreetings(); messageChannel.send(MessageBuilder .withPayload(greetings) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build()); } |
Аннотация @Service
настроит этот класс как Spring Bean и внедрит зависимость GreetingsService
через конструктор.
Аннотация @Slf4j
сгенерирует поле регистратора SLF4J, которое мы можем использовать для регистрации.
В sendGreeting()
мы используем sendGreeting()
объект GreetingsStream
для отправки сообщения, представленного объектом Greetings
.
Создать REST API
Теперь мы создадим конечную точку API REST, которая будет инициировать отправку сообщения в Kafka с помощью Spring Bean GreetingsService
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.kaviddiss.streamkafka.web; import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.service.GreetingsService; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; @RestController public class GreetingsController { private final GreetingsService greetingsService; public GreetingsController(GreetingsService greetingsService) { this .greetingsService = greetingsService; } @GetMapping ( "/greetings" ) @ResponseStatus (HttpStatus.ACCEPTED) public void greetings( @RequestParam ( "message" ) String message) { Greetings greetings = Greetings.builder() .message(message) .timestamp(System.currentTimeMillis()) .build(); greetingsService.sendGreeting(greetings); } } |
Аннотация @RestController
сообщает Spring, что это bean-компонент Controller (C из MVC). Метод greetings()
определяет конечную точку HTTP GET /greetings
которая принимает параметр запроса message
и передает его sendGreeting()
в GreetingsService
.
Прослушивание приветствия Кафки в теме
Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsListener
который будет прослушивать сообщения по теме greetings
Kafka и регистрировать их в консоли:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
package com.kaviddiss.streamkafka.service; import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component @Slf4j public class GreetingsListener { @StreamListener (GreetingsStreams.INPUT) public void handleGreetings( @Payload Greetings greetings) { log.info( "Received greetings: {}" , greetings); } } |
Аннотация @Component
аналогично @Service
и @RestController
определяет Spring Bean.
GreetingsListener
имеет единственный метод handleGreetings()
который будет вызываться Spring Cloud Stream с каждым новым объектом сообщения Greetings
в теме greetings
Kafka. Это благодаря аннотации @StreamListener
настроенной для handleGreetings()
.
Запуск приложения
Последним фрагментом головоломки является класс com.kaviddiss.streamkafka.StreamKafkaApplication
который был автоматически создан Spring Initializer:
01
02
03
04
05
06
07
08
09
10
11
12
|
package com.kaviddiss.streamkafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamKafkaApplication { public static void main(String[] args) { SpringApplication.run(StreamKafkaApplication. class , args); } } |
Нет необходимости вносить какие-либо изменения здесь. Вы можете запустить этот класс как приложение Java из вашей IDE или запустить приложение из командной строки с помощью подключаемого модуля Spring Boot maven:
1
|
> mvn spring-boot:run |
После запуска приложения перейдите по адресу http: // localhost: 8080 / greetings? Message = hello в браузере и проверьте свою консоль.
Резюме
Я надеюсь, вам понравился этот урок. Не стесняйтесь задавать любые вопросы и оставляйте свои отзывы.
Опубликовано на Java Code Geeks с разрешения Дэвида Кисс, партнера нашей программы JCG . Смотреть оригинальную статью здесь: Spring Cloud Stream с Кафкой
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |