Одной из наиболее важных функций Kafka является балансировка нагрузки сообщений и обеспечение порядка в распределенном кластере, что в противном случае было бы невозможно в традиционной очереди.
Давайте сначала попробуем понять формулировку проблемы
Предположим, у нас есть тема, в которой отправляются сообщения, и есть потребитель, который потребляет эти сообщения.
Если есть только один потребитель, он получит сообщения в том порядке, в котором они находятся в очереди, или в том порядке, в котором они отправлены.
Теперь, чтобы достичь более высокой производительности, нам нужно обрабатывать сообщения быстрее и, следовательно, мы представляем несколько экземпляров приложения-потребителя.
Это может привести к проблеме, если сообщения содержат какое-либо состояние.
Давайте попробуем понять это на примере:
Если для конкретного идентификатора сообщения у нас есть 3 события:
Первый: СОЗДАТЬ
ВТОРОЕ: ОБНОВЛЕНИЕ
ТРЕТЬЕ: УДАЛИТЬ
Мы требуем, чтобы событие «ОБНОВЛЕНИЕ» или «УДАЛЕНИЕ» сообщения обрабатывалось ТОЛЬКО после его события «СОЗДАТЬ». Теперь, если 2 отдельных экземпляра получили одинаковые сообщения «CREATE» и «UPDATE» практически в одно и то же время, есть вероятность, что экземпляр с сообщением «UPDATE» попытается обработать его даже до того, как другой экземпляр завершит сообщение «CREATE». , Это может быть проблемой, так как потребитель попытается обновить сообщение, которое еще не создано, и выдаст исключение, и это «обновление» может быть потеряно.
Возможные решения
Первое решение, которое приходит на ум, — это Оптимистическая блокировка базы данных, которая предотвратит это, но сценарии исключений затем должны будут быть учтены. Это не очень прямой подход и может включать в себя больше проблем блокировки и параллелизма.
Другое более простое решение было бы, если бы сообщения / события определенного идентификатора всегда шли к определенному экземпляру и, следовательно, они были бы в порядке. В этом случае CREATE всегда будет выполняться до ОБНОВЛЕНИЯ, поскольку это был первоначальный порядок, в котором они были отправлены.
Вот тут и пригодится Кафка .
У Kafka есть понятие «разделов» в темах, которые могут обеспечить как гарантии заказа, так и распределение нагрузки по пулу процессов потребителей.
Каждый раздел представляет собой упорядоченную, неизменную последовательность сообщений, которая постоянно добавляется в журнал фиксации. Каждому сообщениям в разделах присваивается последовательный идентификационный номер, называемый смещением, который однозначно идентифицирует каждое сообщение в разделе.
Таким образом, тема будет иметь несколько разделов, каждый из которых будет поддерживать свое смещение.
Теперь, чтобы убедиться, что событие определенного идентификатора всегда должно идти в конкретный экземпляр, это можно сделать, если мы свяжем каждого потребителя с определенным разделом, а затем убедимся, что все события и сообщения определенного идентификатора всегда идут в определенный раздел, поэтому они всегда используются одним и тем же потребителем.
Чтобы добиться такого разделения, клиентский API Kafka предоставляет нам 2 способа:
1) Определите ключ для разделения, который будет использоваться в качестве ключа для логики разделения по умолчанию.
2) Напишите класс Partitioning, чтобы определить нашу собственную логику разделения.
Давайте рассмотрим первый:
Логика разделения по умолчанию
Стратегия секционирования по умолчанию — hash(key)%numPartitions . Если ключ нулевой, то выбирается случайный раздел. Итак, если мы хотим, чтобы ключ для раздела был определенным атрибутом, нам нужно передать его в конструктор ProducerRecord при отправке сообщения от источника.
Давайте посмотрим на пример:
ПРИМЕЧАНИЕ. Для запуска этого примера нам необходимо иметь следующее:
1. Запуск Zookeeper (на локальном хосте: 2181)
2. Запуск Кафки (на локальном хосте: 9092)
3. Создайте тему под названием «TRADING-INFO» с 3 разделами (для простоты у нас может быть только один брокер).
Для завершения вышеупомянутых трех, следуйте документации здесь .
Предположим, что мы отправляем информацию о сделках по теме «TRADING-INFO», которую потребляет потребитель.
1. Торговый класс
(Примечание: я использовал Ломбок здесь)
|
1
2
3
4
5
6
7
8
|
@Data@Builderpublic class Trade { private String id; private String securityId; private String fundShortName; private String value;} |
2. Кафка клиентская зависимость
Чтобы сделать Kafka Producer, нам нужно включить зависимость Kafka:
|
1
2
3
4
5
|
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.0.0</version> </dependency> |
Кафка Продюсер
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public class Producer { public static void main(String[] args) { final String TOPIC = "TRADING-INFO"; KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties()); Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5); Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10); Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15); ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.submit(task1); executorService.submit(task2); executorService.submit(task3); executorService.shutdown(); } private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) { for (int i = idStart; i <= idEnd; i++) { Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build(); try { String s = new ObjectMapper().writeValueAsString(trade); kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s)); System.out.println("Sending to " + topic + "msg : " + s); } catch (JsonProcessingException e) { e.printStackTrace(); } } } private static Properties getProducerProperties() { Properties props = new Properties(); String KAFKA_SERVER_IP = "localhost:9092"; props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return props; }} |
потребитель
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class TConsumer { public static void main(String[] args) { final String TOPIC = "TRADING-INFO"; final String CONSUMER_GROUP_ID = "consumer-group"; KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID)); kafkaConsumer.subscribe(Arrays.asList(TOPIC)); while(true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); consumerRecords.forEach(e -> { System.out.println(e.value()); }); } } private static Properties getConsumerProperties(String consumerGroupId) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", consumerGroupId); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); return props; }} |
Поскольку у нас есть 3 раздела, мы запустим 3 экземпляра Consumer.
Теперь, когда мы запускаем производителя с различными потоками, создающими сообщения с 3 типами «Тип безопасности», который является нашим ключом здесь. Мы увидим, что конкретный экземпляр всегда обслуживает определенный «тип безопасности» и, следовательно, сможет обрабатывать сообщения по порядку.
Выходы
Потребитель 1:
|
1
2
3
4
5
|
{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"} |
Потребитель 2:
|
1
2
3
4
5
|
{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"} |
Потребитель 3:
|
1
2
3
4
5
|
{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"} |
Итак, здесь 3 типа «securityIds» генерировали разные значения хеш-функции и, следовательно, распределялись по разным разделам, гарантируя, что один тип сделки всегда переходит к конкретному экземпляру.
Теперь, если мы не хотим использовать логику секционирования по умолчанию, а наш сценарий более сложный, нам необходимо реализовать наш собственный Partitioner, в следующем блоге я объясню, как его использовать и как он работает.
| Ссылка: | Получение заказа Guarnetee в Кафке с разделением от нашего партнера JCG Анируд Бхатнагар в блоге anirudh bhatnagar . |