Статьи

Реализация шаблона исходящих

Глядя за пределы коробки.


Вам также может понравиться:
Шаблоны проектирования для микросервисов

Постановка проблемы

Микросервисы часто публикуют события после выполнения транзакции базы данных. Запись в базу данных и публикация события — это две разные транзакции, и они должны быть атомарными. Отказ опубликовать событие может означать критический отказ бизнес-процесса.

Чтобы лучше объяснить формулировку проблемы, давайте рассмотрим микросервис Студента, который помогает зачислить студента. После зачисления служба «Каталог курсов» отправляет студенту по электронной почте все доступные курсы. Предполагая приложение , управляемое событиями , микросервис Студента регистрирует студента, вставляя запись в базу данных, и публикует событие, сообщающее, что регистрация для студента завершена. Сервис «Каталог курсов» прослушивает это событие и выполняет его действия. В случае сбоя, если микросервис Студента выходит из строя после вставки записи, система остается в несогласованном состоянии.

Изображение 1. Ошибка при публикации события после обновления / вставки базы данных.


Шаблон OutBox

Этот шаблон обеспечивает эффективное решение для надежной публикации событий. Идея этого подхода заключается в том, чтобы иметь таблицу «Исходящие» в базе данных сервиса. При получении запроса на зачисление выполняется не только вставка в таблицу Student, но и запись, представляющая событие, также вставляется в таблицу Outbox. Два действия базы данных выполняются как часть одной транзакции.

Асинхронный процесс отслеживает в таблице исходящих сообщений новые записи и, если они есть, публикует события в шине событий. Шаблон просто разделяет две транзакции на разные сервисы, повышая надежность.

Рисунок 2. Две отдельные транзакции с использованием шаблона исходящих.

Описание этого шаблона можно найти на отличном сайте microservices.io Криса Ричардсона . Как описано на сайте, существует два подхода к реализации шаблона «Исходящие» ( отслеживание журнала транзакций и издатель опроса ). В решении ниже мы будем использовать подход с хвостами.

Хвост журналирования транзакций может быть реализован очень элегантно и эффективно с использованием Change Data Capture (CDC) с Debezium и Kafka-Connect .

Шаблон исходящих сообщений с Kafka Connect

Дизайн решения

Микросервис Student предоставляет конечные точки для выполнения операций базы данных в домене. Микросервис использует базу данных Postgres, в которой находятся две таблицы «Студент» и «Исходящие». Транзакционные операции, изменить / вставить в таблицу «Студент» и добавить запись в таблицу «Исходящие».

Платформа Kafka-Connect работает как отдельный сервис помимо брокера Kafka. Разъем Debezium для Postgres развертывается во время выполнения Kafka-Connect, чтобы фиксировать изменения в базе данных. В нашем примере в Kafka-Connect также развернут пользовательский соединитель, чтобы помочь определить правильные темы Kafka для события.

Соединитель Debezium привязывает журналы транзакций базы данных (журнал предварительной записи) из таблицы «Исходящие» и публикует событие в разделах, определенных настраиваемым соединителем.

Изображение 3. Дизайн решения

Это решение гарантирует доставку как минимум один раз , поскольку службы Kafka Connect гарантируют, что каждый соединитель всегда работает; однако есть вероятность, что решение может публиковать одно и то же событие несколько раз между разомкнутыми и запускающимися соединителями. Чтобы обеспечить точную однократную доставку , клиент-клиент должен быть Idempotent , чтобы повторные события не обрабатывались снова.

Понимание кода

Вы можете найти код здесь . Я бы посоветовал вам прочитать историю — поскольку я рассмотрел некоторые ключевые детали реализации и ограничения этого шаблона.

Студенческий Микросервис

Это простой микросервис Spring-Boot, который предоставляет три конечных точки через контроллер REST и использует Spring-JPA для действий с базой данных. Представленные конечные точки — это GET для получения информации о студенте, POST для создания или регистрации студента и PUT для обновления адреса электронной почты студента. POST и PUT генерируют события «Студент зачислен» и «Студенческая электронная почта изменена». Изменение для вызова действий базы данных и вставки события обрабатывается в классе Service.

@Transactional
    public StudentDTO enrollStudent(EnrollStudentDTO student) 
      throws Exception {
        log.info("Enroll Student details for StudentId: {}", 
                 student.getName());

        StudentEntity studentEntity = StudentMapper.
          INSTANCE.studentDTOToEntity(student);
        studentRepository.save(studentEntity);

        //Publish the event
        event.fire(EventUtils.createEnrollEvent(studentEntity));

        return StudentMapper.INSTANCE.studentEntityToDTO(studentEntity);
    }

    ...

    public static OutboxEvent createEnrollEvent(StudentEntity studentEntity) 
    {
      ObjectMapper mapper = new ObjectMapper();
      JsonNode jsonNode = mapper.convertValue(studentEntity, JsonNode.class);

      return new OutboxEvent(
              studentEntity.getStudentId(),
              "STUDENT_ENROLLED",
              jsonNode
      );
    }

Метод нуждается в Transactional аннотации, чтобы действие базы данных и запись события были связаны одной транзакцией.  enrollStudent() Создает новую запись в таблице Стьюдента , а затем запускает событие , используя Spring в ApplicationEventPublisherAwareподдержку. Этот метод  createEnrollEvent() помогает создавать данные для вставки в OutBox. Вставка события в таблицу «Исходящие» обрабатывается в  EventService классе, который использует репозиторий Spring-JPA для обработки взаимодействий с базой данных.

@EventListener
    public void handleOutboxEvent(OutboxEvent event) {

        UUID uuid = UUID.randomUUID();
        OutBoxEntity entity = new OutBoxEntity(
                uuid,
                event.getAggregateId(),
                event.getEventType(),
                event.getPayload().toString(),
                new Date()
        );

        log.info("Handling event : {}.", entity);

        outBoxRepository.save(entity);

        /*
         * Delete the event once written, so that the outbox doesn't grow.
         * The CDC eventing polls the database log entry and not the table in the database.
         */
        outBoxRepository.delete(entity);
    }

Ключевым моментом, на который следует обратить внимание, является то, что код удаляет запись из таблицы «Исходящие» после ее записи, чтобы таблица исходящих не увеличивалась. Кроме того, Debezium не проверяет фактическое содержимое таблицы базы данных, а вместо этого привязывает журнал транзакций с опережением записи. Вызовы  save() и delete()сделают запись CREATEи DELETEв журнале, как только транзакция будет совершена. Пользовательский преобразователь Kafka-Connect можно запрограммировать так, чтобы он не выполнял никаких действий над DELETEвходом.

Custom Debezium Transformer

Этот компонент определяет тему Kafka, для которой необходимо опубликовать событие. Это делается с помощью столбца EVENT_TYPE полезной нагрузки из таблицы «Исходящие». Компонент построен как JAR и будет помещен во время выполнения Kafka-Connect. Настройка размещения JAR во время выполнения Kafka-Connect выполняется DockerFile.

FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer

RUN mkdir $DEBEZIUM_DIR
COPY target/custom-debezium-transformer-0.0.1.jar $DEBEZIUM_DIR

Мы используем изображение debezium/connect, так как оно поставляется со всеми доступными разъемами. Для установки конкретного разъема вы можете обратиться к документации здесь . Компонент состоит только из одного класса, который помогает определить тему до публикации сообщения.

public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    /**
     * This method is invoked when a change is made on the outbox schema.
     *
     * @param sourceRecord
     * @return
     */
    public R apply(R sourceRecord) {

        Struct kStruct = (Struct) sourceRecord.value();
        String databaseOperation = kStruct.getString("op");

        //Handle only the Create's
        if ("c".equalsIgnoreCase(databaseOperation)) {

            // Get the details.
            Struct after = (Struct) kStruct.get("after");
            String UUID = after.getString("uuid");
            String payload = after.getString("payload");
            String eventType = after.getString("event_type").toLowerCase();
            String topic = eventType.toLowerCase();

            Headers headers = sourceRecord.headers();
            headers.addString("eventId", UUID);

            // Build the event to be published.
            sourceRecord = sourceRecord.newRecord(topic, null, Schema.STRING_SCHEMA, UUID,
                    null, payload, sourceRecord.timestamp(), headers);
        }

        return sourceRecord;
    }

Трансформатор расширяет Кафка-Connect Transformation класс. apply()Метод, фильтрует CREATEоперацию ( «C») пропуская DELETE, как объяснено выше.

Для каждого CREATEидентифицируется название темы и возвращается полезная нагрузка. Для простоты в этом примере имя темы — это строчное значение столбца EVENT_TYPE, вставленного в таблицу «Исходящие» студенческим микросервисом.

Установка необходимых изображений и рамок

Руководство предполагает, что пользователь предварительно установил докер. Вы можете следовать инструкциям по установке здесь . Создание образа Debezium Connect выполняется путем запуска сборки maven в проекте custom-debezium-connect и построения образа докера.

mvn clean install
docker build -t custom-debezium-connect .

При запуске Docker Compose в папке проекта устанавливаются все необходимые компоненты: Zookeeper, Kafka, Postgres и Kafka-Connect. Файл Docker Compose:

version: "3.5"

services:
  # Install postgres and setup the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install zookeeper. 
  zookeeper:
    container_name: zookeeper
    image: zookeeper
    ports:
      - 2181:2181

  # Install kafka and create needed topics. 
  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    hostname: kafka
    ports:
      - 9092:9092
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092
      LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "student_email_changed:1:1,student_enrolled:1:1"
    depends_on:
      - zookeeper

  # Install debezium-connect. 
  debezium-connect:
    container_name: custom-debezium-connect
    image: custom-debezium-connect
    hostname: debezium-connect 
    ports:
      - '8083:8083'
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_connect_config
      OFFSET_STORAGE_TOPIC: debezium_connect_offsets
      STATUS_STORAGE_TOPIC: debezium_connect_status
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres

Мы используем изображение debezium/postgres, потому что оно поставляется с функцией логического декодирования . Это механизм, который позволяет извлекать изменения, которые были зафиксированы в журнале транзакций, что делает возможным CDC. Документацию по установке плагина в Postgres можно найти здесь .

Настройка тем Кафки

Выполните следующие команды, чтобы создать две темы Kafka: « student_enrolled » и « student_email_changed »

docker exec -t kafka /usr/bin/kafka-topics \
      --create --bootstrap-server :9092 \
      --topic student_email_changed \
      --partitions 1 \
      --replication-factor 1

docker exec -t kafka /usr/bin/kafka-topics \
      --create --bootstrap-server :9092 \
      --topic student_enrolled \
      --partitions 1 \
      --replication-factor 1

Связывание Debezium Kafka Connect с исходящей таблицей

Выполните приведенную ниже команду curl, чтобы создать соединитель на сервере Kafka-Connect. Этот соединитель указывает на установку Postgres, а также указывает таблицу и пользовательский класс преобразователя, который мы создали ранее.

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'content-type: application/json' \
  -d '{
   "name": "student-outbox-connector",
   "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "user",
      "database.password": "password",
      "database.dbname": "studentdb",
      "database.server.name": "pg-outbox-server",
      "tombstones.on.delete": "false",
      "table.whitelist": "public.outbox",
      "transforms": "outbox",
      "transforms.outbox.type": "com.sohan.transform.CustomTransformation"
   }
}'

На этом настройка завершена, и у нас Zookeeper работает на порту 2181, Kafka работает на порту 9092 со всеми необходимыми темами, Postgres работает на порту 5432 с предварительно созданным «StudentDB» и, наконец, Kafka-Connect с Debezium и нашими Трансформатор работает на порту 8083.

Запуск решения

После запуска Студенческого микросервиса мы можем увидеть образец в действии. Чтобы смоделировать зачисление студентов, мы можем выполнить приведенный ниже скручивание.

curl -X POST \
  'http://localhost:8080/students/~/enroll' \
  -H 'content-type: application/json' \
  -d '{ 
    "name": "Megan Clark",
    "email": "mclark@gmail.com",
    "address": "Toronto, ON"
}'

Мы видим, что новая запись студента вставлена ​​в базу данных для «Меган Кларк».

Рисунок 4. Зачисленный студент вставлен в базу данных

И мы видим событие, опубликованное в теме student_enrolled, уведомляющее нижестоящие системы, что «Меган Кларк» зарегистрировался.

Рисунок 5. Консольный потребитель для проверки данных, публикуемых в Kafka

Чтобы смоделировать студента, обновляющего адрес электронной почты, мы можем выполнить следующую операцию curl.

$ curl -X PUT \  http://localhost:8080/students/1/update-email \  
-H 'content-type: application/json' \  -d '{    "email": "jsmith@gmail.com"}'

Мы можем заметить, что электронная почта была изменена на «jsmith@gmail.com»

Рисунок 6. Электронная почта студента изменена в базе данных

И мы видим событие, опубликованное в теме student_email_changed, уведомляющее нижестоящие системы о том, что Студент с идентификатором студента ‘1’ изменил свой идентификатор электронной почты.

Рисунок 6. Консольный потребитель для проверки данных, публикуемых в Kafka

Если мы прокомментируем строку кода, которая удаляет исходящие события после записи их в EventService ( outBoxRepository.delete(entity)), мы можем просмотреть события, вставленные в исходящую таблицу.

Изображение 7. События в таблице OutBox.

Резюме

В микросервисной архитектуре сбой системы неизбежен. Адаптация этого архитектурного стиля заставляет нас проектировать для сбоев. Шаблон исходящих событий дает нам надежный метод надежного обмена сообщениями перед лицом отказа.

Вышеупомянутое решение делает реализацию шаблона простой. Но для обеспечения высокой доступности системы мы должны запустить несколько экземпляров (кластеров) Zookeeper, Apache Kafka и Kafka Connect.

Наконец, я хотел бы отметить, что это не единственный способ решения проблемы надежного обмена сообщениями. Но это бесценный образец в вашем распоряжении.

Дальнейшее чтение

Кафка Технический обзор

Введение в Apache Kafka

Пример Kafka Producer и Consumer с использованием Java