Статьи

Сбор данных изменений (CDC) со встроенным Debezium и Spring Boot

При работе с данными или репликации источников данных вы, вероятно, слышали термин « сбор данных изменений» (CDC) . Как следует из названия, «CDC» — это шаблон проектирования, который постоянно идентифицирует и фиксирует постепенные изменения данных. Этот шаблон используется для репликации данных в реальном времени через действующие базы данных в аналитические источники данных или для чтения реплик. Он также может использоваться для запуска событий на основе изменений данных, таких как шаблон OutBox .

Большинство современных баз данных поддерживают CDC через журналы транзакций . Журнал транзакций представляет собой последовательную запись всех изменений, внесенных в базу данных, в то время как фактические данные содержатся в отдельном файле.

В этом блоге я хотел сосредоточиться на использовании фреймворка, обычно используемого для CDC , и встраивании его в Spring Boot.

Вам также могут понравиться: Коннекторы Kafka Без Kafka .

Что такое дебезиум?

Debezium — это распределенная платформа, созданная для CDC. Он использует журналы транзакций базы данных и создает потоки событий при изменениях на уровне строк. Приложения, прослушивающие эти события, могут выполнять необходимые действия на основе постепенных изменений данных.

Debezium предоставляет библиотеку соединителей , поддерживающих различные базы данных, доступные сегодня. Эти соединители могут отслеживать и записывать изменения на уровне строк в схемах базы данных. Затем они публикуют изменения в потоковом сервисе, таком как Kafka .

Обычно один или несколько коннекторов развертываются в кластере Kafka Connect  и настроены на мониторинг баз данных и публикацию событий изменения данных в Kafka. Распределенный кластер Kafka Connect обеспечивает отказоустойчивость и масштабируемость, гарантируя, что все настроенные соединители всегда работают.

Что такое встраиваемый дебезиум?

Приложения, которым не требуется уровень отказоустойчивости и надежности, которые предлагает Kafka Connect, или которые хотят минимизировать затраты на их использование для работы всей платформы, могут использовать в приложении соединители Debezium. Это делается путем встраивания движка Debezium и настройки соединителя для работы в приложении. При событиях изменения данных соединители отправляют их непосредственно в приложение.

Дебезиум с пружинным ботинком

Для простоты примера давайте создадим приложение Spring Boot «Student CDC Relay», в котором будет запущен встроенный Debezium и настроены журналы транзакций базы данных Postgres, в которой находится таблица «Student». Коннектор Debezium, настроенный в приложении Spring Boot, вызывает метод в приложении, когда в таблице «Студент» выполняется операция базы данных, такая как «Вставить / Обновить / Удалить». Метод воздействует на эти события и синхронизирует данные в индексе Student в ElasticSearch.

Дизайн демонстрируемого примера

Код для образца можно найти
здесь .

Установка инструментов

Все необходимые инструменты могут быть установлены с помощью файла docker-compose ниже. Это запустит базу данных Postgres через порт  5432 и Elastic Search через порт 9200 (HTTP)  и  9300 (транспорт) .

version: "3.5"

services:
  # Install postgres and setup the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install Elasticsearch.
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.0
    environment:
    - discovery.type=single-node
    ports:
      - 9200:9200
      - 9300:9300

Мы используем изображение, debezium/postgresпотому что оно поставляется с функцией логического декодирования . Это механизм, который позволяет извлекать изменения, зафиксированные в журнале транзакций, что делает возможным CDC. Документацию по установке плагина в Postgres можно найти здесь .

Понимание кода

Первый шаг — определить зависимости Maven в pom.xmlfor debezium-embeddedи debezium-connector. Пример читает изменения из Postgres, поэтому мы используем соединитель Postgres.

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium.version}</version>
</dependency>

Затем мы настраиваем соединитель, который прослушивает изменения в таблице Student. Мы используем класс PostgresConnectorдля connector.classнастройки, которая предоставляется Debezium. Это имя класса Java для коннектора, который привязывает исходную базу данных.

Соединитель также принимает важный параметр,  offset.storageкоторый помогает приложению отслеживать объем обработки в журнале транзакций. Если во время обработки произойдет сбой приложения, оно может возобновить чтение изменений с того места, на котором произошла ошибка после перезапуска.

Существует несколько способов хранения смещений, но в этом примере мы используем класс FileOffsetBackingStoreдля хранения смещений в локальном файле, определенном с помощью offset.storage.file.filename. Соединитель записывает смещения в файле, и для каждого считываемого изменения механизм Debezium периодически сбрасывает смещения в файл в зависимости от настроек offset.flush.interval.ms.

Другими параметрами для соединителя являются свойства базы данных Postgres, в которых находится таблица Student.

@Bean
public io.debezium.config.Configuration studentConnector() {
    return io.debezium.config.Configuration.create()
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage",  "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
            .with("offset.flush.interval.ms", 60000)
            .with("name", "student-postgres-connector")
            .with("database.server.name", studentDBHost+"-"+studentDBName)
            .with("database.hostname", studentDBHost)
            .with("database.port", studentDBPort)
            .with("database.user", studentDBUserName)
            .with("database.password", studentDBPassword)
            .with("database.dbname", studentDBName)
            .with("table.whitelist", STUDENT_TABLE_NAME).build();
}

Последнее изменение в настройке встроенного Debezium — запустить его при запуске приложения. Для этого мы используем класс EmbeddedEngine, который действует как оболочка для соединителя и управляет жизненным циклом соединителей. Механизм создается с использованием конфигурации соединителя и функции, которую он будет вызывать для каждого события изменения данных — в нашем примере это метод handleEvent().

private CDCListener(Configuration studentConnector, StudentService studentService) {
    this.engine = EmbeddedEngine
            .create()
            .using(studentConnector)
            .notifying(this::handleEvent).build();

    this.studentService = studentService;
}

После этого handleEvent()мы анализируем каждое событие, определяем, какая операция имела место, и вызываем StudentServiceдля выполнения операций Create / Update / Delete в Elastic Search с использованием Spring Data JPA для Elasticsearch.

Теперь, когда мы настроили EmbeddedEngine, мы можем запустить его асинхронно, используя сервис Executor.

    private final Executor executor = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void start() {
        this.executor.execute(engine);
    }

    @PreDestroy
    private void stop() {
        if (this.engine != null) {
            this.engine.stop();
        }
    }

Видеть код в действии

Затем мы запускаем все необходимые инструменты, запустив файл docker-compose , используя команду, docker-compose up -dи запустив «Student CDC Relay», используя команду mvn spring-boot:run. Мы можем настроить таблицу Student, запустив следующий скрипт:

CREATE TABLE public.student
(
    id integer NOT NULL,
    address character varying(255),
    email character varying(255),
    name character varying(255),
    CONSTRAINT student_pkey PRIMARY KEY (id)
);

Чтобы увидеть код в действии, мы вносим изменения в таблицу, которую мы только что создали.

Вставка записи в таблицу ученика

Мы можем выполнить следующую инструкцию SQL, чтобы вставить запись в таблицу Student в нашей базе данных Postgres.

INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL) VALUES('1','Jack','Dallas, TX','jack@gmail.com');

Мы можем проверить, что запись была создана на ElasticSearch.

$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 31,
  "_seq_no" : 30,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "Jack",
    "address" : "Dallas, TX",
    "email" : "jack@gmail.com"
  }
}


 Обновление записи в таблице учеников

Мы можем запустить следующую инструкцию SQL, чтобы обновить запись в таблице Student в нашей базе данных Postgres.

UPDATE STUDENT SET EMAIL='jill@gmail.com', NAME='Jill' WHERE ID = 1; 

Мы можем убедиться, что данные были изменены на «Jill» в ElasticSearch.

$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 32,
  "_seq_no" : 31,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : "Jill",
    "address" : "Dallas, TX",
    "email" : "jill@gmail.com"
  }
}


 Удаление записи в таблице учеников

Мы можем выполнить следующую инструкцию SQL, чтобы удалить запись из таблицы Student в нашей базе данных Postgres.

DELETE FROM STUDENT WHERE ID = 1;

Мы можем проверить, что данные были удалены с ElasticSearch.

$ curl -X GET http://localhost:9200/student/student/1?pretty=true
{
  "_index" : "student",
  "_type" : "student",
  "_id" : "1",
  "_version" : 33,
  "_seq_no" : 32,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "id" : 1,
    "name" : null,
    "address" : null,
    "email" : null
  }
}

Последние мысли

Этот подход действительно намного проще с небольшим количеством движущихся частей, но он более ограничен в плане масштабирования и гораздо менее терпим к сбоям.

The source record will be handled exactly once when the CDC-Relay application is running fine. The underlying applications do need to be tolerant of receiving duplicate events following a restart of the CDC-Relay application.

We can test the limitations around scaling by starting another instance of the ‘Student CDC Relay’ [on another port]. We see the below exception:

ERROR 59453 --- [pool-2-thread-1] io.debezium.embedded.EmbeddedEngine      : Error while trying to run connector class 'io.debezium.connector.postgresql.PostgresConnector'

Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID <>
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1116) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:842) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38) ~[postgresql-42.2.5.jar:42.2.5]
at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37) ~[postgresql-42.2.5.jar:42.2.5]

If your application needs at-least-once delivery guarantees of all messages, it would be better to use the the full distributed Debezium system with Kafka-Connect.

Further Reading