Статьи

Быстрый и практический пример тестирования Кафки

1. Введение

В этом руководстве мы быстро рассмотрим некоторые базовые и высокоуровневые подходы для тестирования микросервисных приложений, созданных с использованием Kafka. Также мы узнаем о преимуществах декларативного способа тестирования приложений Kafka по сравнению с традиционным / существующим способом тестирования.

Для всего объясненного здесь, мы можем найти примеры кода в разделе «Заключение» этого поста

Для краткости учебника мы продемонстрируем только следующие аспекты.

  1. Тестирование продюсеров
  2. Тестирование потребителей
  3. Проведение тестов как производителя, так и потребителя
  4. Создание RAW-записей и JSON-записей
  5. Использование RAW-записей и JSON-записей
  6. Традиционные испытания.
  7. Преимущества декларативного тестирования стиля ( документ IEEE )
  8. Объединение тестирования REST API с тестированием Kafka
  9. Spinning Up Kafka в Docker — один узел и несколько узлов

Я настоятельно рекомендую прочитать статью « Минимум, что нам нужно знать для тестирования Кафки»,  прежде чем приступить к изучению этого урока.

Более подробную информацию о потоках Kafka и о том, как разработать потоковое приложение, можно найти в  руководстве по разработке потоковых приложений от Confluent.

2. Испытания Кафки

Сложная часть — это некоторая часть логики приложения или процедура БД, которая продолжает создавать записи по теме, а другая часть приложения продолжает потреблять записи и непрерывно обрабатывает их на основе бизнес-правил.

Записи, разделы, смещения, сценарии исключений и т. Д. Постоянно меняются, что затрудняет понимание того, что тестировать, когда тестировать и как тестировать.

Данные трубы-линия случайная            

3. Подход к тестируемому решению

Мы можем пойти на сквозной подход к тестированию, который будет проверять как создание, потребление и записи DLQ, так и логику обработки приложения. Это даст нам уверенность в выпуске нашего приложения для более высоких сред.

Мы можем сделать это, запустив Kafka в докеризованных контейнерах или направив наши тесты в любую интегрированную тестовую среду где-нибудь в нашем кластере Kubernetes-Kafka или в любую другую инфраструктуру микросервисов.  

Здесь мы выбираем функциональные возможности , производить нужные записи и проверки, потребляют намеченную записи и проверки, наряду с HTTP REST или SOAP API проверки , который помогает в сохранении наших тестов намного чище и менее шумные.

Название изображения           

4. Производитель Тестирование

Когда мы создаем запись по теме, мы можем проверить подтверждение от брокера Kafka. Это подтверждение в формате recordMetadata.

Например, визуализация «recordMetaData» как JSON будет выглядеть так:

Response from the broker after a successful "produce".

{
    "recordMetadata": {
        "offset": 0,
        "timestamp": 1547760760264,
        "serializedKeySize": 13,
        "serializedValueSize": 34,
        "topicPartition": {
            "hash": 749715182,
            "partition": 0,   //<--- To which partition the record landed
            "topic": "demo-topic"
        }
    }
}

5. Потребительские испытания

Когда мы читаем или употребляем тему, мы можем проверить записи, извлеченные из тем. Здесь мы также можем проверить / утвердить некоторые метаданные, но в большинстве случаев вам может потребоваться иметь дело только с записями (не с метаданными).

Например, могут быть случаи, когда мы проверяем только количество записей, т.е. только размер, а не фактические записи

Например, визуализация извлеченных «записей» в виде JSON будет выглядеть так:

Records fetched after a successful "consume".

{
    "records": [
        {
            "topic": "demo-topic",
            "key": "1547792460796",
            "value": "Hello World 1"
        },
        {
            // ...
        }
    ]
}

Полная запись (записи) с информацией метаданных выглядит так, как мы получили ниже, которую мы также можем проверить / подтвердить, если у нас есть для этого требование теста.

The fetched records with the metadata from the broker.

{
    "records": [
        {
            "topic": "demo-topic",
            "partition": 0,
            "offset": 3,
            "key": "1547792460796", //<---- Record key
            "value": "Hello World", //<---- Record value 
        }
    ]
}

6. Тестирование производителей и потребителей

В одном и том же сквозном тесте мы можем выполнить два шага, как показано ниже, для одной и той же записи:

  • Шаг 1:

    • Произведите в тему «Демо-тема» и подтвердите полученные recordMetadataот брокера.

      • Например, произвести запись с "key":"1234", "value":"Hello World"
  • Шаг 2:

    • Потребляйте из той же темы «демо-тему» ​​и проверяйте записи.

      • Утверждающие , что та же запись присутствует в ответе, то есть "key": "1234", "value": "Hello World".
      • Мы могли бы использовать более одной записи, если они были подготовлены к одной и той же теме, прежде чем мы начали потреблять.

7. Проблемы с традиционным стилем тестирования

Пункт 1

Во-первых, нет ничего плохого в традиционном стиле. Но у него есть крутая кривая обучения, с которой приходится иметь дело, когда дело доходит до брокеров Kafka.

Например, когда мы имеем дело с брокерами, нам необходимо тщательно ознакомиться с клиентскими API-интерфейсами Kafka, например, Key-SerDe, Value-SerDe, тайм-аутами, а также пулами записей, commitSyncs, recordTypes и т. Д., И многими другими уровень API.

Для функционального тестирования нам не нужно знать эти концепции на уровне API.

Пункт 2

Наш тестовый код тесно связан с клиентским API-кодом. Это означает, что мы вводим много проблем в поддержку тестовых наборов вместе с кодом тестовой среды.

8. Преимущества декларативного стиля тестирования

Чтобы провести сравнение, интересный способ работы docker-compose называется «Декларативный путь». Мы говорим инфраструктуре Docker Compose (в файле YAML) ускорять определенные действия в определенных портах, связывать определенные службы с другими службами и т. Д., И все это делает для нас среда. Мы можем проводить наши тесты также аналогичным декларативным способом, который мы увидим в следующих разделах.

Насколько это аккуратно? Подумайте, сколько будет хлопот, если бы нам пришлось писать скрипты кода / оболочки для тех же повторяющихся задач.

Пункт 1

В декларативном стиле мы можем полностью пропустить уровень API, который имеет дело с брокерами, и сосредоточиться только на тестовых сценариях. Но, тем не менее, у нас есть гибкость в использовании клиентских API-интерфейсов Kafka и добавлении в него наших собственных вкусов.

Пункт 2

Это способствует обнаружению большего количества дефектов, потому что мы не тратим время на написание кода, а тратим больше времени на написание тестов и покрытие большего количества бизнес-сценариев / поездок пользователей.

Как?

  • Здесь мы говорим тесту использовать тему Кафки, которая является нашей «конечной точкой» или «URL»

     т.е. "url": "kafka-topic: demo-topic" 

  • Далее мы говорим тесту использовать операцию «производить»

     т.е. "operation":"produce" 

  • Далее нам нужно отправить записи в полезную нагрузку запроса:

"request": {
    "records": [
        {
            "key": "KEY-1234",
            "value": "Hello World"
        }
    ]
}
  • Затем мы сообщаем тесту, что ожидаем, что ответ «status» будет возвращен как «Ok», и некоторые метаданные записи от посредника, то есть ненулевое значение. Это часть «утверждений» нашего теста.

"assertions": {
    "status" : "Ok",
    "recordMetadata" : "$NOT.NULL"
}
  • Примечание: мы можем даже утверждать все recordMetadata сразу, что мы увидим в следующих разделах. А пока давайте будем простыми и продолжим.

  • Как только мы закончим, наш полный тест будет выглядеть следующим образом:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "RAW",
        "records": [
            {
                "key": 101,
                "value": "Hello World"
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

Вот и все. Мы закончили с тестовым набором и готовы к запуску.


Теперь, взглянув на приведенный выше тест, любой может легко понять, какой сценарий тестируется.

Обратите внимание, что:

  1. Мы устранили проблемы с кодированием, используя клиентский API для работы с брокерами Kafka.

  2. Мы устранили проблемы кодирования, связанные с утверждением каждого ключа / значения поля, путем обхода пути к их объектам, анализа запросов-полезных нагрузок, анализа ответов-полезных нагрузок и т. Д.

В то же время мы использовали функцию сравнения JSON платформы, чтобы сразу утверждать результат, поэтому тесты стали намного проще и чище.


Мы избежали двух основных неприятностей во время тестирования.

И порядок полей здесь не имеет значения. Приведенный ниже код также является правильным (порядок полей поменялся местами).

"assertions": {
        "recordMetadata": "$NOT.NULL"
        "status": "Ok",
}

9. Запуск одного теста с использованием JUnit

Это супер просто. Нам просто нужно указать наш @Test метод JUnit  на файл JSON. Это действительно так.

@TargetEnv("kafka_servers/kafka_test_server.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaProduceTest {

    @Test
    @JsonTestCase("kafka/produce/test_kafka_produce.json")
    public void testProduce() throws Exception {
         // No code is needed here. What? 
         // Where are the 'assertions' gone ?
    }

}

В приведенном выше коде:

  • test_kafka_produce.json ‘ — это тестовый пример, содержащий этапы JSON, о которых мы говорили ранее.

  • kafka_test_server.properties содержит сведения о « брокере » и конфигурации производителя / потребителя.

  • « @RunWith (ZeroCodeUnitRunner.class) » — это пользовательский бегун JUnit для запуска теста.

Кроме того, мы можем использовать бегунок Suite или Package runner для запуска всего набора тестов.

Пожалуйста, посетите эти RAW и JSON примеры и объяснения.

10. Написание нашего первого продюсерского теста

В приведенном выше разделе мы узнали, как создать запись и подтвердить ответ / подтверждение брокера.

Но мы не должны останавливаться на достигнутом. Мы можем пойти дальше и попросить наш тест подтвердить  "recordMetadata" поле за полем, чтобы убедиться, что он был записан в правильный «раздел» правильной «темы» и многое другое, как показано ниже.

"assertions": {
    "status": "Ok",
    "recordMetadata": {
        "offset": 0,   //<--- This is the record 'offset' in the partition
        "topicPartition": {
            "partition": 0,   //<--- This is the partition number
            "topic": "demo-topic"  //<--- This is the topic name
        }
    }
}

Вот и все. В приведенном выше блоке «Утверждения» мы закончили сравнение ожидаемых и фактических значений.

Примечание: сравнения и утверждения выполняются мгновенно. Блок «утверждение» мгновенно сравнивается с фактическим «состоянием» и «recordMetadata», полученными от брокера Kafka. Порядок полей здесь не имеет значения. Тест не пройден, только если значения полей или структуры не совпадают.

11. Написание нашего первого потребительского теста

Аналогично, чтобы написать «потребительский» тест, нам нужно знать:

  • Название темы «демо-тема» наша «конечная точка» ака «URL»:  "url": "kafka-topic: demo-topic".

  • Операция, то есть «потреблять»:   "operation": "consume".

  • При использовании сообщения из темы, мы должны отправить, как показано ниже: "request": { }

Вышеупомянутый «запрос» означает ничего не делать, кроме как потреблять без выполнения «коммита».

Или мы можем упомянуть в нашем тесте, чтобы делать определенные вещи во время потребления или после потребления записей.

"request": {
    "consumerLocalConfigs": {
        "commitSync": true,
        "maxNoOfRetryPollsOrTimeouts": 3
    }
}
  • "commitSync": true: Здесь мы говорим тесту сделать `commitSync` после использования сообщения, это означает, что он не будет читать сообщение снова, когда вы` опросите ‘в следующий раз. Он будет читать только новые сообщения, если они появятся в теме.

  • "maxNoOfRetryPollsOrTimeouts": 3Здесь мы говорим тесту показывать опрос максимум три раза, а затем прекращать опрос. Если у нас есть больше записей, мы можем установить это большее значение. Значением по умолчанию является 1.

  • "pollingTime": 500:  Здесь мы говорим тест на опрос в течение 500 миллисекунд каждый раз опрашивает. Значение по умолчанию составляет 100 миллисекунд, если вы пропустите этот флаг.

Посетите эту страницу для  всех настраиваемых ключей — ConsumerLocalConfigs из исходного кода.

Посетите репозиторий с примерами HelloWorld Kafka, чтобы попробовать его дома.

Примечание. Эти значения конфигурации могут быть установлены в файле свойств глобально для всех тестов, что означает, что они будут применяться ко всем тестам в нашем тестовом пакете. Также мы можем переопределить любой из конфигов для конкретного теста или тестов внутри пакета. Следовательно, это дает нам гибкость для охвата всех видов тестовых сценариев.

Что ж, настройка этих свойств не имеет большого значения, и мы все равно должны сделать это, чтобы их экстернализовать. Следовательно, чем проще они поддерживаются, тем лучше для нас! Но мы должны понять, что происходит внутри них.

Мы обсудим это в следующих разделах.

12. Сочетание тестирования REST API с тестированием Kafka

Большую часть времени в архитектуре микросервисов мы создаем приложения, используя сервисы RESTful, сервисы SOAP (возможно, устаревшие) и Kafka.

Поэтому нам необходимо охватить все проверки контрактов API в наших сценариях сквозного тестирования, включая Kafka.

Но это не имеет большого значения, так как, в конце концов, здесь ничего не меняется, за исключением того, что мы просто указываем наш URL-адрес на конечную точку HTTP для нашей службы REST или SOAP, а затем соответствующим образом манипулируем блоком полезной нагрузки / утверждений. Это действительно так.

Пожалуйста, посетите раздел Объединение тестирования Kafka с тестированием REST API для полного пошагового подхода.

Если у нас есть сценарий использования:

Шаг 1: вызов Кафки — мы отправляем запись «Адрес» с идентификатором «id-lon-123» в «адрес-тему», которая в конечном итоге обрабатывается и записывается в базу данных «Адрес» (например, Postgres или Hadoop). Затем мы утверждаем подтверждение брокера.

Шаг 2: вызов REST — запрос (GET) REST API «Address» с помощью «/ api / v1 / address / id-lon-123» и подтверждение ответа.

Соответствующий тестовый пример выглядит ниже.

{
    "scenarioName": "Kafka and REST api validation example",
    "steps": [
        {
            "name": "produce_to_kafka",
            "url": "kafka-topic:people-address",
            "operation": "produce",
            "request": {
                "recordType" : "JSON",
                "records": [
                    {
                        "key": "id-lon-123",
                        "value": {
                            "id": "id-lon-123",
                            "postCode": "UK-BA9"
                        }
                    }
                ]
            },
            "assertions": {
                "status": "Ok",
                "recordMetadata" : "$NOT.NULL"
            }
        },
        {
            "name": "verify_updated_address",
            "url": "/api/v1/addresses/${$.produce_to_kafka.request.records[0].value.id}",
            "operation": "GET",
            "request": {
                "headers": {
                    "X-GOVT-API-KEY": "top-key-only-known-to-secu-cleared"
                }
            },
            "assertions": {
                "status": 200,
                "value": {
                    "id": "${$.produce_to_kafka.request.records[0].value.id}",
                    "postCode": "${$.produce_to_kafka.request.records[0].value.postcode}"
                }
            }
        }
    ]
}

Легко читать! Легко написать!

Поле повторно использовать значения через путь JSON вместо жесткого кодирования. Это отличная экономия времени!

13. Производство RAW Records против JSON Records

1. В случае с RAW, мы просто тихо говорим:

"recordType" : "RAW",

Затем наш тестовый пример выглядит следующим образом:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "RAW",
        "records": [
            {
                "key": 101,
                "value": "Hello World"
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

2. И для записи JSON мы упоминаем об этом точно так же:

"recordType" : "JSON"

И наш тестовый пример выглядит следующим образом:

{
    "name": "produce_a_record",
    "url": "kafka-topic:demo-topic",
    "operation": "produce",
    "request": {
        "recordType" : "JSON",
        "records": [
            {
                "key": 101,
                "value": { 
                    "name" : "Jey"
                }
            }
        ]
    },
    "assertions": {
        "status": "Ok",
        "recordMetadata": "$NOT.NULL"
    }
}

Примечание. На этот раз в разделе «value» есть запись JSON.

14. Кафка в докере

В идеале этот раздел должен был быть в начале. Но какой смысл просто запускать файл docker-compose, даже не зная его результатов? Мы можем найти его здесь, чтобы облегчить жизнь каждому!

Мы можем найти файлы docker-compose и пошаговые инструкции ниже.

  1. Каффка с одним узлом в докере
  2. Многоузловой кластер Kafka в Docker

15. Вывод

В этом уроке мы изучили некоторые фундаментальные аспекты тестирования Kafka декларативным способом. Также мы узнали, как легко мы можем тестировать микросервисы, включающие как Kafka, так и REST. 

Используя этот подход, мы протестировали и проверили кластеризованные конвейеры данных Kafka для Hadoop, а также  API-интерфейсы REST / SOAP Http, развернутые в оркестрированных модулях Kubernetes. Мы нашли этот подход очень прямолинейным и упрощенным, чтобы поддерживать и продвигать артефакты в более высокие среды.

Благодаря такому подходу мы смогли с полной ясностью охватить множество тестовых сценариев и обнаружить больше дефектов на ранних этапах цикла разработки даже без написания тестового кода. Это помогло нам создать и поддерживать наш пакет регрессии простым и чистым способом.

Полный исходный код этих примеров репозитория  GitHub (Try at Home) приведен ниже.

Чтобы запустить любой тест (ы), мы можем напрямую перейти к соответствующему JUnit @Testв ‘ src / test / java ‘. Нам нужно вызвать Docker с kafka перед тем, как нажимать на любые тесты Junit.

Используйте » kafka-schema-registry.yml  (см. Вики)», чтобы иметь возможность запускать все тесты.

Если вы нашли эту страницу полезной для тестирования API-интерфейсов Kafka и HTTP, пожалуйста, оставьте звездочку на  GitHub !

Удачного тестирования!