Одной из замечательных возможностей, которые предоставляет отличный проект Spring Kafka , помимо более простой в использовании абстракции над необработанными Kafka Producer и Consumer , является способ использования Kafka в тестах. Это достигается путем предоставления встроенной версии Kafka, которую можно очень легко установить и разложить.
Все, что проекту необходимо включить в эту поддержку, это модуль «spring-kafka-test», для сборки gradle следующим образом:
1
|
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT" |
Обратите внимание, что я использую версию проекта со снимком, поскольку она поддерживает Kafka 0.10+.
Имея эту зависимость, Embedded Kafka можно развернуть в тесте с помощью @ClassRule из JUnit:
1
2
|
@ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded( 2 , true , 2 , "messages" ); |
Это запустит кластер Kafka с двумя брокерами, с темой «сообщения», использующей 2 раздела, и правило класса обеспечит, чтобы кластер Kafka запускался до запуска тестов, а затем завершал его работу.
Вот как выглядит пример с Raw Kafka Producer / Consumer, использующим этот встроенный кластер Kafka, встроенный Kafka можно использовать для получения свойств, требуемых для Kafka Producer / Consumer:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send( new ProducerRecord<>( "messages" , 0 , 0 , "message0" )).get(); producer.send( new ProducerRecord<>( "messages" , 0 , 1 , "message1" )).get(); producer.send( new ProducerRecord<>( "messages" , 1 , 2 , "message2" )).get(); producer.send( new ProducerRecord<>( "messages" , 1 , 3 , "message3" )).get(); Map<String, Object> consumerProps = KafkaTestUtils.consumerProps( "sampleRawConsumer" , "false" , embeddedKafka); consumerProps.put( "auto.offset.reset" , "earliest" ); final CountDownLatch latch = new CountDownLatch( 4 ); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> { KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps); kafkaConsumer.subscribe(Collections.singletonList( "messages" )); try { while ( true ) { ConsumerRecords<Integer, String> records = kafkaConsumer.poll( 100 ); for (ConsumerRecord<Integer, String> record : records) { LOGGER.info( "consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}" , record.topic(), record.partition(), record.offset(), record.key(), record.value()); latch.countDown(); } } } finally { kafkaConsumer.close(); } }); assertThat(latch.await( 90 , TimeUnit.SECONDS)).isTrue(); |
Более подробный тест доступен здесь
Ссылка: | Использование Kafka с Junit от нашего партнера JCG Биджу Кунджуммен в блоге all and sundry. |