Статьи

Интерпретация точно-когда-то семантики Кафки

До недавнего времени большинство организаций изо всех сил пытались достичь святого Грааля доставки сообщений, точно семантической доставки. Несмотря на то, что это была встроенная функция начиная с Apache Kafkas 0.11, люди все еще не спешат пользоваться этой функцией. Давайте потратим немного времени на понимание семантики точно-единожды. Что в этом большого и как Кафка решает проблему?

Apache Kafka предлагает следующие гарантии доставки. Давайте разберемся, что это на самом деле означает:

  • Максимальная однократная доставка.  Это гарантирует, что конкретное сообщение может быть доставлено один раз или вообще не доставлено. Может быть потеря сообщений, но сообщение не может быть доставлено более одного раза.

  • По крайней мере, однажды доставка : Это гарантирует, что конкретное сообщение всегда будет доставлено. Он может быть доставлен несколько раз, но сообщения никогда не будут потеряны.

  • Точно одна доставка : это гарантирует, что все сообщения будут доставлены только один раз. Точно один раз не означает, что не будет ни сбоев, ни повторных попыток. Это неизбежно. Важно то, что попытки повторяются. Другими словами, результат должен быть одинаковым независимо от того, был ли он успешно обработан ровно один раз или нет.

Почему именно один раз важен

Существуют определенные варианты использования (например, финансовые приложения, приложения IoT и другие потоковые приложения), которые не могут позволить себе ничего меньше, чем один раз. Вы не можете позволить себе иметь дубликаты или потерять сообщения при пополнении или снятии денег с банковского счета. Это нужно ровно один раз в качестве окончательного результата.

Почему это трудно достичь

Предполагая, что у вас есть небольшое потоковое приложение Kafka с несколькими входными разделами и несколькими выходными разделами. Цель и ожидание приложения — получать данные из входных разделов, обрабатывать данные и записывать их в выходные разделы. Это то, где каждый хочет достичь точно один раз в качестве гарантии. Существуют сценарии из-за сбоев в работе сети, сбоев системы и других ошибок, когда в процессе появляются дубликаты.

Проблема 1: Дублирование или множественные записи

Обратитесь к рисунку 1а. Сообщение m1 обрабатывается и записывается в тему B. Сообщение m1 успешно записывается в тему B (как m1 ‘ ), но подтверждение не принимается. Причиной может быть, скажем, задержка сети, и это в конечном итоге истекает.

Рисунок 1a: Дублирующая проблема записи.

Рисунок 1b: Дублирующая проблема записи из-за повторной попытки.

Поскольку приложение не знает, что сообщение уже успешно записано, так как оно никогда не получало подтверждение, оно повторяет попытку и приводит к повторной записи. Обратитесь к рисунку 1b. Сообщение m1 ‘ переписывается в тему B. Это дублирующая проблема записи, которую необходимо исправить.

Проблема 2: перечитать входную запись

Рисунок 2a: Перечитайте проблему из-за сбоя приложения.

Рисунок 2b: Перечитайте проблему при перезапуске аварийного приложения.

Обратитесь к рисунку 2а. У нас тот же сценарий, что и выше, но в этом случае потоковое приложение вылетает непосредственно перед фиксацией смещения. Поскольку смещение не зафиксировано, когда потоковое приложение снова появляется, оно перечитывает сообщение m1 и снова обрабатывает данные (рисунок 2b). Это снова приводит к дублированию записей сообщения m1 в теме B.

Как Apache Kafka помогает

Apache Kafka решает описанные выше проблемы с помощью семантики с точным однократным использованием, используя следующее.

Идемпотент Продюсер

Идемпотентность на стороне производителя может быть достигнута путем предотвращения многократной обработки сообщений. Это достигается сохранением сообщения только один раз. При включенной идемпотентности каждое сообщение Кафки получает две вещи: идентификатор производителя (PID) и порядковый номер (seq). Назначение PID полностью прозрачно для пользователей и никогда не раскрывается клиентами.

Рисунок 3: Идемпотентный производитель

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "100");

В случае сбоя посредника или сбоя клиента при повторной отправке сообщения в теме будут приниматься только сообщения с новым уникальным порядковым номером и идентификатором производителя. Брокер автоматически дедуплицирует любые сообщения, отправленные этим производителем, обеспечивая идемпотентность. Никаких дополнительных изменений кода не требуется.

Транзакции через разделы

Чтобы гарантировать, что каждое сообщение обрабатывается ровно один раз, могут использоваться транзакции. Транзакции имеют подход «все или ничего». Они гарантируют, что после выбора сообщения оно может быть преобразовано и атомарно записано в несколько тем / разделов вместе со смещением использованного сообщения.

Фрагмент кода для атомарных транзакций

producer.initTransactions();
try {

producer.beginTxn(); 
 // ... read from input topic
 // ... transform

producer.send(rec1); // topic A 
producer.send(rec2); // topic B 
producer.send(rec3); // topic C 

producer.sendOffsetsToTxn(offsetsToCommit, “group-id”);

  producer.commitTransaction();
} catch ( Exception e ) {
  producer.abortTransaction();
}

В Apache Kafka v0.11 представлены два компонента — координатор транзакций и журнал транзакций, которые поддерживают состояние атомарных записей.

На приведенной ниже диаграмме показан высокоуровневый поток событий, который включает атомарные транзакции в различных разделах:

Рисунок 4: Транзакции через раздел.

  1. initTransactions() регистрируется  transactional.id с координатором.
  2. Координатор увеличивает время PID, так что предыдущий экземпляр этого PID считается зомби и отгорожен. Никакие записи в будущем не принимаются от этих зомби.
  3. Производитель добавляет раздел с координатором, когда производитель собирается отправить данные в раздел.
  4. Координатор транзакций сохраняет состояние каждой транзакции, которой он владеет, в памяти, а также записывает это состояние в журнал транзакций (в данном случае, информацию о разделе).
  5. Производитель отправляет данные фактическим разделам.
  6. Производитель инициирует транзакцию фиксации, и в результате координатор запускает протокол двухфазной фиксации.
  7. Здесь начинается первая фаза, и координатор обновляет журнал транзакций до «prepare_commit».
  8. Затем координатор начинает этап 2, где записывает маркеры принятия транзакции в разделы раздела, которые являются частью транзакции.
  9. После записи маркеров координатор транзакции помечает транзакцию как «совершенную».

Транзакционный потребитель

Если потребитель является транзакционным, мы должны использовать уровень изоляции  read_committed. Это гарантирует, что он читает только зафиксированные данные.

Значением по умолчанию  isolation.level является  read_uncommitted.

Это просто общее представление о том, как транзакции работают в Apache Kafka. Я бы порекомендовал изучить документы, если вы заинтересованы в более глубоком погружении.

Заключение

В этом посте мы говорили о различных семантиках гарантии доставки, таких как, по крайней мере, один раз, самое большее один раз и ровно один раз. Мы также поговорили о том, почему «точно один раз» важен, о проблемах на пути достижения «точно один раз» и о том, как Kafka поддерживает это «из коробки» с простой конфигурацией и минимальным кодированием.

Ссылки

  1. Транзакция в Apache Kafka

  2. Включение-точно-Кафка-потоки