В этом блоге мы собираемся исследовать разделитель Kafka . Мы попытаемся понять, почему разделителя по умолчанию недостаточно, и когда вам может понадобиться специальный разделитель. Мы также рассмотрим вариант использования и создадим код для пользовательского разделителя. Я предполагал, что у вас есть хорошее знание Кафки. Давайте разберемся с поведением разделителя по умолчанию.
Разделитель по умолчанию следует этим правилам:
- Если производитель указывает номер раздела в записи сообщения, используйте его.
- Если производитель не предоставляет номер раздела, но предоставляет ключ, выберите раздел, основываясь на хэш-значении ключа.
- Если номер раздела или ключ отсутствуют, выберите раздел в циклическом порядке.
Таким образом, вы можете использовать разделитель по умолчанию в трех сценариях:
- Если вы уже знаете номер раздела, в который хотите отправить запись сообщения, то используйте первое правило.
- Если вы хотите распространять данные на основе хеш-ключа, вы будете использовать второе правило разделителя по умолчанию.
- Если вам не важно, в каком разделе будет храниться запись сообщения, вы будете использовать третье правило разделителя по умолчанию.
Вам также могут понравиться:
Kafka Producer и Consumer Примеры использования Java .
Есть две проблемы с ключом:
- Если производитель предоставляет один и тот же ключ для каждой записи сообщения, хеширование даст вам один и тот же хэш-номер, но не гарантирует, что если вы предоставите два разных ключа, то оно никогда не даст вам один и тот же хэш-номер.
- Разделитель по умолчанию использует хеш-значение ключа и общее количество разделов в теме, чтобы определить номер раздела. Если вы увеличите номер раздела, то разделитель по умолчанию будет возвращать разные номера равномерно, если вы предоставите один и тот же ключ.
Теперь у вас могут возникнуть вопросы о том, как решить эту проблему?
Ответ на этот вопрос очень прост: вы можете реализовать свой собственный алгоритм на основе ваших требований и использовать его в пользовательском разделителе.
Kafka Custom Partitioner Пример
Давайте создадим пример варианта использования и реализуем пользовательский разделитель. Попытайтесь понять постановку проблемы с помощью диаграммы.
Предположим, мы собираем данные из разных отделов. Все отделы отправляют данные в одну тему с именем отдел. Я запланировал пять разделов по теме. Но я хочу, чтобы два раздела были выделены для определенного отдела с именем ИТ, а остальные три — для остальных отделов. Как бы вы достигли этого?
Вы можете решить это требование и любые другие требования к разделению, внедрив пользовательский разделитель.
Кафка Продюсер
Давайте посмотрим на код производителя.
Scala
1
package com.knoldus
2
import java.util.Properties
4
import org.apache.kafka.clients.producer._
5
object KafkaProducer extends App {
7
val props = new Properties()
8
val topicName = "department"
9
props.put("bootstrap.servers", "localhost:9092,localhost:9093")
10
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
11
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
12
props.put("partitioner.class", "com.knoldus.CustomPartitioner")
13
val producer = new KafkaProducer[String, String](props)
15
try {
17
for (i <- 0 to 5) {
18
val record = new ProducerRecord[String, String](topicName,"IT" + i,"My Site is knoldus.com " + i)
19
producer.send(record)
20
}
21
for (i <- 0 to 5) {
22
val record = new ProducerRecord[String, String](topicName,"COMP" + i,"My Site is knoldus.com " + i)
23
producer.send(record)
24
}
25
} catch {
26
case e: Exception => e.printStackTrace()
27
} finally {
28
producer.close()
29
}
30
}
Первым шагом при написании сообщений в Kafka является создание объекта-производителя со свойствами, которые вы хотите передать производителю. Производитель Kafka имеет три обязательных свойства, как вы можете видеть в приведенном выше коде:
- bootstrap.servers : Пары портов брокера Kafka, которые производитель будет использовать для установления соединения с кластером Kafka. Рекомендуется включить как минимум двух брокеров Kafka, потому что если один брокер Kafka выйдет из строя, производитель все равно сможет подключиться к кластеру Kafka.
- Key.serializer : имя класса, который будет использоваться для сериализации ключа.
- value.serializer : имя класса, который будет использоваться для сериализации значения.
Если вы посмотрите на остальную часть кода, есть только три шага:
- Создать
KafkaProducer
объект. - Создать
ProducerRecord
объект. - Отправьте запись брокеру.
Это все, что мы делаем в Kafka Producer .
Kafka Custom Partitioner
Нам нужно создать наш класс путем реализации интерфейса Partitioner. Ваш пользовательский класс разделителя должен реализовывать три метода из интерфейса.
Configure
,Partition
,Close
,
Давайте посмотрим на код.
Scala
xxxxxxxxxx
1
package com.knoldus
2
import java.util
4
import org.apache.kafka.common.record.InvalidRecordException
5
import org.apache.kafka.common.utils.Utils
6
import org.apache.kafka.clients.producer.Partitioner
7
import org.apache.kafka.common.Cluster
8
class CustomPartitioner extends Partitioner {
10
val departmentName = "IT"
11
override def configure(configs: util.Map[String, _]): Unit = {}
12
override def partition(topic: String,key: Any, keyBytes: Array[Byte], value: Any,valueBytes: Array[Byte],cluster: Cluster): Int = {
14
val partitions = cluster.partitionsForTopic(topic)
15
val numPartitions = partitions.size
16
val it = Math.abs(numPartitions * 0.4).asInstanceOf[Int]
17
if ((keyBytes == null) || (!key.isInstanceOf[String]))
19
throw new InvalidRecordException("All messages must have department name as key")
20
if (key.asInstanceOf[String].startsWith(departmentName)) {
22
val p = Utils.toPositive(Utils.murmur2(keyBytes)) % it
23
p
24
} else {
25
val p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - it) + it
26
p
27
}
28
}
29
override def close(): Unit = {}
31
}
configure
и close
используются для инициализации и очистки. В нашем примере нам не нужно ничего очищать и инициализировать.
Метод разбиения - это место, где происходит все действие. Производитель будет вызывать этот метод для каждой записи сообщения. Вход в этот метод - ключ, тема, сведения о кластере. нам нужно вернуть целое число в качестве номера раздела. Это место, где мы должны написать наш собственный алгоритм.
Алгоритм
Давайте попробуем понять алгоритм, который я реализовал. Я применяю свой собственный алгоритм в четыре простых шага.
- Первым шагом является определение количества разделов и резервирование 40% из них для ИТ-отдела. Если у меня есть пять разделов для этой темы, эта логика зарезервирует два раздела для ИТ. Следующий вопрос, как мы можем получить количество разделов в теме?
У нас есть кластерный объект в качестве входных данных, и метод
partitionsForTopic
выдаст нам список всех разделов. Затем мы берем размер списка. Это количество разделов в теме. Затем мы устанавливаем ИТ как 40% от числа разделов. Итак, если у меня есть пять разделов, IT должен быть установлен на 2. - Если мы не получим сообщение Key, мы выдадим исключение. Нам нужен Ключ, потому что Ключ сообщает нам название отдела. Не зная названия отдела, мы не можем решить, что сообщение должно идти в один из двух зарезервированных разделов или оно должно идти в три других раздела.
- Следующим шагом является определение номера раздела. Если Ключ = IT, то мы хэшируем значение сообщения, делим его на 2 и принимаем мод в качестве номера раздела. Использование мода гарантирует, что мы всегда получим 0 или 1.
- Если Ключ! = IT, то мы делим его на 3 и снова берем мод. Мод будет где-то между 0 и 2. Итак, я добавляю 2, чтобы сместить его на 2
Kafka Consumer
Давайте посмотрим на потребительский код.
Scala
xxxxxxxxxx
1
package com.knoldus
2
import java.util
4
import java.util.Properties
5
import scala.jdk.CollectionConverters._
6
import org.apache.kafka.clients.consumer.KafkaConsumer
7
object KafkaConsumer extends App {
9
val props: Properties = new Properties()
11
val topicName = "department"
12
props.put("group.id", "test")
14
props.put("bootstrap.servers", "localhost:9092,localhost:9093")
15
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
16
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
17
val consumer = new KafkaConsumer(props)
19
try {
20
consumer.subscribe(util.Arrays.asList(topicName))
21
while (true) {
22
val records = consumer.poll(10)
23
for (record <- records.asScala) {
24
println("Topic: " + record.topic() + ", Offset: " + record.offset() +", Partition: " + record.partition())
25
}
26
}
27
} catch {
28
case e: Exception => e.printStackTrace()
29
} finally {
30
consumer.close()
31
}
32
}
33
Потребитель Kafka имеет три обязательных свойства, как вы можете видеть в приведенном выше коде:
- bootstrap.servers: пары портов брокера Kafka, которые потребитель будет использовать для установления соединения с кластером Kafka. Рекомендуется включить как минимум двух брокеров Kafka, потому что, если один брокер Kafka выйдет из строя, потребитель все равно сможет подключить кафку кластера.
- key.deserializer: имя класса, который будет использоваться для десериализации ключа.
- value.deserializer: имя класса, который будет использоваться для десериализации значения.
Если вы посмотрите на остальную часть кода, есть только два шага.
- Шаг 1 - Подписаться на тему.
- Шаг 2 - Изъять сообщения из темы.
Это все, что мы делаем в Kafka Consumer.
Я надеюсь, вам понравится этот блог, и вы сможете создать собственный разделитель в Kafka, используя scala. если вы хотите исходный код, пожалуйста, не стесняйтесь загружать исходный код .
Спасибо за прочтение!
Ссылки:
1. https://kafka.apache.org/documentation/ .
2. https://docs.confluent.io/ ..