Сегодняшние пользователи приложений требуют самого лучшего пользовательского опыта. Они привыкли получать доступ к своим приложениям со всех своих устройств (компьютеров, мобильных телефонов, планшетов и т. Д.). Поскольку платформы продолжают переходить на программное обеспечение как услугу (SaaS), разработчики постоянно сотрудничают с мощными инструментами, которые позволяют обрабатывать тысячи запросов каждую секунду. Вот тут-то и появился Apache Kafka, надежный инструмент, который известен своей способностью работать в очень интенсивной среде.
В этой статье мы познакомим вас с основами Apache Kafka и перейдем к созданию безопасного, масштабируемого приложения для обмена сообщениями с использованием Java и Kafka.
Предварительные условия: Java 8+, подключение к интернету и бесплатная учетная запись разработчика Okta .
Краткий обзор Apache Kafka
Apache Kafka — это распределенная потоковая платформа, которая использует шаблон сообщений публикации / подписки для взаимодействия с приложениями; это разработано, чтобы создать длительные сообщения.
Давайте разберем эти понятия более подробно.
Платформа распределенной потоковой передачи
Если вы хотите запустить Kafka, вам нужно запустить его брокер: простой экземпляр Kafka, работающий на машине, как и любой другой сервер. Брокер отвечает за отправку, получение и хранение сообщений на диске.
Одного брокера недостаточно, чтобы Kafka мог обрабатывать сообщения с высокой пропускной способностью. Эта цель достигается благодаря совместной работе многих брокеров, общению и координации друг с другом.
Кластер Kafka объединяет одного или нескольких брокеров. Вместо того, чтобы подключаться к одному узлу, ваше приложение подключается к кластеру, который управляет всеми распределенными данными для вас.
Вам также может понравиться:
Учебное пособие по Kafka для всех, не имеет значения ваш этап в развитии .
Система обмена сообщениями публикации / подписки с надежными сообщениями
Публикация / подписка — это распространенный шаблон в распределенных системах. Изображение ниже иллюстрирует базовую структуру этого шаблона в Кафке:
Изображение включает в себя два компонента, не упомянутых до сих пор: производители и потребители.
Producer — это приложение, которое отправляет сообщения в кластер. В этом примере Producer 1, 2 и 3 отправляют сообщения. Затем кластер выбирает, какой брокер должен их хранить, и отправляет его выбранным.
С другой стороны, у вас есть потребители. Потребитель — это приложение, которое подключается к кластеру и получает сообщения от производителей. Любое приложение, которое заинтересовано в использовании сообщений, отправляемых производителями, должно подключаться к потребителю Kafka.
Поскольку Kafka хранит сообщения в течение длительного времени (значение по умолчанию составляет семь дней), у вас может быть много потребителей, получающих одно и то же сообщение, даже если их не было на момент отправки сообщения!
Кафка Темы
Когда вы отправляете сообщение брокеру Kafka , вам нужно указать, куда сообщение будет отправлено, указав тему. Тема — это категория сообщений, на которую потребитель может подписаться. Этот механизм гарантирует, что потребители получают только сообщения, относящиеся к ним, а не получают каждое сообщение, опубликованное в кластере.
Теперь, когда вы понимаете основную архитектуру Kafka, давайте загрузим и установим ее.
Установите и запустите Kafka
Чтобы скачать Kafka, перейдите на веб-сайт Kafka . Извлеките содержимое этого сжатого файла в папку по вашему выбору.
Внутри каталога Kafka перейдите в bin
папку. Здесь вы найдете много скриптов bash, которые будут полезны для запуска приложения Kafka. Если вы используете Windows, у вас также есть те же скрипты внутри windows
папки. В этом руководстве используются команды Linux, но вам просто нужно использовать эквивалентную версию Windows, если вы используете ОС Microsoft.
Запустите Zookeeper для управления вашим кластером Kafka
Apache Kafka всегда запускается как распределенное приложение. Это означает, что ваш кластер должен иметь дело с некоторыми распределенными проблемами на этом пути, такими как синхронизация конфигураций или выбор лидера, который позаботится о кластере.
Кафка использует Zookeeper для отслеживания этих деталей. Не беспокойтесь о его загрузке. Kafka уже поставляется вместе с Zookeeper, что позволяет вам очень быстро встать на ноги.
Давайте запустим экземпляр Zookeeper! Внутри bin
папки в вашем каталоге Kafka выполните следующую команду:
Оболочка
1
./zookeeper-server-start.sh ../config/zookeeper.properties
Эта команда запускает сервер Zookeeper на порту 2181 по умолчанию. Zookeeper отвечает за координацию работы брокеров Kafka внутри вашего кластера. В этом руководстве вы будете использовать конфигурации по умолчанию внутри проекта Kafka, но вы всегда можете изменить эти значения по мере необходимости.
Запустить Кафку Брокер
Следующим шагом является запуск самого брокера. Из другого терминала выполните следующую команду из bin
папки:
Оболочка
xxxxxxxxxx
1
./kafka-server-start.sh ../config/server.properties
Как вы уже догадались, эта команда запускает сервер Kafka с конфигурациями по умолчанию на порте по умолчанию, 9092.
Создать тему Кафки
Теперь, когда у вас запущены брокер и Zookeeper, вы можете указать тему для начала отправки сообщений от производителя. Вы собираетесь запустить команду внутри bin
папки, как вы это делали в предыдущих шагах:
Оболочка
xxxxxxxxxx
1
./kafka-topics.sh --create --topic myTopic -zookeeper \
2
localhost:2181 --replication-factor 1 --partitions 1
Эта команда создает тему с именем, myTopic
указывающим на экземпляр Zookeeper, который вы начали с первой команды. Есть также два разных параметра, которые вы должны указать: replication-factor
и partitions
. Не беспокойтесь о них прямо сейчас; они используются для управления конкретными аспектами, связанными с распределенными системами в Кафке. Поскольку вы выполняете простую настройку, вы можете указать «1» для обоих параметров.
Теперь, когда все готово, вы можете начать интеграцию Kafka с Java-приложением!
Создать приложение Java + Kafka
Давайте начнем со структуры проекта, используя Spring Initializer для создания приложения.
Перейдите на https://start.spring.io и заполните следующую информацию:
- Проект: Maven Project.
- Язык: Java.
- Группа:
com.okta.javakafka
- Артефакт:
kafka-java
- зависимости:
- Весенняя паутина.
- Весна для Apache Kafka.
Вы также можете создать проект с помощью командной строки. Вставьте следующую команду в ваш терминал, и он загрузит проект с такими же конфигурациями, как определено выше:
Оболочка
xxxxxxxxxx
1
curl https://start.spring.io/starter.zip -d language=java \
2
-d dependencies=web,kafka \
3
-d packageName=com.okta.javakafka \
4
-d name=kafka-java \
5
-d type=maven-project \
6
-o kafka-java.zip
Этот урок использует Maven, но вы можете легко следовать за ним с Gradle, если хотите.
Это оно! Теперь ваша структура проекта Java создана, и вы можете приступить к разработке своего приложения.
Push-сообщения к теме Кафки в вашем Java-приложении
Первым шагом для создания производителя, который может отправлять сообщения, является настройка производителей внутри вашего Java-приложения. Давайте создадим класс конфигурации для этого.
Создайте src/main/java/com/okta/javakafka/configuration
папку и ProducerConfiguration
класс в ней:
Джава
xxxxxxxxxx
1
import org.apache.kafka.clients.producer.ProducerConfig;
2
import org.apache.kafka.common.serialization.StringSerializer;
3
import org.springframework.context.annotation.Bean;
4
import org.springframework.context.annotation.Configuration;
5
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6
import org.springframework.kafka.core.KafkaTemplate;
7
import org.springframework.kafka.core.ProducerFactory;
8
import java.util.HashMap;
10
import java.util.Map;
11
13
public class ProducerConfiguration {
14
private static final String KAFKA_BROKER = "localhost:9092";
16
18
public ProducerFactory<String, String> producerFactory() {
19
return new DefaultKafkaProducerFactory<>(producerConfigurations());
20
}
21
23
public Map<String, Object> producerConfigurations() {
24
Map<String, Object> configurations = new HashMap<>();
25
configurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
27
configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
28
configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
29
return configurations;
31
}
32
34
public KafkaTemplate<String, String> kafkaTemplate() {
35
return new KafkaTemplate<>(producerFactory());
36
}
37
}
Этот класс создает класс, ProducerFactory
который знает, как создавать производителей на основе предоставленных вами конфигураций. Вы также указали для подключения к вашему локальному брокеру Kafka и сериализации ключа и значений с помощью String
.
Вы также объявили KafkaTemplate
bean-компонент для выполнения высокоуровневых операций над вашим производителем. Другими словами, шаблон может выполнять такие операции, как отправка сообщения в тему, и эффективно скрывает скрытые детали от вас.
Следующим шагом является создание конечной точки для отправки сообщения производителю. Внутри src/main/java/com/okta/javakafka/controller
пакета создайте следующий класс:
Джава
1
import org.springframework.kafka.core.KafkaTemplate;
2
import org.springframework.web.bind.annotation.GetMapping;
3
import org.springframework.web.bind.annotation.RequestParam;
4
import org.springframework.web.bind.annotation.RestController;
5
import java.util.List;
7
9
public class KafkaController {
10
private KafkaTemplate<String, String> template;
12
public KafkaController(KafkaTemplate<String, String> template) {
14
this.template = template;
15
}
16
"/kafka/produce") (
18
public void produce( String message) {
19
template.send("myTopic", message);
20
}
21
Примечание. Поскольку вы отправляете данные для обработки, produce()
метод действительно должен быть методом POST. В демонстрационных целях проще оставить его как GET, чтобы вы могли использовать его в браузере.
Как видите, эта конечная точка очень проста. Он внедряет KafkaTemplate
настроенный ранее и отправляет сообщение, myTopic
когда GET
запрос сделан /kafka/produce
.
Давайте проверим, все ли работает как положено. Запустите main
метод внутри JavaKafkaApplication
класса. Для запуска из командной строки выполните следующую команду:
xxxxxxxxxx
1
./mvnw spring-boot:run
Ваш сервер должен работать на порту 8080, и вы уже можете делать запросы API к нему!
Перейдите в веб-браузер и перейдите по адресу http: // localhost: 8080 / kafka / products? Message = Это мое сообщение .
Когда вы делаете вызов с помощью команды выше, ваше приложение выполнит /kafka/produce
конечную точку, которая отправит сообщение в myTopic
тему внутри Kafka.
Но как узнать, команда успешно отправила сообщение в тему? Прямо сейчас вы не используете сообщения внутри своего приложения, а это значит, что вы не уверены!
К счастью, существует простой способ создать потребителя для тестирования сразу. Внутри bin
папки вашего каталога Kafka выполните следующую команду:
Оболочка
xxxxxxxxxx
1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
Доступ http: // localhost: 8080 / kafka / product? Message = Это мое сообщение еще раз, чтобы увидеть следующее сообщение в терминале, на котором запущен потребитель Kafka:
Оболочка
xxxxxxxxxx
1
This is my message
Отличная работа! Вы можете остановить эту команду на данный момент.
Вместо того, чтобы выполнять из терминала, давайте добавим немного кода Java, чтобы использовать сообщения внутри вашего приложения.
Использовать сообщения из темы Кафки в приложении Java
Как и в случае с производителем, вам необходимо добавить конфигурации, чтобы потребитель мог найти брокера Kafka.
Внутри src/main/java/com/okta/javakafka/configuration
создайте следующий класс:
Джава
xxxxxxxxxx
1
import org.apache.kafka.clients.consumer.ConsumerConfig;
2
import org.apache.kafka.common.serialization.StringDeserializer;
3
import org.springframework.context.annotation.Bean;
4
import org.springframework.context.annotation.Configuration;
5
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
6
import org.springframework.kafka.core.ConsumerFactory;
7
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
8
import java.util.HashMap;
10
import java.util.Map;
11
13
public class ConsumerConfiguration {
14
private static final String KAFKA_BROKER = "localhost:9092";
16
private static final String GROUP_ID = "kafka-sandbox";
17
19
public ConsumerFactory<String, String> consumerFactory() {
20
return new DefaultKafkaConsumerFactory<>(consumerConfigurations());
21
}
22
24
public Map<String, Object> consumerConfigurations() {
25
Map<String, Object> configurations = new HashMap<>();
26
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
28
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
29
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
30
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
31
return configurations;
33
}
34
36
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
37
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
38
factory.setConsumerFactory(consumerFactory());
39
return factory;
40
}
41
}
Приведенный выше код создает фабрику, которая знает, как подключиться к вашему локальному брокеру. Он также настраивает вашего потребителя на десериализацию и String
для ключа, и для значения, соответствующего конфигурации производителя.
Идентификатор группы является обязательным и используется Kafka для параллельного потребления данных. ConcurrentKafkaListenerContainerFactory
Боб позволяет вашему приложению получать сообщения в более чем одной нити.
Теперь, когда ваше Java-приложение настроено на поиск потребителей внутри вашего брокера Kafka, давайте начнем слушать сообщения, отправленные в тему.
Создайте src/main/java/com/okta/javakafka/consumer
каталог и следующий класс в нем:
Джава
xxxxxxxxxx
1
import org.springframework.kafka.annotation.KafkaListener;
2
import org.springframework.stereotype.Component;
3
4
import java.util.ArrayList;
5
import java.util.List;
6
7
8
public class MyTopicConsumer {
9
10
private final List<String> messages = new ArrayList<>();
11
12
topics = "myTopic", groupId = "kafka-sandbox") (
13
public void listen(String message) {
14
synchronized (messages) {
15
messages.add(message);
16
}
17
}
18
19
public List<String> getMessages() {
20
return messages;
21
}
22
23
}
Этот класс отвечает за прослушивание изменений внутри myTopic
темы. Это делается с помощью KafkaListener
аннотации. Каждый раз, когда новое сообщение отправляется от производителя к теме, ваше приложение получает сообщение внутри этого класса. Он добавляет сообщение в список полученных сообщений, делая его доступным для других классов с помощью getMessages()
метода.
Далее, давайте создадим конечную точку, которая отображает список использованных сообщений. Вернитесь к тому, KafkaController
чтобы добавить MyTopicConsumer
как зависимость и getMessages()
метод.
Джава
xxxxxxxxxx
1
import com.okta.javakafka.consumer.MyTopicConsumer;
2
import org.springframework.kafka.core.KafkaTemplate;
3
import org.springframework.web.bind.annotation.GetMapping;
4
import org.springframework.web.bind.annotation.RequestParam;
5
import org.springframework.web.bind.annotation.RestController;
6
import java.util.List;
8
10
public class KafkaController {
11
private KafkaTemplate<String, String> template;
13
private MyTopicConsumer myTopicConsumer;
14
public KafkaController(KafkaTemplate<String, String> template, MyTopicConsumer myTopicConsumer) {
16
this.template = template;
17
this.myTopicConsumer = myTopicConsumer;
18
}
19
"/kafka/produce") (
21
public void produce( String message) {
22
template.send("myTopic", message);
23
}
24
"/kafka/messages") (
26
public List<String> getMessages() {
27
return myTopicConsumer.getMessages();
28
}
29
}
Этот класс теперь имеет новую конечную точку для отображения сообщений, хранящихся у вашего потребителя. Когда вызывается эта конечная точка, она отправляет текущие уже обработанные сообщения из темы Kafka.
Ваше Java-приложение теперь имеет и производителя Kafka, и потребителя, поэтому давайте проверим все это вместе! Перезапустите приложение и перейдите по адресу http: // localhost: 8080 / kafka / messages .
В данный момент информация не возвращается. Причина довольно проста: ваш потребитель настроен только на получение новых сообщений, а вы еще не отправили новое сообщение. Давайте исправим эту проблему, зайдя в ваш веб-браузер и зайдя по адресу http: // localhost: 8080 / kafka / products? Message = Сообщение отправлено моим приложением! ,
Когда Кафка получит сообщение, оно сразу же сообщит об этом вашему потребителю. Перейдите по ссылке http: // localhost: 8080 / kafka / messages в вашем браузере. Теперь вы увидите, что ваше сообщение было успешно получено!
Отличная работа! У вас есть Java-приложение, способное генерировать и принимать сообщения от Kafka! Прежде чем мы назовем это днем, есть еще один последний шаг, и он очень важный.
Защитите свое приложение Java Kafka
Ваше приложение сейчас не очень безопасно. Хотя вы готовы обрабатывать многие сообщения в распределенной среде, эти сообщения по-прежнему доступны для всех, кто может найти ссылку на ваши конечные точки. Это критическая уязвимость, поэтому давайте удостоверимся, что она исправлена правильно.
Вы собираетесь использовать OAuth 2.0, чтобы убедиться, что только прошедшие проверку пользователи могут видеть ваши конечные точки. Лучшая часть? Это займет всего 5 минут, чтобы добавить эту функцию в ваше приложение с помощью Okta для аутентификации ваших пользователей!
Создать аккаунт Okta
Если у вас еще нет учетной записи Okta, создайте ее . После завершения регистрации выполните следующие шаги:
- Вход в свой аккаунт.
- Перейдите в Приложения > Добавить приложение . Вы будете перенаправлены на следующую страницу:
- Выберите Web и нажмите Next .
- Заполните следующие параметры в форме:
- Название:
Bootiful Kafka
- Базовые URI:
http://localhost:8080
- URL перенаправления входа:
http://localhost:8080/login/oauth2/code/okta
- Название:
- Нажмите Готово .
Теперь, когда у вас есть приложение Okta, вы можете использовать его для аутентификации пользователей в приложении Java + Kafka.
Защитите свое приложение Java с помощью аутентификации пользователя
Давайте начнем с добавления библиотеки Okta в ваш проект. Откройте свой pom.xml
и добавьте следующую зависимость внутри <dependencies>
тега:
XML
xxxxxxxxxx
1
<dependency>
2
<groupId>com.okta.spring</groupId>
3
<artifactId>okta-spring-boot-starter</artifactId>
4
<version>1.3.0</version>
5
</dependency>
Эта библиотека будет интегрирована с только что созданным приложением Okta. Это также добавит Spring Security в ваше текущее приложение. Настройте его со следующими переменными в src/main/resources/application.properties
:
Файлы свойств
xxxxxxxxxx
1
okta.oauth2.issuer: https://{yourOktaDomain}/oauth2/default
2
okta.oauth2.client-id: {yourClientID}
3
okta.oauth2.client-secret: {yourClientSecret}
ВАЖНО : Этот файл должен использоваться только локально. Не передавайте секрет вашего клиента в Git или любую другую систему контроля версий.
Чтобы избежать случайного раскрытия этих учетных данных, вы также можете указать значения вашего приложения Okta в качестве переменных среды. Создайте
okta.env
файл в корневом каталоге вашего приложения со следующими переменными среды. Затем запуститеsource okta.env
перед запуском вашего приложения.
Оболочка
xxxxxxxxxx
1
1
export OKTA_OAUTH2_ISSUER=https://{yourOktaDomain}/oauth2/default
2
export OKTA_OAUTH2_CLIENT_ID={yourClientID}
3
export OKTA_OAUTH2_CLIENT_SECRET={yourClientSecret}
Вы можете найти {yourClientID}
и {yourClientSecret}
на странице приложений Okta UI. Чтобы получить к нему доступ, выполните следующие действия:
- В вашем меню Okta, перейдите в Приложения .
- Выберите приложение Bootiful Kafka .
- Нажмите на вкладку Общие .
Вы должны увидеть оба значения в области «Учетные данные клиента».
Значение {yourOktaDomain}
для будет видно на вашей панели инструментов Okta, просто нажмите на панели инструментов в меню. Вы увидите URL организации в правом верхнем углу.
Это оно!
Перезапустите приложение Spring Boot и перейдите по адресу http: // localhost: 8080 / kafka / messages . Ваше приложение теперь перенаправит вас на страницу входа:
ПРИМЕЧАНИЕ. Если вам не предлагается войти в систему, это потому, что вы уже вошли в систему. Откройте свое приложение в окне инкогнито, и вы увидите экран входа, показанный выше.
Введите ваше имя пользователя и пароль. Если ваша попытка входа будет успешной, вы снова будете перенаправлены обратно в приложение.
Поздравляем! Теперь у вас есть безопасное Java-приложение, которое может генерировать и принимать сообщения от Kafka.
Если вы хотите проверить полный исходный код этого руководства, перейдите к oktadeveloper / okta-java-kafka-example на GitHub.
Хотите узнать больше о Java, безопасности и OAuth 2.0? Вот несколько ссылок, которые могут вас заинтересовать:
- Руководство по Java OAuth 2.0: защитите свое приложение за 5 минут
- Иллюстрированное руководство по OAuth и OpenID Connect
- Безопасные реактивные микросервисы с Spring Cloud Gateway
Чтобы узнать больше о подобных статьях, следите за @oktadev в Twitter . Мы также регулярно публикуем скринкасты на нашем канале YouTube !