В этом блоге мы собираемся исследовать разделитель Kafka. Мы попытаемся понять, почему разделителя по умолчанию недостаточно, и когда вам может понадобиться специальный разделитель. Мы также рассмотрим вариант использования и создадим код для пользовательского разделителя. Я предполагаю, что вы хорошо знакомы с Кафкой. Давайте разберемся с поведением разделителя по умолчанию.
Разделитель по умолчанию следует этим правилам:
- Если производитель указывает номер раздела в записи сообщения, используйте его.
- Если производитель не предоставляет номер раздела, но предоставляет ключ, выберите раздел, основываясь на хэш-значении ключа.
- Если номер раздела или ключ отсутствуют, выберите раздел в циклическом порядке.
Таким образом, вы можете использовать разделитель по умолчанию в трех сценариях:
- Если вы уже знаете номер раздела, в который хотите отправить запись сообщения, используйте первое правило.
- Если вы хотите распространять данные на основе хеш-ключа, вы будете использовать второе правило стандартного разделителя.
- Если вам не важно, какая запись сообщения раздела будет сохранена, тогда вы будете использовать третье правило разделителя по умолчанию.
Есть две проблемы с ключом:
- Если производитель предоставляет один и тот же ключ для каждой записи сообщения, то хэширование даст вам тот же хэш-номер, но не гарантирует, что если вы предоставите два разных ключа, то оно никогда не даст вам один и тот же хэш-номер.
- Разделитель по умолчанию использует хеш-значение ключа и общее количество разделов в теме, чтобы определить номер раздела. Если вы увеличите номер раздела, разделитель по умолчанию вернет разные номера, даже если вы предоставите один и тот же ключ.
Теперь у вас могут возникнуть вопросы о том, как решить эту проблему.
Ответ на этот вопрос очень прост: вы можете реализовать свой алгоритм на основе ваших требований и использовать его в пользовательском разделителе.
Вам также может понравиться:
Kafka Internals: темы и разделы .
Kafka Custom Partitioner Пример
Давайте создадим пример варианта использования и реализуем пользовательский разделитель. Попытайтесь понять постановку проблемы с помощью диаграммы.
Предположим, мы собираем данные из разных отделов. Все отделы отправляют данные в одну тему с именем отдел. Я запланировал пять разделов по теме. Но я хочу, чтобы два раздела были выделены для конкретного отдела под названием ИТ, а остальные три — для остальных отделов. Как бы вы достигли этого?
Вы можете решить это требование и любые другие требования к разделению, внедрив пользовательский разделитель.
Кафка Продюсер
Давайте посмотрим на код производителя.
Scala
x
1
package com.knoldus
2
import java.util.Properties
4
import org.apache.kafka.clients.producer._
5
object KafkaProducer extends App {
7
val props = new Properties()
8
val topicName = "department"
9
props.put("bootstrap.servers", "localhost:9092,localhost:9093")
10
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
11
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
12
props.put("partitioner.class", "com.knoldus.CustomPartitioner")
13
val producer = new KafkaProducer[String, String](props)
15
try {
17
for (i <- 0 to 5) {
18
val record = new ProducerRecord[String, String](topicName,"IT" + i,"My Site is knoldus.com " + i)
19
producer.send(record)
20
}
21
for (i <- 0 to 5) {
22
val record = new ProducerRecord[String, String](topicName,"COMP" + i,"My Site is knoldus.com " + i)
23
producer.send(record)
24
}
25
} catch {
26
case e: Exception => e.printStackTrace()
27
} finally {
28
producer.close()
29
}
30
}
Первым шагом при написании сообщений в Kafka является создание объекта-производителя со свойствами, которые вы хотите передать производителю. Производитель Kafka имеет три обязательных свойства, как вы можете видеть в приведенном выше коде:
bootstrap.serversz
: пары портов брокера Kafka, которые производитель будет использовать для установления соединения с кластером Kafka. Рекомендуется включить как минимум двух брокеров Kafka, потому что, если один брокер Kafka выйдет из строя, производитель все равно сможет подключиться к кластеру Kafka.Key.serializer
Имя класса, который будет использоваться для сериализации ключа.value.serializer
: Имя класса, который будет использоваться для сериализации значения.
Если вы посмотрите на остальную часть кода, есть только три шага:
- Создайте объект KafkaProducer.
- Создайте объект ProducerRecord.
- Отправьте запись брокеру.
Это все, что мы делаем в Kafka Producer.
Kafka Custom Partitioner
Нам нужно создать наш класс путем реализации Partitioner
интерфейса. Ваш пользовательский класс разделителя должен реализовывать три метода из интерфейса.
Configure
,Partition
,Close
,
Давайте посмотрим на код.
Scala
xxxxxxxxxx
1
package com.knoldus
2
import java.util
4
import org.apache.kafka.common.record.InvalidRecordException
5
import org.apache.kafka.common.utils.Utils
6
import org.apache.kafka.clients.producer.Partitioner
7
import org.apache.kafka.common.Cluster
8
class CustomPartitioner extends Partitioner {
10
val departmentName = "IT"
11
override def configure(configs: util.Map[String, _]): Unit = {}
12
override def partition(topic: String,key: Any, keyBytes: Array[Byte], value: Any,valueBytes: Array[Byte],cluster: Cluster): Int = {
14
val partitions = cluster.partitionsForTopic(topic)
15
val numPartitions = partitions.size
16
val it = Math.abs(numPartitions * 0.4).asInstanceOf[Int]
17
if ((keyBytes == null) || (!key.isInstanceOf[String]))
19
throw new InvalidRecordException("All messages must have department name as key")
20
if (key.asInstanceOf[String].startsWith(departmentName)) {
22
val p = Utils.toPositive(Utils.murmur2(keyBytes)) % it
23
p
24
} else {
25
val p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - it) + it
26
p
27
}
28
}
29
override def close(): Unit = {}
31
}
configure
и close
методы используются для инициализации и очистки. В нашем примере нам не нужно ничего очищать и инициализировать.
Метод разбиения - это место, где происходит все действие. Производитель будет вызывать этот метод для каждой записи сообщения. Вход в этот метод - ключ, тема, сведения о кластере. нам нужно вернуть целое число в качестве номера раздела. Это место, где мы должны написать наш алгоритм.
Алгоритм
Давайте попробуем понять алгоритм, который я реализовал. Я применяю свой алгоритм в четыре простых шага.
- Первый шаг - определить количество разделов и зарезервировать 40% для ИТ-отдела. Если у меня есть пять разделов для этой темы, эта логика зарезервирует два раздела для ИТ. Следующий вопрос, как мы можем получить количество разделов в теме?
В качестве входных данных мы получили объект кластера, а метод
partitionsForTopic
выдаст нам список всех разделов. Затем мы берем размер списка. Это количество разделов в теме. Затем мы устанавливаем ИТ как 40% от числа разделов. Итак, если у меня есть пять разделов, IT должен быть установлен на 2. - Если мы не получим сообщение Key, выведите исключение. Нам нужен Ключ, потому что Ключ сообщает нам название отдела. Не зная названия отдела, мы не можем решить, что сообщение должно идти в один из двух зарезервированных разделов или оно должно идти в три других раздела.
- Следующим шагом является определение номера раздела. Если Ключ = IT, то мы хэшируем значение сообщения, делим его на 2 и принимаем мод как номер раздела. Используя мод, мы всегда будем получать 0 или 1.
- Если
Key != IT
, то мы разделим его на 3 и снова возьмем мод. Мод будет где-то между 0 и 2. Итак, я добавляю 2, чтобы сместить его на 2
Kafka Consumer
Давайте посмотрим на потребительский код.
Scala
xxxxxxxxxx
1
package com.knoldus
2
import java.util
4
import java.util.Properties
5
import scala.jdk.CollectionConverters._
6
import org.apache.kafka.clients.consumer.KafkaConsumer
7
object KafkaConsumer extends App {
9
val props: Properties = new Properties()
11
val topicName = "department"
12
props.put("group.id", "test")
14
props.put("bootstrap.servers", "localhost:9092,localhost:9093")
15
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
16
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
17
val consumer = new KafkaConsumer(props)
19
try {
20
consumer.subscribe(util.Arrays.asList(topicName))
21
while (true) {
22
val records = consumer.poll(10)
23
for (record <- records.asScala) {
24
println("Topic: " + record.topic() + ", Offset: " + record.offset() +", Partition: " + record.partition())
25
}
26
}
27
} catch {
28
case e: Exception => e.printStackTrace()
29
} finally {
30
consumer.close()
31
}
32
}
33
Потребитель Kafka имеет три обязательных свойства, как вы можете видеть в приведенном выше коде:
bootstrap.servers
: пары портов брокера Kafka, которые потребитель будет использовать для установления соединения с кластером Kafka. Рекомендуется включить как минимум двух брокеров Kafka, потому что, если один брокер Kafka выйдет из строя, потребитель все равно сможет подключиться к кластеру Kafka. ,key.deserializer
Имя класса, который будет использоваться для десериализации ключа.value.deserializer
: Имя класса , который будет использоваться для десериализации значения.
Если вы посмотрите на остальную часть кода, есть только два шага:
- Подписывайтесь на тему.
- Потребляйте сообщения из темы.
Это все, что мы делаем в Kafka Consumer.
Я надеюсь, вам понравится этот блог. Теперь вы можете создать собственный разделитель в Kafka, используя scala. Если вы хотите исходный код , пожалуйста, скачайте его.
Спасибо за прочтение!