Статьи

Создайте поток событий CDC из базы данных Oracle в Kafka с GoldenGate

Oracle предоставляет   обработчик Kafka Connect в своем   наборе Oracle GoldenGate for Big Data для передачи потока событий CDC (Change Data Capture) в кластер Apache Kafka.

Таким образом, с учетом базы данных Oracle любая операция DML (INSERT, UPDATE, DELETE) внутри успешно завершенной бизнес-транзакции будет преобразована в сообщение Кафки, опубликованное в режиме реального времени. 

Эта интеграция может быть очень интересной и полезной для таких случаев использования:

  • Учитывая унаследованное монолитное приложение, имеющее базу данных Oracle в качестве  единственного источника правды , должна быть возможность создавать поток событий обновления в реальном времени, просто отслеживая изменения соответствующих таблиц. Другими словами, мы можем реализовать конвейеры данных из устаревших приложений,  не меняя их .

  • Мы должны разрешить, чтобы сообщение Кафки публиковалось только в случае успешного завершения транзакции базы данных. Чтобы предоставить эту функцию, мы можем записать (всегда транзакционно) сообщение Kafka в специально отслеживаемую таблицу GoldenGate, которая через свой обработчик Kafka Connect опубликует событие «INSERT», сохраняющее исходное сообщение Kafka для развертывания.

В этой статье мы расскажем, шаг за шагом, как реализовать PoC (Proof-of-Concept) для тестирования интеграции между базой данных Oracle с Kafka с помощью технологии GoldenGate.

Необходимые условия для PoC

Мы установим все это на локальной виртуальной машине, поэтому вам нужно:

  • Установка  Oracle VirtualBox  (я тестировал на Oracle VirtualBox 5.2.20)

  • 16 ГБ ОЗУ.
  • Около 75 ГБ свободного места на диске.
  • И последнее, но не менее важное: знать  vi .

PoC Architecture

Это руководство создаст одну виртуальную машину, имеющую:

  • Oracle Database 12c : где таблицы на монитор сохраняется.

  • Oracle GoldenGate 12с (классический вариант) : где бизнес — операции применяются на отслеживаемых таблицах  извлекаются  в режиме реального времени, хранятся в промежуточном формате журнал ( журнале след ) и  закачивается  в удаленный журнал след управляемый другой GoldenGate (для больших данных ) пример.

  • Oracle GoldenGate для 12с Big Data : где подаваемые бизнес — операции принимаются и  реплицируется  в сообщениях Кафки.

  • Apache  Zookeeper / Apache Кафка  пример: когда бизнес — операции конвертируется в сообщениях Кафки опубликованы.

Другими словами, любая операция INSERT, UPDATE и DELETE, применяемая к некоторым таблицам Oracle, будет генерировать поток событий CDC сообщений Kafka, которые будут опубликованы в одной теме Kafka. 

Следующая архитектура и поток данных в реальном времени, который мы собираемся создать:

Шаг 1/12: загрузка базы данных Oracle

Вы можете установить базу данных Oracle и Oracle GoldenGate вручную. Но (к счастью …) Oracle делится некоторыми виртуальными машинами, которые содержат все уже установленное и готовое к работе в целях разработки.

Виртуальные машины Oracle можно скачать  здесь , для их получения необходима бесплатная учетная запись Oracle. 

Я использовал  виртуальную машину Oracle Big Data Lite (версия 4.11) , она содержит множество продуктов Oracle, включая:

  • Oracle Database 12c Release 1 Enterprise Edition (12.1.0.2)

  • Oracle GoldenGate 12c (12.3.0.1.2)

Получает все 7-zip-файлы (около 22 ГБ ) со страницы загрузки выше, извлекает файл образа  виртуальной машины BigDataLite411.ova  и запускает мастер импорта в Oracle VirtualBox, дважды щелкнув файл. После завершения процесса импорта будет доступна виртуальная машина с именем  BigDataLite-4.11  .

Запускает BigDataLite-4.11 и входит в систему со следующими учетными данными:

  • пользователь:  oracle

  • пароль:  welcome1 

и появится удобная среда Linux Desktop.

Дважды щелкните значок « Запуск / остановка служб»  на рабочем столе и:

  1. Проверьте первый элемент ORCL ( Oracle Database 12c ).

  2. Снимите все остальные вещи (бесполезные и вредные для PoC).

  3. Нажмите ENTER, чтобы подтвердить выбор.

И, наконец, база данных Oracle запустится.


Когда вы перезагрузите виртуальную машину, база данных Oracle будет запущена автоматически.

Другая полезная информация, связанная с загруженной виртуальной машиной:

  • Домашняя папка Oracle ( $ ORACLE_HOME ):  /u01/app/oracle/product/12.1.0.2/dbhome_1

  • GoldenGate (classic) устанавливается в / u01 / ogg

  • SQL Developer установлен в / u01 / sqldeveloper . Вы можете запустить SQL Developer с помощью значка на панели инструментов выше.

  • База данных Oracle устанавливается как база данных многопользовательских контейнеров (CDB) .
  • Порт прослушивателя базы данных Oracle — 1521

  • Oracle SID корневого контейнера — это cdb

  • Oracle SID PDB (подключаемой базы данных) — orcl

  • Все пользователи базы данных Oracle ( SYS , SYSTEM и т. Д.) Имеют пароль welcome1

  • Псевдоним tnsname для подключения к базе данных PDB — ORCL (см.  Содержимое  файла $ ORACLE_HOME / network / admin / tnsnames.ora ). 

  • Домашняя папка Java ( $ JAVA_HOME ) — это / usr / java / latest

  • Java Development Kit, установленный в $ JAVA_HOME, является обновлением JDK8 151. 

Шаг 2/12: Включить архивирование в Oracle

Нам нужно включить архив журнала в Oracle для использования GoldenGate (classic).

Запустите SQL Plus как SYS из оболочки Linux на виртуальной машине:

sqlplus sys/welcome1 as sysdba

Затем из оболочки SQL Plus запускается этот список команд (я предлагаю запускать их по одной):

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION=TRUE;
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

Затем проверьте, успешно ли был включен архив журнала:

ARCHIVE LOG LIST;

Вывод должен быть примерно таким:

Database log mode       Archive Mode
Automatic archival       Enabled
Archive destination       USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     527
Next log sequence to archive   529
Current log sequence       529

Шаг 3/12: создайте пользователя ggadmin

Для правильной работы GoldenGate (classic) необходимо создать специального пользователя-администратора Oracle.

Снова откройте SQL Plus из оболочки Linux виртуальной машины: 

sqlplus sys/welcome1 as sysdba

и создайте  пользователя ggadmin , запустив этот скрипт:

ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE;  
CREATE USER ggadmin IDENTIFIED BY ggadmin;
GRANT CREATE SESSION, CONNECT, RESOURCE, ALTER SYSTEM TO ggadmin;
EXEC DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(grantee=>'ggadmin', privilege_type=>'CAPTURE', grant_optional_privileges=>'*');
GRANT SELECT ANY DICTIONARY TO ggadmin;
GRANT UNLIMITED TABLESPACE TO ggadmin;

Шаг 4/12 — Создание схемы ESHOP

Мы собираемся создать схему ( ESHOP ), имеющую всего пару таблиц ( CUSTOMER_ORDER и CUSTOMER_ORDER_ITEM ) для генерации потока событий CDC, который нужно отправить в Kafka.

Подключитесь к Oracle PDB с  orcl в качестве SID с SQL Plus (или, если хотите, используйте SQL Developer):

sqlplus sys/welcome1@ORCL as sysdba

и запустите этот скрипт:

-- init session 
ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE;  

-- create tablespace for eshop 
CREATE TABLESPACE eshop_tbs DATAFILE 'eshop_tbs.dat' SIZE 10M AUTOEXTEND ON;
CREATE TEMPORARY TABLESPACE eshop_tbs_temp TEMPFILE 'eshop_tbs_temp.dat' SIZE 5M AUTOEXTEND ON;

-- create user schema eshop, please note that the password is eshop
CREATE USER ESHOP IDENTIFIED BY eshop DEFAULT TABLESPACE eshop_tbs TEMPORARY TABLESPACE eshop_tbs_temp;

-- grant eshop user permissions
GRANT CREATE SESSION TO ESHOP;
GRANT CREATE TABLE TO ESHOP;
GRANT UNLIMITED TABLESPACE TO ESHOP;
GRANT RESOURCE TO ESHOP;
GRANT CONNECT TO ESHOP;
GRANT CREATE VIEW TO ESHOP;

-- create eshop sequences
CREATE SEQUENCE ESHOP.CUSTOMER_ORDER_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;
CREATE SEQUENCE ESHOP.CUSTOMER_ORDER_ITEM_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;

-- create eshop tables
CREATE TABLE ESHOP.CUSTOMER_ORDER (
    ID NUMBER(19) PRIMARY KEY,
    CODE VARCHAR2(10),
    CREATED DATE,
    STATUS VARCHAR2(32),
    UPDATE_TIME TIMESTAMP
);

CREATE TABLE ESHOP.CUSTOMER_ORDER_ITEM (
    ID NUMBER(19) PRIMARY KEY,
    ID_CUSTOMER_ORDER NUMBER(19),
    DESCRIPTION VARCHAR2(255),
    QUANTITY NUMBER(3),
    CONSTRAINT FK_CUSTOMER_ORDER FOREIGN KEY (ID_CUSTOMER_ORDER) REFERENCES ESHOP.CUSTOMER_ORDER (ID)
);

Шаг 5/12: инициализация GoldenGate Classic

Теперь пришло время настроить экземпляр GoldenGate (classic), установленный на виртуальной машине BigDataListe-4.11 .

Из оболочки Linux запускается:

cd /u01/ogg
./ggsci

и GoldenGate CLI (интерфейс командной строки) запустится:

Oracle GoldenGate Command Interpreter for Oracle
Version 12.2.0.1.0 OGGCORE_12.2.0.1.0_PLATFORMS_151101.1925.2_FBO
Linux, x64, 64bit (optimized), Oracle 12c on Nov 11 2015 03:53:23
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.

GGSCI (bigdatalite.localdomain) 1> 

Из CLI GoldenGate запустите менеджер с помощью следующей команды:

start mgr

он загрузит основной процесс контроллера GoldenGate (прослушивает порт 7810 ).

Теперь создайте хранилище учетных данных  для хранения  учетных данных пользователя ggadmin (и обратитесь к ним с псевдонимом с тем же именем): 

add credentialstore
alter credentialstore add user ggadmin password ggadmin alias ggadmin

Теперь подключитесь к базе данных Oracle, используя  только что созданный псевдоним ggadmin, и включите дополнительный журнал для схемы eshop, хранящейся в PDB с именем orcl :

dblogin useridalias ggadmin
add schematrandata orcl.eshop

Шаг 6/12: Создайте экстракт GoldenGate

На этом этапе мы собираемся создать извлечение GoldenGate  , этот процесс будет отслеживать журналы повторов архива Oracle для захвата транзакций базы данных, связанных с  таблицами ESHOP, и записывать этот поток изменений SQL в другой файл журнала с именем trail log .

Из GoldenGate CLI работает:

edit params exteshop

Эта команда откроет экземпляр vi, который ссылается на новый пустой файл. Поместите в редактор vi следующий контент:

EXTRACT exteshop
USERIDALIAS ggadmin
EXTTRAIL ./dirdat/aa
TABLE orcl.eshop.*;

Сохраните содержимое и выйдите из vi, чтобы вернуться в интерфейс GoldenGate CLI.


Сохраненный контент будет сохранен в файле /u01/ogg/dirprm/exteshop.prm.
Вы также можете редактировать его содержимое без необходимости повторного запуска команды «edit params exteshop» из CLI GoldenGate.

Теперь зарегистрируйте процесс извлечения в Oracle, выполните следующие команды из интерфейса командной строки GoldenGate:

dblogin useridalias ggadmin
register extract exteshop database container (orcl)

Вывод последней команды должен выглядеть примерно так:

OGG-02003  Extract EXTESHOP successfully registered with database at SCN 13624423.

Используйте номер SCN, показанный для завершения  конфигурации извлечения . Из GoldenGate CLI:

add extract exteshop, integrated tranlog, scn 13624423
add exttrail ./dirdat/aa, extract exteshop

Теперь мы можем запустить процесс извлечения GoldenGate с именем exteshop :

start exteshop

Вы можете проверить состояние процесса с помощью одной из следующих команд:

info exteshop
view report exteshop

Убедитесь, что процесс извлечения работает правильно для завершения этого шага. Подключитесь к схеме ESHOP с помощью SQL Plus (или разработчика SQL), выполнив эту команду из оболочки Linux:

sqlplus eshop/eshop@ORCL

Создайте фиктивный заказ клиента:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)
VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA01', SYSDATE, 'DRAFT', SYSTIMESTAMP);

INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)
VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Toy Story', 1);

COMMIT;

и, наконец, из CLI GoldenGate запустите:

stats exteshop

и убедитесь, что предыдущие операции INSERT были учтены. Ниже приведен небольшой пример  вывода команды stats :

Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER:

*** Total statistics since 2019-05-29 09:18:12 ***
Total inserts                              1.00
Total updates                              0.00
Total deletes                              0.00
Total discards                             0.00
Total operations                           1.00


Другой способ проверить, что процесс извлечения работает правильно, это проверить временную метку файла журнала GoldenGate.
Из оболочки Linux запускается ‘ls -l / u01 / ogg / dirdat /’ и убедитесь, что временная метка файлов, начинающихся с «aa», была изменена.

Шаг 7/12: Установите и запустите Apache Kafka

Откройте Firefox из среды рабочего стола виртуальной машины и загрузите Apache Kafka (я использовал kafka_2.11-2.1.1.tgz ).

Теперь откройте оболочку Linux и сбросьте переменную среды CLASSPATH  (текущее значение, установленное внутри виртуальной машины BigDataLite-4.11, может создавать конфликты в Kafka):

declare -x CLASSPATH=""

Из той же оболочки Linux распакуйте архив и запустите ZooKeeper и Kafka:

cd
tar zxvf Downloads/kafka_2.11-2.1.1.tgz
cd kafka_2.11-2.1.1
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties

Вы можете проверить, работает ли ZooKeeper, запустив « echo stats | nc localhost 2181 »:

[oracle@bigdatalite ~]$ echo stats | nc localhost 2181
Zookeeper version: 3.4.5-cdh5.13.1--1, built on 11/09/2017 16:28 GMT
Clients:
 /127.0.0.1:34997[1](queued=0,recved=7663,sent=7664)
 /0:0:0:0:0:0:0:1:17701[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/25
Received: 8186
Sent: 8194
Connections: 2
Outstanding: 0
Zxid: 0x3f
Mode: standalone
Node count: 25

Вы можете проверить, есть ли у Кафки « echo dump | nc localhost 2181 | grep brokers » ( должна появиться строка  / brokers / ids / 0 )

[oracle@bigdatalite ~]$ echo dump | nc localhost 2181 | grep brokers
/brokers/ids/0


Виртуальная машина BigDataLite-4.11, используемая для PoC, уже имеет более старый экземпляр ZooKeeper, запущенный при загрузке виртуальной машины.
Поэтому обязательно отключите все службы, как описано в шаге 1.

Кроме того, когда вы откроете новую оболочку Linux, всегда помните, что необходимо всегда сбрасывать переменную среды CLASSPATH, прежде чем запускать ZooKeeper и Kafka, как объяснялось в начале шага.

Шаг 8/12: установите GoldenGate для больших данных

Опять же, просто используйте браузер Firefox, установленный на виртуальной машине, для загрузки Oracle GoldenGate для больших данных 12c  с этой страницы  (я использовал  Oracle GoldenGate для больших данных 12.3.2.1.1 в Linux x86-64 ). Имейте в виду, что вам нужна (бесплатная) учетная запись Oracle, чтобы получить ее.

Установка очень проста, просто взорвите архив внутри загруженного zip-файла:

cd ~/Downloads
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
cd ..
mkdir ogg-bd-poc
cd ogg-bd-poc
tar xvf ../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar

И все, GoldenGate для Big Data 12c устанавливается в папку  / home / oracle / ogg-bd-poc


Опять же, на виртуальной машине BigDataLite-4.11 уже установлена ​​программа GoldenGate для больших данных в папке / u01 / ogg-bd.
Но это более старая версия с меньшим количеством вариантов подключения к Kafka.

Шаг 9/12: Запустите GoldenGate для менеджера больших данных

Откройте GoldenGate для больших данных CLI:

cd ~/ogg-bd-poc
./ggsci

Порт менеджера должен быть изменен, в противном случае возникнет конфликт с менеджером GoldenGate (classic), начатый до того, как возникнет проблема.

Итак, из GoldenGate для больших данных CLI работает:

create subdirs
edit params mgr

VI экземпляр будет начать, просто написать этот материал:

PORT 27801

Затем сохраните содержимое и выйдите из vi, чтобы вернуться в CLI, где мы наконец сможем запустить диспетчер GoldenGate для больших данных, прослушивающий порт 27081 :

start mgr

Шаг 10/12: Создать Data Pump

Теперь нам нужно создать то, что в мире GoldenGate называется  Data Pump . Насос данных  представляет собой экстракт процесс , который отслеживает журнал следа и (в реальном времени) толкает какие — либо изменения в другой журнал след управляемый другим (и обычно удаленном) , например GoldenGate.

Для этого PoC журнал трасс,  управляемый GoldenGate (classic), будет закачан в журнал трассировок, управляемый GoldenGate для больших данных.

Итак, если вы закроете его, вернитесь к CLI GoldenGate (classic) из оболочки Linux:

cd /u01/ogg
./ggsci

Из GoldenGate (classic) CLI работает:

edit params pmpeshop

и в  vi положить этот контент:

EXTRACT pmpeshop
USERIDALIAS ggadmin
SETENV (ORACLE_SID='orcl')
-- GoldenGate for Big Data address/port:
RMTHOST localhost, MGRPORT 27801
RMTTRAIL ./dirdat/bb
PASSTHRU
-- The "tokens" part it is useful for writing in the Kafka messages
-- the Transaction ID and the database Change Serial Number
TABLE orcl.eshop.*, tokens(txid = @GETENV('TRANSACTION', 'XID'), csn = @GETENV('TRANSACTION', 'CSN'));

сохранить содержимое и выйти из vi


Как уже объяснялось для экстрактора, сохраненный контент будет сохранен в файле /u01/ogg/dirprm/pmpeshop.prm.

Теперь мы собираемся зарегистрироваться и запустить насос данных из CLI GoldenGate:

dblogin useridalias ggadmin
add extract pmpeshop, exttrailsource ./dirdat/aa begin now
add rmttrail ./dirdat/bb extract pmpeshop
start pmpeshop

Проверяет состояние насоса данных  , выполнив одну из этих команд из CLI:

info pmpeshop
view report pmpeshop

Вы даже можете проверить, был ли создан журнал следов bb  в  папке dirdat Золотых Ворот для Больших Данных:

[oracle@bigdatalite dirdat]$ ls -l ~/ogg-bd-poc/dirdat
total 0
-rw-r-----. 1 oracle oinstall 0 May 30 13:22 bb000000000
[oracle@bigdatalite dirdat]$

А что насчет проверки процесса прокачки? Из оболочки Linux:

sqlplus eshop/eshop@ORCL

Выполните этот сценарий SQL для создания нового фиктивного заказа клиента:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)
VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA02', SYSDATE, 'SHIPPING', SYSTIMESTAMP);

INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)
VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Inside Out', 1);

COMMIT;

Теперь из GoldenGate (classic) CLI работает:

stats pmpeshop

Для проверки правильности подсчета операций INSERT (ниже части выходных данных):

GGSCI (bigdatalite.localdomain as ggadmin@cdb/CDB$ROOT) 11> stats pmpeshop

Sending STATS request to EXTRACT PMPESHOP ...

Start of Statistics at 2019-05-30 14:49:00.

Output to ./dirdat/bb:

Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER:

*** Total statistics since 2019-05-30 14:01:56 ***
Total inserts                              1.00
Total updates                              0.00
Total deletes                              0.00
Total discards                             0.00
Total operations                           1.00


Опять же, вы также можете проверить временную метку журнала, хранящегося в GoldenGate для больших данных, для тестирования процесса насоса.
После фиксации транзакции запускается из оболочки Linux: «ln -l ~ / ogg-bd-poc / dirdat» и проверяет временную метку последнего файла с префиксом «bb».

Шаг 11/12: публикация транзакций в Kafka

Наконец, мы собираемся создать  процесс репликации в GoldenGate для BigData, чтобы опубликовать накачанную бизнес-транзакцию в теме Kafka. Replicat прочтет INSERT, UPDATE и DELETE операций внутри транзакций из регистрационного журнала след бб  и преобразует их в сообщениях Кафки , закодированных в формате JSON.

Итак, создайте файл с именем eshop_kafkaconnect.properties  внутри папки / home / oracle / ogg-bd-poc / dirprm, имеющий следующее содержимое:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties
# -----------------------------------------------------------

# address/port of the Kafka broker
bootstrap.servers=localhost:9092
acks=1

#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

#Adjust for performance
buffer.memory=33554432
batch.size=16384
linger.ms=0

# This property fix a start-up error as explained by Oracle Support here:
# https://support.oracle.com/knowledge/Middleware/2455697_1.html
converter.type=key

В этой же папке создается файл с именем eshop_kc.props,  имеющий следующее содержимое:

# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kc.props
# ---------------------------------------------------
gg.handlerlist=kafkaconnect

#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.properties
gg.handler.kafkaconnect.mode=tx

#The following selects the topic name based only on the schema name
gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}

#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}

#The formatter properties
gg.handler.kafkaconnect.messageFormatting=op
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=true
gg.handler.kafkaconnect.includeTokens=true

goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

# Apache Kafka Classpath
# Put the path of the "libs" folder inside the Kafka home path
gg.classpath=/home/oracle/kafka_2.11-2.1.1/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm

Если он был закрыт, перезапустите интерфейс командной строки GoldenGate для больших данных:

cd ~/ogg-bd-poc
./ggsci

и начать создавать репликацию из CLI с помощью:

edit params repeshop

in vi put this content:

REPLICAT repeshop
TARGETDB LIBFILE libggjava.so SET property=dirprm/eshop_kc.props
GROUPTRANSOPS 1000
MAP orcl.eshop.*, TARGET orcl.eshop.*;

then save the content and exit vi. Now associate the replicat to the trail log bb and start the replicat process with these commands to launch from GoldenGate for Big Data CLI:

add replicat repeshop, exttrail ./dirdat/bb
start repeshop

Check that the replicat is live and kicking with one of these commands:

info repeshop
view report repeshop

Now, connect to the ESHOP schema from another Linux shell:

sqlplus eshop/eshop@ORCL

and commit something:

INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)
VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'DELIVERED', SYSTIMESTAMP);

INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)
VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Cars 3', 2);

COMMIT;

From the GoldenGate for Big Data CLI, check that the INSERT operation was counted for the replicat process by running:

stats repeshop

And (hurrah!) we can have a look inside Kafka, as the Linux shell checks that the topic named CDC-ESHOP was created:

cd ~/kafka_2.11-2.1.1/bin
./kafka-topics.sh --list --zookeeper localhost:2181

and from the same folder run the following command for showing the CDC events stored in the topic:

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning

You should see something like:

[oracle@bigdatalite kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning 
{"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"CODE":"AAAA03","CREATED":"2019-05-31 04:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-31 04:24:34.929950000"}}
{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"ID_CUSTOMER_ORDER":11.0,"DESCRIPTION":"Cars 3","QUANTITY":2}}

For a better output, install jq:

sudo yum -y install jq
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CDC-ESHOP --from-beginning | jq .

and here is how will appear the JSON events:

{
  "table": "ORCL.ESHOP.CUSTOMER_ORDER",
  "op_type": "I",
  "op_ts": "2019-05-31 04:24:34.000327",
  "current_ts": "2019-05-31 04:24:39.637000",
  "pos": "00000000020000003830",
  "primary_keys": [
    "ID"
  ],
  "tokens": {
    "txid": "9.32.6726",
    "csn": "13906131"
  },
  "before": null,
  "after": {
    "ID": 11,
    "CODE": "AAAA03",
    "CREATED": "2019-05-31 04:24:34",
    "STATUS": "DELIVERED",
    "UPDATE_TIME": "2019-05-31 04:24:34.929950000"
  }
}
{
  "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
  "op_type": "I",
  "op_ts": "2019-05-31 04:24:34.000327",
  "current_ts": "2019-05-31 04:24:39.650000",
  "pos": "00000000020000004074",
  "primary_keys": [
    "ID"
  ],
  "tokens": {
    "txid": "9.32.6726",
    "csn": "13906131"
  },
  "before": null,
  "after": {
    "ID": 11,
    "ID_CUSTOMER_ORDER": 11,
    "DESCRIPTION": "Cars 3",
    "QUANTITY": 2
  }
}

Now leaves the kafka-console-consumer.sh process open and make some other database transactions on ESHOP for printing in real-time the CDC event stream sent to Kafka.

Following some samples of JSON events for UPDATE and DELETE operations:

// Generated with: UPDATE CUSTOMER_ORDER SET STATUS='DELIVERED' WHERE ID=8; 
{
  "table": "ORCL.ESHOP.CUSTOMER_ORDER",
  "op_type": "U",
  "op_ts": "2019-05-31 06:22:07.000245",
  "current_ts": "2019-05-31 06:22:11.233000",
  "pos": "00000000020000004234",
  "primary_keys": [
    "ID"
  ],
  "tokens": {
    "txid": "14.6.2656",
    "csn": "13913689"
  },
  "before": {
    "ID": 8,
    "CODE": null,
    "CREATED": null,
    "STATUS": "SHIPPING",
    "UPDATE_TIME": null
  },
  "after": {
    "ID": 8,
    "CODE": null,
    "CREATED": null,
    "STATUS": "DELIVERED",
    "UPDATE_TIME": null
  }
}

// Generated with: DELETE CUSTOMER_ORDER_ITEM WHERE ID=3;
{
  "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
  "op_type": "D",
  "op_ts": "2019-05-31 06:25:59.000916",
  "current_ts": "2019-05-31 06:26:04.910000",
  "pos": "00000000020000004432",
  "primary_keys": [
    "ID"
  ],
  "tokens": {
    "txid": "14.24.2651",
    "csn": "13913846"
  },
  "before": {
    "ID": 3,
    "ID_CUSTOMER_ORDER": 1,
    "DESCRIPTION": "Toy Story",
    "QUANTITY": 1
  },
  "after": null
}

Congratulations! You completed the PoC:

Step 12/12: Play With PoC

The Kafka Connect handler available in GoldenGate for Big Data has a lot of options useful to customize the integration according to your needs. Checks the official documentation here.

For example, you can choose to create a different topic for each table involved in the CDC stream, just edit this property in eshop_kc.props:

gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}-${tableName}

Restart the replicat after any change, from the GoldenGate for Big Data CLI:

stop repeshop
start repeshop

You can find some other configuration example in the folder «~/ogg-bd-poc/AdapterExamples/big-data/kafka_connect».

Conclusions

In this article we created a full integration between an Oracle database and a Kafka broker through the GoldenGate technology. The CDC event stream is published in Kafka in real-time.

For simplicity we have used a single virtual machine having all installed, but you are free to install GoldenGate for Big Data and Kafka in different hosts.

Let me know in the comments what you think of the potential (or limits) of this integration.