Статьи

Достижение порядка гарантии в Кафке с разметкой

Одной из наиболее важных функций Kafka является балансировка нагрузки сообщений и обеспечение порядка в распределенном кластере, что в противном случае было бы невозможно в традиционной очереди.

Давайте сначала попробуем понять формулировку проблемы

Предположим, у нас есть тема, в которой отправляются сообщения, и есть потребитель, который потребляет эти сообщения.
Если есть только один потребитель, он получит сообщения в том порядке, в котором они находятся в очереди, или в том порядке, в котором они отправлены.

Теперь, чтобы достичь более высокой производительности, нам нужно обрабатывать сообщения быстрее и, следовательно, мы представляем несколько экземпляров приложения-потребителя.

Это может привести к проблеме, если сообщения содержат какое-либо состояние.

Давайте попробуем понять это на примере:

Если для конкретного идентификатора сообщения у нас есть 3 события:
Первый: СОЗДАТЬ
ВТОРОЕ: ОБНОВЛЕНИЕ
ТРЕТЬЕ: УДАЛИТЬ
Мы требуем, чтобы событие «ОБНОВЛЕНИЕ» или «УДАЛЕНИЕ» сообщения обрабатывалось ТОЛЬКО после его события «СОЗДАТЬ». Теперь, если 2 отдельных экземпляра получили одинаковые сообщения «CREATE» и «UPDATE» практически в одно и то же время, есть вероятность, что экземпляр с сообщением «UPDATE» попытается обработать его даже до того, как другой экземпляр завершит сообщение «CREATE». , Это может быть проблемой, так как потребитель попытается обновить сообщение, которое еще не создано, и выдаст исключение, и это «обновление» может быть потеряно.

Возможные решения

Первое решение, которое приходит на ум, — это Оптимистическая блокировка базы данных, которая предотвратит это, но сценарии исключений затем должны будут быть учтены. Это не очень прямой подход и может включать в себя больше проблем блокировки и параллелизма.

Другое более простое решение было бы, если бы сообщения / события определенного идентификатора всегда шли к определенному экземпляру и, следовательно, они были бы в порядке. В этом случае CREATE всегда будет выполняться до ОБНОВЛЕНИЯ, поскольку это был первоначальный порядок, в котором они были отправлены.

Вот тут и пригодится Кафка .

У Kafka есть понятие «разделов» в темах, которые могут обеспечить как гарантии заказа, так и распределение нагрузки по пулу процессов потребителей.

Каждый раздел представляет собой упорядоченную, неизменную последовательность сообщений, которая постоянно добавляется в журнал фиксации. Каждому сообщениям в разделах присваивается последовательный идентификационный номер, называемый смещением, который однозначно идентифицирует каждое сообщение в разделе.

Таким образом, тема будет иметь несколько разделов, каждый из которых будет поддерживать свое смещение.
Теперь, чтобы убедиться, что событие определенного идентификатора всегда должно идти в конкретный экземпляр, это можно сделать, если мы свяжем каждого потребителя с определенным разделом, а затем убедимся, что все события и сообщения определенного идентификатора всегда идут в определенный раздел, поэтому они всегда используются одним и тем же потребителем.

Чтобы добиться такого разделения, клиентский API Kafka предоставляет нам 2 способа:
1) Определите ключ для разделения, который будет использоваться в качестве ключа для логики разделения по умолчанию.
2) Напишите класс Partitioning, чтобы определить нашу собственную логику разделения.

Давайте рассмотрим первый:

Логика разделения по умолчанию

Стратегия секционирования по умолчанию — hash(key)%numPartitions . Если ключ нулевой, то выбирается случайный раздел. Итак, если мы хотим, чтобы ключ для раздела был определенным атрибутом, нам нужно передать его в конструктор ProducerRecord при отправке сообщения от источника.

Давайте посмотрим на пример:

ПРИМЕЧАНИЕ. Для запуска этого примера нам необходимо иметь следующее:
1. Запуск Zookeeper (на локальном хосте: 2181)
2. Запуск Кафки (на локальном хосте: 9092)
3. Создайте тему под названием «TRADING-INFO» с 3 разделами (для простоты у нас может быть только один брокер).
Для завершения вышеупомянутых трех, следуйте документации здесь .

Предположим, что мы отправляем информацию о сделках по теме «TRADING-INFO», которую потребляет потребитель.

1. Торговый класс

(Примечание: я использовал Ломбок здесь)

1
2
3
4
5
6
7
8
@Data
@Builder
public class Trade {
    private String id;
    private String securityId;
    private String fundShortName;
    private String value;
}

2. Кафка клиентская зависимость

Чтобы сделать Kafka Producer, нам нужно включить зависимость Kafka:

1
2
3
4
5
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.0.0</version>
        </dependency>

Кафка Продюсер

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Producer {
 
    public static void main(String[] args) {
        final String TOPIC = "TRADING-INFO";
        KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());
 
        Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);
        Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);
        Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);
 
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(task1);
        executorService.submit(task2);
        executorService.submit(task3);
 
        executorService.shutdown();
 
    }
 
    private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {
        for (int i = idStart; i <= idEnd; i++) {
            Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();
            try {
                String s = new ObjectMapper().writeValueAsString(trade);
                kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));
                System.out.println("Sending to " + topic + "msg : " + s);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }
 
    private static Properties getProducerProperties() {
        Properties props = new Properties();
        String KAFKA_SERVER_IP = "localhost:9092";
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
 
}

потребитель

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TConsumer {
 
    public static void main(String[] args) {
        final String TOPIC = "TRADING-INFO";
        final String CONSUMER_GROUP_ID = "consumer-group";
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));
        kafkaConsumer.subscribe(Arrays.asList(TOPIC));
 
        while(true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            consumerRecords.forEach(e -> {
                System.out.println(e.value());
            });
        }
    }
 
    private static Properties getConsumerProperties(String consumerGroupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", consumerGroupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return props;
    }
}

Поскольку у нас есть 3 раздела, мы запустим 3 экземпляра Consumer.

Теперь, когда мы запускаем производителя с различными потоками, создающими сообщения с 3 типами «Тип безопасности», который является нашим ключом здесь. Мы увидим, что конкретный экземпляр всегда обслуживает определенный «тип безопасности» и, следовательно, сможет обрабатывать сообщения по порядку.

Выходы

Потребитель 1:

1
2
3
4
5
{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

Потребитель 2:

1
2
3
4
5
{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

Потребитель 3:

1
2
3
4
5
{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

Итак, здесь 3 типа «securityIds» генерировали разные значения хеш-функции и, следовательно, распределялись по разным разделам, гарантируя, что один тип сделки всегда переходит к конкретному экземпляру.

Теперь, если мы не хотим использовать логику секционирования по умолчанию, а наш сценарий более сложный, нам необходимо реализовать наш собственный Partitioner, в следующем блоге я объясню, как его использовать и как он работает.

Ссылка: Получение заказа Guarnetee в Кафке с разделением от нашего партнера JCG Анируд Бхатнагар в блоге anirudh bhatnagar .