Вам также может понравиться:
Шаблоны проектирования для микросервисов
Постановка проблемы
Микросервисы часто публикуют события после выполнения транзакции базы данных. Запись в базу данных и публикация события — это две разные транзакции, и они должны быть атомарными. Отказ опубликовать событие может означать критический отказ бизнес-процесса.
Чтобы лучше объяснить формулировку проблемы, давайте рассмотрим микросервис Студента, который помогает зачислить студента. После зачисления служба «Каталог курсов» отправляет студенту по электронной почте все доступные курсы. Предполагая приложение , управляемое событиями , микросервис Студента регистрирует студента, вставляя запись в базу данных, и публикует событие, сообщающее, что регистрация для студента завершена. Сервис «Каталог курсов» прослушивает это событие и выполняет его действия. В случае сбоя, если микросервис Студента выходит из строя после вставки записи, система остается в несогласованном состоянии.
Шаблон OutBox
Этот шаблон обеспечивает эффективное решение для надежной публикации событий. Идея этого подхода заключается в том, чтобы иметь таблицу «Исходящие» в базе данных сервиса. При получении запроса на зачисление выполняется не только вставка в таблицу Student, но и запись, представляющая событие, также вставляется в таблицу Outbox. Два действия базы данных выполняются как часть одной транзакции.
Асинхронный процесс отслеживает в таблице исходящих сообщений новые записи и, если они есть, публикует события в шине событий. Шаблон просто разделяет две транзакции на разные сервисы, повышая надежность.
Описание этого шаблона можно найти на отличном сайте microservices.io Криса Ричардсона . Как описано на сайте, существует два подхода к реализации шаблона «Исходящие» ( отслеживание журнала транзакций и издатель опроса ). В решении ниже мы будем использовать подход с хвостами.
Хвост журналирования транзакций может быть реализован очень элегантно и эффективно с использованием Change Data Capture (CDC) с Debezium и Kafka-Connect .
Шаблон исходящих сообщений с Kafka Connect
Дизайн решения
Микросервис Student предоставляет конечные точки для выполнения операций базы данных в домене. Микросервис использует базу данных Postgres, в которой находятся две таблицы «Студент» и «Исходящие». Транзакционные операции, изменить / вставить в таблицу «Студент» и добавить запись в таблицу «Исходящие».
Платформа Kafka-Connect работает как отдельный сервис помимо брокера Kafka. Разъем Debezium для Postgres развертывается во время выполнения Kafka-Connect, чтобы фиксировать изменения в базе данных. В нашем примере в Kafka-Connect также развернут пользовательский соединитель, чтобы помочь определить правильные темы Kafka для события.
Соединитель Debezium привязывает журналы транзакций базы данных (журнал предварительной записи) из таблицы «Исходящие» и публикует событие в разделах, определенных настраиваемым соединителем.
Это решение гарантирует доставку как минимум один раз , поскольку службы Kafka Connect гарантируют, что каждый соединитель всегда работает; однако есть вероятность, что решение может публиковать одно и то же событие несколько раз между разомкнутыми и запускающимися соединителями. Чтобы обеспечить точную однократную доставку , клиент-клиент должен быть Idempotent , чтобы повторные события не обрабатывались снова.
Понимание кода
Вы можете найти код здесь . Я бы посоветовал вам прочитать историю — поскольку я рассмотрел некоторые ключевые детали реализации и ограничения этого шаблона.
Студенческий Микросервис
Это простой микросервис Spring-Boot, который предоставляет три конечных точки через контроллер REST и использует Spring-JPA для действий с базой данных. Представленные конечные точки — это GET для получения информации о студенте, POST для создания или регистрации студента и PUT для обновления адреса электронной почты студента. POST и PUT генерируют события «Студент зачислен» и «Студенческая электронная почта изменена». Изменение для вызова действий базы данных и вставки события обрабатывается в классе Service.
@Transactional
public StudentDTO enrollStudent(EnrollStudentDTO student)
throws Exception {
log.info("Enroll Student details for StudentId: {}",
student.getName());
StudentEntity studentEntity = StudentMapper.
INSTANCE.studentDTOToEntity(student);
studentRepository.save(studentEntity);
//Publish the event
event.fire(EventUtils.createEnrollEvent(studentEntity));
return StudentMapper.INSTANCE.studentEntityToDTO(studentEntity);
}
...
public static OutboxEvent createEnrollEvent(StudentEntity studentEntity)
{
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.convertValue(studentEntity, JsonNode.class);
return new OutboxEvent(
studentEntity.getStudentId(),
"STUDENT_ENROLLED",
jsonNode
);
}
Метод нуждается в Transactional
аннотации, чтобы действие базы данных и запись события были связаны одной транзакцией. enrollStudent()
Создает новую запись в таблице Стьюдента , а затем запускает событие , используя Spring в ApplicationEventPublisherAware
поддержку. Этот метод createEnrollEvent()
помогает создавать данные для вставки в OutBox. Вставка события в таблицу «Исходящие» обрабатывается в EventService
классе, который использует репозиторий Spring-JPA для обработки взаимодействий с базой данных.
@EventListener
public void handleOutboxEvent(OutboxEvent event) {
UUID uuid = UUID.randomUUID();
OutBoxEntity entity = new OutBoxEntity(
uuid,
event.getAggregateId(),
event.getEventType(),
event.getPayload().toString(),
new Date()
);
log.info("Handling event : {}.", entity);
outBoxRepository.save(entity);
/*
* Delete the event once written, so that the outbox doesn't grow.
* The CDC eventing polls the database log entry and not the table in the database.
*/
outBoxRepository.delete(entity);
}
Ключевым моментом, на который следует обратить внимание, является то, что код удаляет запись из таблицы «Исходящие» после ее записи, чтобы таблица исходящих не увеличивалась. Кроме того, Debezium не проверяет фактическое содержимое таблицы базы данных, а вместо этого привязывает журнал транзакций с опережением записи. Вызовы save()
и delete()
сделают запись CREATE
и DELETE
в журнале, как только транзакция будет совершена. Пользовательский преобразователь Kafka-Connect можно запрограммировать так, чтобы он не выполнял никаких действий над DELETE
входом.
Custom Debezium Transformer
Этот компонент определяет тему Kafka, для которой необходимо опубликовать событие. Это делается с помощью столбца EVENT_TYPE полезной нагрузки из таблицы «Исходящие». Компонент построен как JAR и будет помещен во время выполнения Kafka-Connect. Настройка размещения JAR во время выполнения Kafka-Connect выполняется DockerFile.
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer
RUN mkdir $DEBEZIUM_DIR
COPY target/custom-debezium-transformer-0.0.1.jar $DEBEZIUM_DIR
Мы используем изображение debezium/connect
, так как оно поставляется со всеми доступными разъемами. Для установки конкретного разъема вы можете обратиться к документации здесь . Компонент состоит только из одного класса, который помогает определить тему до публикации сообщения.
public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
/**
* This method is invoked when a change is made on the outbox schema.
*
* @param sourceRecord
* @return
*/
public R apply(R sourceRecord) {
Struct kStruct = (Struct) sourceRecord.value();
String databaseOperation = kStruct.getString("op");
//Handle only the Create's
if ("c".equalsIgnoreCase(databaseOperation)) {
// Get the details.
Struct after = (Struct) kStruct.get("after");
String UUID = after.getString("uuid");
String payload = after.getString("payload");
String eventType = after.getString("event_type").toLowerCase();
String topic = eventType.toLowerCase();
Headers headers = sourceRecord.headers();
headers.addString("eventId", UUID);
// Build the event to be published.
sourceRecord = sourceRecord.newRecord(topic, null, Schema.STRING_SCHEMA, UUID,
null, payload, sourceRecord.timestamp(), headers);
}
return sourceRecord;
}
Трансформатор расширяет Кафка-Connect Transformation класс. apply()
Метод, фильтрует CREATE
операцию ( «C») пропуская DELETE
, как объяснено выше.
Для каждого CREATE
идентифицируется название темы и возвращается полезная нагрузка. Для простоты в этом примере имя темы — это строчное значение столбца EVENT_TYPE, вставленного в таблицу «Исходящие» студенческим микросервисом.
Установка необходимых изображений и рамок
Руководство предполагает, что пользователь предварительно установил докер. Вы можете следовать инструкциям по установке здесь . Создание образа Debezium Connect выполняется путем запуска сборки maven в проекте custom-debezium-connect и построения образа докера.
mvn clean install
docker build -t custom-debezium-connect .
При запуске Docker Compose в папке проекта устанавливаются все необходимые компоненты: Zookeeper, Kafka, Postgres и Kafka-Connect. Файл Docker Compose:
version: "3.5"
services:
# Install postgres and setup the student database.
postgres:
container_name: postgres
image: debezium/postgres
ports:
- 5432:5432
environment:
- POSTGRES_DB=studentdb
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
# Install zookeeper.
zookeeper:
container_name: zookeeper
image: zookeeper
ports:
- 2181:2181
# Install kafka and create needed topics.
kafka:
container_name: kafka
image: confluentinc/cp-kafka
hostname: kafka
ports:
- 9092:9092
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://kafka:29092
LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "student_email_changed:1:1,student_enrolled:1:1"
depends_on:
- zookeeper
# Install debezium-connect.
debezium-connect:
container_name: custom-debezium-connect
image: custom-debezium-connect
hostname: debezium-connect
ports:
- '8083:8083'
environment:
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium_connect_config
OFFSET_STORAGE_TOPIC: debezium_connect_offsets
STATUS_STORAGE_TOPIC: debezium_connect_status
BOOTSTRAP_SERVERS: kafka:29092
depends_on:
- kafka
- postgres
Мы используем изображение debezium/postgres
, потому что оно поставляется с функцией логического декодирования . Это механизм, который позволяет извлекать изменения, которые были зафиксированы в журнале транзакций, что делает возможным CDC. Документацию по установке плагина в Postgres можно найти здесь .
Настройка тем Кафки
Выполните следующие команды, чтобы создать две темы Kafka: « student_enrolled » и « student_email_changed »
docker exec -t kafka /usr/bin/kafka-topics \
--create --bootstrap-server :9092 \
--topic student_email_changed \
--partitions 1 \
--replication-factor 1
docker exec -t kafka /usr/bin/kafka-topics \
--create --bootstrap-server :9092 \
--topic student_enrolled \
--partitions 1 \
--replication-factor 1
Связывание Debezium Kafka Connect с исходящей таблицей
Выполните приведенную ниже команду curl, чтобы создать соединитель на сервере Kafka-Connect. Этот соединитель указывает на установку Postgres, а также указывает таблицу и пользовательский класс преобразователя, который мы создали ранее.
curl -X POST \
http://localhost:8083/connectors/ \
-H 'content-type: application/json' \
-d '{
"name": "student-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "studentdb",
"database.server.name": "pg-outbox-server",
"tombstones.on.delete": "false",
"table.whitelist": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "com.sohan.transform.CustomTransformation"
}
}'
На этом настройка завершена, и у нас Zookeeper работает на порту 2181, Kafka работает на порту 9092 со всеми необходимыми темами, Postgres работает на порту 5432 с предварительно созданным «StudentDB» и, наконец, Kafka-Connect с Debezium и нашими Трансформатор работает на порту 8083.
Запуск решения
После запуска Студенческого микросервиса мы можем увидеть образец в действии. Чтобы смоделировать зачисление студентов, мы можем выполнить приведенный ниже скручивание.
curl -X POST \
'http://localhost:8080/students/~/enroll' \
-H 'content-type: application/json' \
-d '{
"name": "Megan Clark",
"email": "[email protected]",
"address": "Toronto, ON"
}'
Мы видим, что новая запись студента вставлена в базу данных для «Меган Кларк».
И мы видим событие, опубликованное в теме student_enrolled
, уведомляющее нижестоящие системы, что «Меган Кларк» зарегистрировался.
Чтобы смоделировать студента, обновляющего адрес электронной почты, мы можем выполнить следующую операцию curl.
$ curl -X PUT \ http://localhost:8080/students/1/update-email \
-H 'content-type: application/json' \ -d '{ "email": "[email protected]"}'
Мы можем заметить, что электронная почта была изменена на «[email protected]»
И мы видим событие, опубликованное в теме student_email_changed
, уведомляющее нижестоящие системы о том, что Студент с идентификатором студента ‘1’ изменил свой идентификатор электронной почты.
Если мы прокомментируем строку кода, которая удаляет исходящие события после записи их в EventService ( outBoxRepository.delete(entity)
), мы можем просмотреть события, вставленные в исходящую таблицу.
Резюме
В микросервисной архитектуре сбой системы неизбежен. Адаптация этого архитектурного стиля заставляет нас проектировать для сбоев. Шаблон исходящих событий дает нам надежный метод надежного обмена сообщениями перед лицом отказа.
Вышеупомянутое решение делает реализацию шаблона простой. Но для обеспечения высокой доступности системы мы должны запустить несколько экземпляров (кластеров) Zookeeper, Apache Kafka и Kafka Connect.
Наконец, я хотел бы отметить, что это не единственный способ решения проблемы надежного обмена сообщениями. Но это бесценный образец в вашем распоряжении.