Статьи

Весенний Облачный Поток с Кафкой

обзор

Этот пример проекта демонстрирует, как создавать потоковые приложения в реальном времени, используя управляемую событиями архитектуру , Spring Boot , Spring Cloud Stream, Apache Kafka и Lombok .

К концу этого урока вы получите простой микросервис Greetings на основе Spring Boot, который работает

  1. принимает сообщение от REST API
  2. пишет это в тему Кафки
  3. читает это из темы
  4. выводит его на консоль

Давайте начнем!

Кстати, вы найдете исходный код здесь .

Что такое Spring Cloud Streaming?

Spring Cloud Stream — это платформа, основанная на Spring Boot для создания управляемых сообщениями микросервисов.

Что такое Кафка?

Kafka — популярная высокопроизводительная и горизонтально масштабируемая платформа обмена сообщениями, изначально разработанная LinkedIn.

Установка Кафки

Загрузите Кафку отсюда и распакуйте ее:

1
2
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

Запустите Zookeeper и Kafka

В Windows:

1
2
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

В Linux или Mac:

1
2
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

Если Kafka не работает и не запускается после выхода компьютера из спящего режима, удалите <TMP_DIR>/kafka-logs а затем снова запустите Kafka.

Что такое Ломбок?

Lombok — это Java-фреймворк, который автоматически генерирует в коде геттеры, сеттеры, toString (), компоновщики, регистраторы и т. Д.

Maven зависимости

Перейдите на https://start.spring.io, чтобы создать проект maven:

  1. Добавьте необходимые зависимости: Spring Cloud Stream , Kafka , Devtools (для горячих Devtools во время разработки, необязательно), Actuator (для приложения мониторинга, необязательно), Lombok (убедитесь, что в вашей IDE также установлен плагин Lombok)
  2. Нажмите кнопку «Создать проект», чтобы загрузить проект в виде zip-файла.
  3. Извлеките zip-файл и импортируйте проект maven в вашу любимую IDE

Обратите внимание на зависимости maven в файле pom.xml :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <!-- Also install the Lombok plugin in your IDE -->
  <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
  </dependency>
 
  <!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
  </dependency>

… Также раздел <dependencyManagement> :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<dependencyManagement>
  <dependencies>
    <dependency>
      <!-- Import dependency management from Spring Boot -->
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>${spring-cloud-stream.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
    </dependencyManagement>

… и раздел <repository> :

1
2
3
4
5
6
7
8
<repository>
  <id>spring-milestones</id>
  <name>Spring Milestones</name>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

Определите потоки Кафки

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
package com.kaviddiss.streamkafka.stream;
 
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel; 
 
public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
 
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
 
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

Чтобы наше приложение могло взаимодействовать с Kafka, нам нужно определить исходящий поток для записи сообщений в тему Kafka и входящий поток для чтения сообщений из темы Kafka.

Spring Cloud предоставляет удобный способ сделать это, просто создав интерфейс, который определяет отдельный метод для каждого потока.

Метод inboundGreetings() определяет входящий поток для чтения из Kafka, а метод outboundGreetings() определяет исходящий поток для записи в Kafka.

Во время выполнения Spring создаст реализацию интерфейса GreetingsStreams основе прокси-сервера Java, которую можно внедрить в виде Spring Bean в любом месте кода для доступа к нашим двум потокам.

Настроить Spring Cloud Stream

Наш следующий шаг — настроить Spring Cloud Stream для привязки к нашим потокам в интерфейсе GreetingsStreams . Это можно сделать, создав класс com.kaviddiss.streamkafka.config.StreamsConfig с кодом ниже:

1
2
3
4
5
6
7
8
package com.kaviddiss.streamkafka.config;
 
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;
 
@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

Связывание потоков выполняется с @EnableBinding аннотации @EnableBinding куда GreatingsService интерфейс GreatingsService .

Конфигурационные свойства для Кафки

По умолчанию свойства конфигурации хранятся в файле src/main/resources/application.properties .

Однако я предпочитаю использовать формат YAML, так как он менее многословен и позволяет хранить как общие, так и специфичные для среды свойства в одном файле.

А пока давайте переименуем application.properties в application.yaml и вставим приведенный ниже фрагмент конфигурации в файл:

01
02
03
04
05
06
07
08
09
10
11
12
13
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

Приведенные выше свойства конфигурации настраивают адрес сервера Kafka для подключения и тему Kafka, которую мы используем для нашего входящего и исходящего потоков в нашем коде. Они оба должны использовать одну и ту же тему Кафки!

Свойства contentType сообщают Spring Cloud Stream отправлять / получать объекты наших сообщений в виде String в потоках.

Создать объект сообщения

Создайте простой класс com.kaviddiss.streamkafka.model.Greetings с кодом ниже, который будет представлять объект сообщения, из которого мы читаем и пишем в тему greetings Kafka:

01
02
03
04
05
06
07
08
09
10
11
12
13
package com.kaviddiss.streamkafka.model;
 
// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
 
@Getter @Setter @ToString @Builder
public class Greetings {
    private long timestamp;
    private String message;
}

Обратите внимание, что в классе нет геттеров и сеттеров благодаря аннотациям Lombok. @ToString сгенерирует метод toString() с использованием полей класса, а аннотация @Builder позволит нам создавать объекты Greetings с помощью Fluent Builder (см. Ниже).

Создать сервисный слой для записи в Kafka

Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsService с приведенным ниже кодом, который будет записывать объект Greetings в тему greetings Kafka:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kaviddiss.streamkafka.service;
 
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
 
@Service
@Slf4j
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;
 
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
 
    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);
 
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

Аннотация @Service настроит этот класс как Spring Bean и внедрит зависимость GreetingsService через конструктор.

Аннотация @Slf4j сгенерирует поле регистратора SLF4J, которое мы можем использовать для регистрации.

В sendGreeting() мы используем sendGreeting() объект GreetingsStream для отправки сообщения, представленного объектом Greetings .

Создать REST API

Теперь мы создадим конечную точку API REST, которая будет инициировать отправку сообщения в Kafka с помощью Spring Bean GreetingsService :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.kaviddiss.streamkafka.web;
 
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; 
 
@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;
 
    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }
 
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
            .message(message)
            .timestamp(System.currentTimeMillis())
            .build();
 
        greetingsService.sendGreeting(greetings);
    }
}

Аннотация @RestController сообщает Spring, что это bean-компонент Controller (C из MVC). Метод greetings() определяет конечную точку HTTP GET /greetings которая принимает параметр запроса message и передает его sendGreeting() в GreetingsService .

Прослушивание приветствия Кафки в теме

Давайте создадим класс com.kaviddiss.streamkafka.service.GreetingsListener который будет прослушивать сообщения по теме greetings Kafka и регистрировать их в консоли:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
package com.kaviddiss.streamkafka.service;
 
import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

Аннотация @Component аналогично @Service и @RestController определяет Spring Bean.

GreetingsListener имеет единственный метод handleGreetings() который будет вызываться Spring Cloud Stream с каждым новым объектом сообщения Greetings в теме greetings Kafka. Это благодаря аннотации @StreamListener настроенной для handleGreetings() .

Запуск приложения

Последним фрагментом головоломки является класс com.kaviddiss.streamkafka.StreamKafkaApplication который был автоматически создан Spring Initializer:

01
02
03
04
05
06
07
08
09
10
11
12
package com.kaviddiss.streamkafka;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class StreamKafkaApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(StreamKafkaApplication.class, args);
    }
}

Нет необходимости вносить какие-либо изменения здесь. Вы можете запустить этот класс как приложение Java из вашей IDE или запустить приложение из командной строки с помощью подключаемого модуля Spring Boot maven:

1
> mvn spring-boot:run

После запуска приложения перейдите по адресу http: // localhost: 8080 / greetings? Message = hello в браузере и проверьте свою консоль.

Резюме

Я надеюсь, вам понравился этот урок. Не стесняйтесь задавать любые вопросы и оставляйте свои отзывы.

Опубликовано на Java Code Geeks с разрешения Дэвида Кисс, партнера нашей программы JCG . Смотреть оригинальную статью здесь: Spring Cloud Stream с Кафкой

Мнения, высказанные участниками Java Code Geeks, являются их собственными.