Учебники

Apache Kafka — простой пример для продюсера

Давайте создадим приложение для публикации и использования сообщений с помощью клиента Java. Клиент производителя Kafka состоит из следующих API.

KafkaProducer API

Давайте разберемся с наиболее важным набором API производителей Kafka в этом разделе. Центральная часть API KafkaProducer — класс KafkaProducer . Класс KafkaProducer предоставляет возможность подключить брокер Kafka в своем конструкторе с помощью следующих методов.

  • Класс KafkaProducer предоставляет метод send для асинхронной отправки сообщений в тему. Подпись send () выглядит следующим образом

Класс KafkaProducer предоставляет метод send для асинхронной отправки сообщений в тему. Подпись send () выглядит следующим образом

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord — производитель управляет буфером записей, ожидающих отправки.

  • Обратный вызов — предоставленный пользователем обратный вызов для выполнения, когда запись была подтверждена сервером (ноль означает отсутствие обратного вызова).

ProducerRecord — производитель управляет буфером записей, ожидающих отправки.

Обратный вызов — предоставленный пользователем обратный вызов для выполнения, когда запись была подтверждена сервером (ноль означает отсутствие обратного вызова).

  • Класс KafkaProducer предоставляет метод сброса, обеспечивающий фактическое завершение всех ранее отправленных сообщений. Синтаксис метода очистки следующий:

Класс KafkaProducer предоставляет метод сброса, обеспечивающий фактическое завершение всех ранее отправленных сообщений. Синтаксис метода очистки следующий:

public void flush()
  • Класс KafkaProducer предоставляет метод partitionFor, который помогает в получении метаданных раздела для данной темы. Это может быть использовано для пользовательского разбиения. Суть этого метода заключается в следующем —

Класс KafkaProducer предоставляет метод partitionFor, который помогает в получении метаданных раздела для данной темы. Это может быть использовано для пользовательского разбиения. Суть этого метода заключается в следующем —

public Map metrics()

Возвращает карту внутренних метрик, поддерживаемых производителем.

  • public void close () — класс KafkaProducer предоставляет блоки методов close, пока все ранее отправленные запросы не будут выполнены.

public void close () — класс KafkaProducer предоставляет блоки методов close, пока все ранее отправленные запросы не будут выполнены.

API продюсера

Центральной частью API производителя является класс Producer . Класс Producer предоставляет возможность подключить брокер Kafka в своем конструкторе следующими методами.

Класс продюсера

Класс продюсера предоставляет метод send для отправки сообщений в одну или несколько тем с использованием следующих подписей.


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Существует два типа производителей — Sync и Async .

Та же конфигурация API применима и к производителю Sync . Разница между ними заключается в том, что производитель синхронизации отправляет сообщения напрямую, но отправляет сообщения в фоновом режиме. Асинхронный производитель предпочтителен, когда вы хотите более высокую пропускную способность. В предыдущих выпусках, таких как 0.8, у асинхронного производителя нет обратного вызова для send () для регистрации обработчиков ошибок. Это доступно только в текущей версии 0.9.

public void close ()

Класс Producer предоставляет метод close для закрытия соединений пула производителей со всеми брокерами Kafka.

Настройки конфигурации

Основные параметры конфигурации API производителя приведены в следующей таблице для лучшего понимания —

S.No Настройки конфигурации и описание
1

ID клиента

определяет приложение производителя

2

producer.type

синхронизация или асинхронность

3

ACKs

Конфигурация acks контролирует критерии по запросам производителя.

4

повторы

Если запрос производителя не удался, автоматически повторите попытку с указанным значением.

5

bootstrap.servers

загрузочный список брокеров.

6

linger.ms

если вы хотите уменьшить количество запросов, вы можете установить для linger.ms нечто большее, чем какое-либо значение.

7

key.serializer

Ключ для интерфейса сериализатора.

8

value.serializer

значение для интерфейса сериализатора.

9

размер партии

Размер буфера.

10

buffer.memory

контролирует общий объем памяти, доступной производителю для буферизации.

ID клиента

определяет приложение производителя

producer.type

синхронизация или асинхронность

ACKs

Конфигурация acks контролирует критерии по запросам производителя.

повторы

Если запрос производителя не удался, автоматически повторите попытку с указанным значением.

bootstrap.servers

загрузочный список брокеров.

linger.ms

если вы хотите уменьшить количество запросов, вы можете установить для linger.ms нечто большее, чем какое-либо значение.

key.serializer

Ключ для интерфейса сериализатора.

value.serializer

значение для интерфейса сериализатора.

размер партии

Размер буфера.

buffer.memory

контролирует общий объем памяти, доступной производителю для буферизации.

ProducerRecord API

ProducerRecord — это пара ключ / значение, отправляемая в кластер Kafka. Конструктор классаProducerRecord для создания записи с парами разделов, ключей и значений с использованием следующей подписи.

public ProducerRecord (string topic, int partition, k key, v value)
  • Тема — определенное пользователем название темы, которое будет добавлено в запись.

  • Раздел — количество разделов

  • Ключ — ключ, который будет включен в запись.

  • Значение — запись содержимого

Тема — определенное пользователем название темы, которое будет добавлено в запись.

Раздел — количество разделов

Ключ — ключ, который будет включен в запись.

public ProducerRecord (string topic, k key, v value)

Конструктор класса ProducerRecord используется для создания записи с ключом, парами значений и без разделения.

  • Тема — Создать тему для назначения записи.

  • Ключ — ключ для записи.

  • Значение — запись содержимого.

Тема — Создать тему для назначения записи.

Ключ — ключ для записи.

Значение — запись содержимого.

public ProducerRecord (string topic, v value)

Класс ProducerRecord создает запись без раздела и ключа.

  • Тема — создать тему.

  • Значение — запись содержимого.

Тема — создать тему.

Значение — запись содержимого.

Методы класса ProducerRecord перечислены в следующей таблице:

S.No Методы класса и описание
1

публичная строковая тема ()

Тема будет добавлена ​​в запись.

2

открытый ключ K ()

Ключ, который будет включен в запись. Если такой клавиши нет, значение null будет возвращено здесь.

3

общедоступное значение V ()

Записать содержимое.

4

раздел ()

Количество разделов для записи

публичная строковая тема ()

Тема будет добавлена ​​в запись.

открытый ключ K ()

Ключ, который будет включен в запись. Если такой клавиши нет, значение null будет возвращено здесь.

общедоступное значение V ()

Записать содержимое.

раздел ()

Количество разделов для записи

Приложение SimpleProducer

Перед созданием приложения сначала запустите ZooKeeper и Kafka broker, затем создайте свою собственную тему в Kafka broker, используя команду create topic. После этого создайте класс Java с именем Sim-pleProducer.java и введите следующую кодировку.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Компиляция — приложение может быть скомпилировано с помощью следующей команды.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Выполнение — приложение может быть выполнено с помощью следующей команды.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Выход

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Простой потребительский пример

На данный момент мы создали производителя для отправки сообщений в кластер Kafka. Теперь давайте создадим потребителя для потребления сообщений из кластера Kafka. API KafkaConsumer используется для приема сообщений из кластера Kafka. Конструктор класса KafkaConsumer определен ниже.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs — вернуть карту пользовательских конфигов.

Класс KafkaConsumer имеет следующие важные методы, которые перечислены в таблице ниже.

S.No Метод и описание
1

public java.util.Set назначение <TopicPar-tition> ()

Получить набор разделов, назначенных в настоящее время потребителем.

2

подписка на открытую строку ()

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.

3

public void sub-scribe (темы java.util.List <java.lang.String>, слушатель ConsumerRe-balanceListener)

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.

4

публичный аннулировать подписку ()

Отписаться на темы из данного списка разделов.

5

публичный подпункт void (темы java.util.List <java.lang.String>)

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы. Если данный список тем пуст, он обрабатывается так же, как и отмена подписки ().

6

открытый подписок void (шаблон java.util.regex.Pattern, слушатель ConsumerRebalanceLis-tener)

Шаблон аргумента относится к шаблону подписки в формате регулярного выражения, а аргумент слушателя получает уведомления от шаблона подписки.

7

public void as-sign (разделы java.util.List <TopicPartition>)

Вручную назначьте список разделов клиенту.

8

опрос()

Получить данные для тем или разделов, указанных с помощью одного из API подписки / назначения. Это вернет ошибку, если темы не подписаны перед опросом данных.

9

public void commitSync ()

Смещение коммитов, возвращаемое при последнем опросе () для всех подписанных списков тем и разделов. Та же операция применяется к commitAsyn ().

10

public void seek (раздел TopicPartition, длинное смещение)

Получить текущее значение смещения, которое потребитель будет использовать в следующем методе poll ().

11

общедоступное резюме ()

Возобновить приостановленные разделы.

12

public void wakeup ()

Пробуждение потребителя.

public java.util.Set назначение <TopicPar-tition> ()

Получить набор разделов, назначенных в настоящее время потребителем.

подписка на открытую строку ()

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.

public void sub-scribe (темы java.util.List <java.lang.String>, слушатель ConsumerRe-balanceListener)

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы.

публичный аннулировать подписку ()

Отписаться на темы из данного списка разделов.

публичный подпункт void (темы java.util.List <java.lang.String>)

Подпишитесь на данный список тем, чтобы получить динамически подписанные разделы. Если данный список тем пуст, он обрабатывается так же, как и отмена подписки ().

открытый подписок void (шаблон java.util.regex.Pattern, слушатель ConsumerRebalanceLis-tener)

Шаблон аргумента относится к шаблону подписки в формате регулярного выражения, а аргумент слушателя получает уведомления от шаблона подписки.

public void as-sign (разделы java.util.List <TopicPartition>)

Вручную назначьте список разделов клиенту.

опрос()

Получить данные для тем или разделов, указанных с помощью одного из API подписки / назначения. Это вернет ошибку, если темы не подписаны перед опросом данных.

public void commitSync ()

Смещение коммитов, возвращаемое при последнем опросе () для всех подписанных списков тем и разделов. Та же операция применяется к commitAsyn ().

public void seek (раздел TopicPartition, длинное смещение)

Получить текущее значение смещения, которое потребитель будет использовать в следующем методе poll ().

общедоступное резюме ()

Возобновить приостановленные разделы.

public void wakeup ()

Пробуждение потребителя.

ConsumerRecord API

API ConsumerRecord используется для получения записей из кластера Kafka. Этот API состоит из имени темы, номера раздела, из которого принимается запись, и смещения, которое указывает на запись в разделе Kafka. Класс ConsumerRecord используется для создания записи потребителя с определенным именем темы, количеством разделов и парами <ключ, значение>. Имеет следующую подпись.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Тема — Название темы для записи потребителя, полученной из кластера Kafka.

  • Раздел — Раздел по теме.

  • Ключ — ключ записи, если ключа не существует, возвращается ноль.

  • Значение — запись содержимого.

Тема — Название темы для записи потребителя, полученной из кластера Kafka.

Раздел — Раздел по теме.

Ключ — ключ записи, если ключа не существует, возвращается ноль.

Значение — запись содержимого.

ConsumerRecords API

ConsumerRecords API действует как контейнер для ConsumerRecord. Этот API используется для хранения списка ConsumerRecord для каждого раздела для определенной темы. Его конструктор определен ниже.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition — возвращает карту разделов для определенной темы.

  • Записи — возврат списка ConsumerRecord.

TopicPartition — возвращает карту разделов для определенной темы.

Записи — возврат списка ConsumerRecord.

В классе ConsumerRecords определены следующие методы.

S.No Методы и описание
1

public int count ()

Количество записей по всем темам.

2

публичный набор разделов ()

Набор разделов с данными в этом наборе записей (если данные не были возвращены, то набор пуст).

3

публичный итератор итератор ()

Итератор позволяет циклически проходить через коллекцию, получать или перемещать элементы.

4

Публичный список записей ()

Получить список записей для данного раздела.

public int count ()

Количество записей по всем темам.

публичный набор разделов ()

Набор разделов с данными в этом наборе записей (если данные не были возвращены, то набор пуст).

публичный итератор итератор ()

Итератор позволяет циклически проходить через коллекцию, получать или перемещать элементы.

Публичный список записей ()

Получить список записей для данного раздела.

Настройки конфигурации

Параметры конфигурации основных параметров конфигурации API клиента-клиента перечислены ниже:

S.No Настройки и описание
1

bootstrap.servers

Начальный список брокеров.

2

group.id

Назначает отдельного потребителя в группу.

3

enable.auto.commit

Включите автоматическую фиксацию для смещений, если значение истинно, иначе не зафиксировано.

4

auto.commit.interval.ms

Верните, как часто обновленные смещения записываются в ZooKeeper.

5

session.timeout.ms

Указывает, сколько миллисекунд Кафка будет ждать, пока ZooKeeper ответит на запрос (чтение или запись), прежде чем отказаться от и продолжать потреблять сообщения.

bootstrap.servers

Начальный список брокеров.

group.id

Назначает отдельного потребителя в группу.

enable.auto.commit

Включите автоматическую фиксацию для смещений, если значение истинно, иначе не зафиксировано.

auto.commit.interval.ms

Верните, как часто обновленные смещения записываются в ZooKeeper.

session.timeout.ms

Указывает, сколько миллисекунд Кафка будет ждать, пока ZooKeeper ответит на запрос (чтение или запись), прежде чем отказаться от и продолжать потреблять сообщения.

Приложение SimpleConsumer

Шаги приложения производителя остаются неизменными. Сначала запустите брокера ZooKeeper и Kafka. Затем создайте приложение SimpleConsumer с помощью класса Java с именем SimpleCon-sumer.java и введите следующий код.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Компиляция — приложение может быть скомпилировано с помощью следующей команды.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Выполнение — приложение может быть выполнено с помощью следующей команды

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Вход — откройте CLI производителя и отправьте несколько сообщений в тему. Вы можете указать smple input как «Hello Consumer».

Вывод — следующим будет вывод.