Статьи

Использование Кафки с Junit

Одной из замечательных возможностей, которые предоставляет отличный проект Spring Kafka , помимо более простой в использовании абстракции над необработанными Kafka Producer и Consumer , является способ использования Kafka в тестах. Это достигается путем предоставления встроенной версии Kafka, которую можно очень легко установить и разложить.

Все, что проекту необходимо включить в эту поддержку, это модуль «spring-kafka-test», для сборки gradle следующим образом:

1
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

Обратите внимание, что я использую версию проекта со снимком, поскольку она поддерживает Kafka 0.10+.

Имея эту зависимость, Embedded Kafka можно развернуть в тесте с помощью @ClassRule из JUnit:

1
2
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

Это запустит кластер Kafka с двумя брокерами, с темой «сообщения», использующей 2 раздела, и правило класса обеспечит, чтобы кластер Kafka запускался до запуска тестов, а затем завершал его работу.

Вот как выглядит пример с Raw Kafka Producer / Consumer, использующим этот встроенный кластер Kafka, встроенный Kafka можно использовать для получения свойств, требуемых для Kafka Producer / Consumer:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();
 
 
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");
 
final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});
 
assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

Более подробный тест доступен здесь

Ссылка: Использование Kafka с Junit от нашего партнера JCG Биджу Кунджуммен в блоге all and sundry.