Учебники

Apache Kafka — Основные операции

Сначала давайте приступим к реализации конфигурации с одним узлом и одним посредником, а затем перенесем нашу настройку на конфигурацию с одним узлом и несколькими посредниками.

Надеюсь, вы уже установили Java, ZooKeeper и Kafka на свою машину. Прежде чем перейти к настройке Kafka Cluster, сначала вам нужно запустить ZooKeeper, потому что Kafka Cluster использует ZooKeeper.

Запустить ZooKeeper

Откройте новый терминал и введите следующую команду —

bin/zookeeper-server-start.sh config/zookeeper.properties

Чтобы запустить Kafka Broker, введите следующую команду —

bin/kafka-server-start.sh config/server.properties

После запуска Kafka Broker введите команду jps на терминале ZooKeeper, и вы увидите следующий ответ:

821 QuorumPeerMain
928 Kafka
931 Jps

Теперь вы можете увидеть два демона, запущенных на терминале, где QuorumPeerMain — это демон ZooKeeper, а другой — демон Kafka.

Конфигурация с одним узлом и одним брокером

В этой конфигурации у вас есть один экземпляр ZooKeeper и идентификатор брокера. Ниже приведены шаги для его настройки.

Создание раздела Kafka — Kafka предоставляет утилиту командной строки с именем kafka-topics.sh для создания разделов на сервере. Откройте новый терминал и введите приведенный ниже пример.

Синтаксис

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

пример

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Мы только что создали тему под названием Hello-Kafka с одним разделом и одним фактором реплики. Созданный выше вывод будет похож на следующий вывод —

Вывод — Создана тема Hello-Kafka

После создания темы вы можете получить уведомление в окне терминала брокера Kafka и журнал для созданной темы, указанный в «/ tmp / kafka-logs /» в файле config / server.properties.

Список тем

Чтобы получить список тем на сервере Kafka, вы можете использовать следующую команду —

Синтаксис

bin/kafka-topics.sh --list --zookeeper localhost:2181

Выход

Hello-Kafka

Так как мы создали тему, она перечислит только Hello-Kafka . Предположим, если вы создадите более одной темы, вы получите названия тем в выводе.

Начать продюсер для отправки сообщений

Синтаксис

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Из приведенного выше синтаксиса для клиента командной строки производителя требуются два основных параметра:

Broker-list — список брокеров, которым мы хотим отправлять сообщения. В этом случае у нас есть только один брокер. Файл Config / server.properties содержит идентификатор порта посредника, поскольку мы знаем, что наш посредник прослушивает порт 9092, поэтому вы можете указать его напрямую.

Название темы. Вот пример названия темы.

пример

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Производитель будет ждать ввода от stdin и публикует данные в кластере Kafka. По умолчанию каждая новая строка публикуется как новое сообщение, а свойства производителя по умолчанию указываются в файле config / provider.properties. Теперь вы можете набрать несколько строк сообщений в терминале, как показано ниже.

Выход

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Начать приемник для получения сообщений

Как и для производителя, свойства потребителя по умолчанию указаны в файле config / consumer.proper-ties . Откройте новый терминал и введите приведенный ниже синтаксис для использования сообщений.

Синтаксис

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

пример

bin/kafka-console-consumer.sh --zookeeper localhost:2181 topic Hello-Kafka 
--from-beginning

Выход

Hello
My first message
My second message

Наконец, вы можете вводить сообщения из терминала производителя и видеть, что они появляются в терминале потребителя. На данный момент у вас есть очень хорошее понимание кластера с одним узлом с одним брокером. Давайте теперь перейдем к конфигурации с несколькими брокерами.

Конфигурация с одним узлом и несколькими брокерами

Прежде чем перейти к настройке кластера с несколькими брокерами, сначала запустите сервер ZooKeeper.

Создайте несколько брокеров Kafka — у нас есть один экземпляр брокера Kafka уже в con-fig / server.properties. Теперь нам нужно несколько экземпляров брокера, поэтому скопируйте существующий файл server.prop-erties в два новых файла конфигурации и переименуйте его в server-one.properties и server-two.prop -ties. Затем отредактируйте оба новых файла и назначьте следующие изменения:

конфиг / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

конфиг / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Запуск нескольких брокеров. После внесения всех изменений на трех серверах откройте три новых терминала, чтобы запустить каждого брокера по одному.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Теперь у нас работают три разных брокера. Попробуйте сами проверить все демоны, набрав jps на терминале ZooKeeper, и вы увидите ответ.

Создание темы

Давайте назначим значение коэффициента репликации как три для этой темы, потому что у нас работают три разных брокера. Если у вас есть два брокера, то назначенное значение реплики будет равно двум.

Синтаксис

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

пример

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Выход

created topic “Multibrokerapplication”

Команда Describe используется для проверки, какой посредник прослушивает текущую созданную тему, как показано ниже —

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Выход

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

Исходя из вышеприведенного вывода, мы можем заключить, что в первой строке дается сводка по всем разделам, показывая название темы, количество разделов и коэффициент репликации, который мы уже выбрали. Во второй строке каждый узел будет лидером для случайно выбранной части разделов.

В нашем случае мы видим, что наш первый брокер (с broker.id 0) является лидером. Тогда Реплики: 0,2,1 означает, что все брокеры копируют тему, наконец, Isr — это набор синхронных реплик. Ну, это подмножество реплик, которые в настоящее время живы и захвачены лидером.

Начать продюсер для отправки сообщений

Эта процедура остается такой же, как в настройке одного брокера.

пример

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Выход

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Начать приемник для получения сообщений

Эта процедура остается такой же, как показано в настройке одного брокера.

пример

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
topic Multibrokerapplica-tion --from-beginning

Выход

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Основные тематические операции

В этой главе мы обсудим различные основные темы операций.

Изменение темы

Как вы уже поняли, как создать тему в Kafka Cluster. Теперь давайте изменим созданную тему, используя следующую команду

Синтаксис

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

пример

We have already created a topic Hello-Kafka with single partition count and one replica factor. 
Now using alter command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Выход

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Удаление темы

Чтобы удалить тему, вы можете использовать следующий синтаксис.

Синтаксис

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

пример

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Выход

> Topic Hello-kafka marked for deletion

Примечание. Это не окажет влияния, если для delete.topic.enable не задано значение true.