Это вторая статья в серии Spring Cloud Stream и Kafka . В этом посте рассказывается о различных способах тестирования приложений Spring Boot с использованием EmbeddedKafka и Awaitility.
При тестировании любого синхронного приложения все дело в «звони и жди». Мы вызываем определенный API или конечную точку и ждем ответа. Тест блокирует основной поток выполнения до тех пор, пока API не вернет ответ. После завершения обработки мы получаем ответ и можем сравнить результат с ожидаемым результатом.
Вам также может понравиться:
Кафка с Spring Cloud Stream
Асинхронные приложения тестируются по-разному по сравнению с синхронными или блокирующими приложениями, т.е. нам не нужно блокировать основной поток выполнения. Проще говоря, он не будет ждать ответа от API, и нам нужно вручную запрограммировать тест для удержания выполнения в определенной точке и ожидания результатов от всех неблокирующих операций. На этом этапе мы можем написать утверждения.
Трудно управлять различными потоками и проблемами параллелизма и написать краткий, читаемый модульный тест.
Существует несколько способов написания тестов для микро-сервисов Spring Boot — Spring Cloud Stream для подключения к Kafka.
Давайте рассмотрим простой вариант использования для этой цели.
Пример использования
Существует компонент-производитель, который будет отправлять сообщения в тему Kafka.
Джава
1
package com.techwording.scs;
2
import org.springframework.cloud.stream.annotation.EnableBinding;
4
import org.springframework.cloud.stream.messaging.Source;
5
Source.class) (
7
public class Producer {
8
private Source mySource;
10
public Producer(Source mySource) {
12
super();
14
this.mySource = mySource;
15
}
16
public Source getMysource() {
18
return mySource;
20
}
21
public void setMysource(Source mysource) {
23
mySource = mysource;
25
}
26
}
Потребительский компонент будет слушать тему Kafka и получать сообщения.
Джава
xxxxxxxxxx
1
Sink.class) (
2
public class Consumer {
3
private String receivedMessage;
5
(target = Sink.INPUT)
7
public void consume(String message) {
8
receivedMessage = message;
10
latch.countDown();
11
}
13
public String getReceivedMessage() {
15
return receivedMessage;
17
}
18
}
Создан Kafka брокер с темой. Для этого теста мы будем использовать сервер Embedded Kafka с spring-kafka-test
.
Джава
xxxxxxxxxx
1
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
EmbeddedKafkaRule
Spring-kafka-test
предоставляет встроенный брокер Kafka. Мы можем использовать @ClassRule
аннотацию JUnit для создания этого брокера Kafka. Это правило запускает серверы Kafka и Zookeeper на произвольном порту перед выполнением тестов и отключает их после завершения тестов. Встроенный брокер Kafka устраняет необходимость в работе реального экземпляра Kafka и Zookeeper при выполнении теста.
Возвращаясь к тестам, я реализовал этот тест двумя способами, используя Awaitility и защелку обратного отсчета.
Тест с использованием Awaitility
здесь . Ниже приведена реализация теста с использованием Awaitility.
Это библиотека DSL, предоставляющая функции для написания тестов JUnit для асинхронного Java-приложения. Вы можете проверить их официальную страницу GitHubДжава
xxxxxxxxxx
1
package com.techwording.scs;
2
import java.util.concurrent.TimeUnit;
4
import org.junit.BeforeClass;
6
import org.junit.ClassRule;
7
import org.junit.Test;
8
import org.junit.runner.RunWith;
9
import org.springframework.beans.factory.annotation.Autowired;
10
import org.springframework.boot.autoconfigure.SpringBootApplication;
11
import org.springframework.boot.test.context.SpringBootTest;
12
import org.springframework.cloud.stream.annotation.EnableBinding;
13
import org.springframework.cloud.stream.annotation.StreamListener;
14
import org.springframework.cloud.stream.messaging.Sink;
15
import org.springframework.cloud.stream.messaging.Source;
16
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
17
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
18
import org.springframework.messaging.support.MessageBuilder;
19
import org.springframework.test.context.junit4.SpringRunner;
20
import static org.assertj.core.api.BDDAssertions.then;
22
import static org.awaitility.Awaitility.waitAtMost;
24
SpringRunner.class) (
26
classes = { EmbeddedKafkaAwaitilityTest.App.class, EmbeddedKafkaAwaitilityTest.Producer.class, EmbeddedKafkaAwaitilityTest.Consumer.class }) (
27
Source.class) (
28
public class EmbeddedKafkaAwaitilityTest {
29
(exclude = TestSupportBinderAutoConfiguration.class)
31
static class App {
32
}
34
private static final String TOPIC1 = "test-topic-1";
36
38
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
39
41
public static void setup() {
42
System.setProperty("spring.cloud.stream.kafka.binder.brokers",
44
embeddedKafka.getEmbeddedKafka()
45
.getBrokersAsString());
46
System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
47
System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
48
System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
49
System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
50
System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
51
System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
52
}
54
56
private Producer producer;
57
59
private Consumer consumer;
60
62
public void testMessageSendReceive_Awaitility() {
63
producer.getMysource()
65
.output()
66
.send(MessageBuilder.withPayload("payload")
67
.setHeader("type", "string")
68
.build());
69
waitAtMost(5, TimeUnit.SECONDS)
71
.untilAsserted(() -> {
72
then("payload").isEqualTo(
73
EmbeddedKafkaAwaitilityTest.this.consumer.getReceivedMessage());
74
});
75
}
76
}
Тест с использованием CountDownLatch
Согласно документации Java, CountDownLatch
это средство, которое позволяет одному или нескольким потокам ждать завершения набора операций, выполняемых в других потоках. Чтобы написать этот тест с помощью CountDownLatch
, мы сначала инициализируем защелку счетчиком.
Значение этого счетчика зависит от количества задач, которые наш тест должен ждать. Здесь мы инициализируем этот счетчик счетчиком 1. После того, как производитель отправил сообщение, защелка ожидает, пока счетчик достигнет 0. Потребитель несет ответственность за уменьшение количества. Следовательно, когда потребитель закончил со своей частью, основной поток возобновляет и выполняет утверждение.
Ниже приведена реализация теста с использованием CountDownLatch
:
Джава
xxxxxxxxxx
1
package com.techwording.scs;
2
import java.util.concurrent.CountDownLatch;
4
import org.junit.BeforeClass;
6
import org.junit.ClassRule;
7
import org.junit.Test;
8
import org.junit.runner.RunWith;
9
import org.springframework.beans.factory.annotation.Autowired;
10
import org.springframework.boot.autoconfigure.SpringBootApplication;
11
import org.springframework.boot.test.context.SpringBootTest;
12
import org.springframework.cloud.stream.annotation.EnableBinding;
13
import org.springframework.cloud.stream.annotation.StreamListener;
14
import org.springframework.cloud.stream.messaging.Sink;
15
import org.springframework.cloud.stream.messaging.Source;
16
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
17
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
18
import org.springframework.messaging.support.MessageBuilder;
19
import org.springframework.test.context.junit4.SpringRunner;
20
import static org.assertj.core.api.Assertions.assertThat;
22
SpringRunner.class) (
24
classes = { EmbeddedKafkaLatchTest.App.class, EmbeddedKafkaLatchTest.Producer.class, EmbeddedKafkaLatchTest.Consumer.class }) (
25
Source.class) (
26
public class EmbeddedKafkaLatchTest {
27
(exclude = TestSupportBinderAutoConfiguration.class)
29
static class App {
30
}
32
private static final String TOPIC1 = "test-topic-1";
34
36
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TOPIC1);
37
private static CountDownLatch latch = new CountDownLatch(1);
39
41
public static void setup() {
42
System.setProperty("spring.cloud.stream.kafka.binder.brokers",
44
embeddedKafka.getEmbeddedKafka()
45
.getBrokersAsString());
46
System.setProperty("spring.cloud.stream.bindings.input.destination", TOPIC1);
47
System.setProperty("spring.cloud.stream.bindings.input.content-type", "text/plain");
48
System.setProperty("spring.cloud.stream.bindings.input.group", "input-group-1");
49
System.setProperty("spring.cloud.stream.bindings.output.destination", TOPIC1);
50
System.setProperty("spring.cloud.stream.bindings.output.content-type", "text/plain");
51
System.setProperty("spring.cloud.stream.bindings.output.group", "output-group-1");
52
}
54
56
private Producer producer;
57
59
private Consumer consumer;
60
62
public void testMessageSendReceive() throws InterruptedException {
63
producer.getMysource()
65
.output()
66
.send(MessageBuilder.withPayload("payload")
67
.setHeader("type", "string")
68
.build());
69
latch.await();
71
assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
72
}
73
}
Вы можете найти полный исходный код здесь .
Дальнейшее чтение
Кафка с весенним облачным потоком
5 аннотаций Spring Cloud, которые должны знать Java-программисты