Статьи

Траление бинлога с FlexCDC и новыми плагинами FlexCDC для MySQL

Первоначально Написал Джастин Суонхарт

Swanhart-Tools включает FlexCDC, инструмент сбора данных об изменениях для MySQL. FlexCDC следует двоичному журналу сервера и обычно записывает «журналы изменений», которые отслеживают изменения в таблицах в базе данных. Обычно я говорю, потому что последняя версия Swanhart-Tools  ( пока только в github) поддерживает плагины FlexCDC, которые позволяют отправлять обновления в удаленный источник данных или в любое другое место по вашему вкусу. Вы можете узнать больше об основах FlexCDC в предыдущем сообщении в блоге.

Обратите внимание, что FlexCDC по-прежнему должен иметь экземпляры источника и назначения, определенные в конфигурации, даже при использовании плагинов. Это связано с тем, что состояние FlexCDC (в какой степени двоичный журнал имеет прогресс FlexCDC и какие таблицы записываются) сохраняется в «dest». Обычно при использовании плагина исходный и целевой экземпляры будут одинаковыми. FlexCDC создаст базу данных «flexviews» с несколькими таблицами состояний в целевом экземпляре. Это также означает, что вы все еще должны использовать

create_mvlog.php
add_table.php или процедура create_mvlog (…) Flexview, чтобы отметить, какие таблицы нужно захватить! Смотрите предыдущий пост в блоге о FlexCDC.

Когда вы создаете mvlog, все еще будет таблица изменений, созданная в dest, точно так же, как когда вы не используете плагин. Это связано с тем, что INFORMATION_SCHEMA используется для получения типов данных столбцов и дополнительной информации (например, если int имеет подпись или без знака), и этот поиск выполняется для таблицы в dest. Причина, по которой это необходимо, состоит в том, что mysqlbinlog, утилита, используемая для очистки binlog, выдает странный вывод для больших целых чисел со знаком (он предоставляет версию со знаком и без знака), поэтому FlexCDC должен найти правильный вариант для выбора из действительного DDL таблица изменений. FlexCDC не может посмотреть на DDL исходной таблицы, потому что потребитель может отставать, а текущая структура может не соответствовать структуре строк в журнале.

Новая система плагинов позволяет вам делать много полезных вещей, таких как:

  • Репликация на внешние базы данных
  • Публиковать изменения в очереди сообщений (это похоже на червоточину Facebook)
  • Синхронизируйте сервер удаленного кэша
  • и более…

Последняя версия Swanhart-Tools включает в себя плагин Example (в flexviews / consumer / include / example_plugin.php), который просто печатает события, которые проходят через него, а не регистрирует их вообще в таблице изменений. В конце поста приведен пример вывода.

Пример плагина выглядит так:

<?php
class FlexCDC_Plugin {
 
    static function begin_trx($uow_id, $gsn) {
        echo "START TRANSACTION: trx_id: $uow_id, Prev GSN: $gsn";
    }
 
    static function commit_trx($uow_id, $gsn) {
        echo "COMMIT: trx_id: $uow_id, Last GSN: $gsn";
    }
 
    static function rollback_trx($uow_id) {
        echo "ROLLBACK: trx_id: $uow_id";
    }
 
    static function insert($row, $db, $table, $trx_id, $gsn) {
        echo "TRX_ID: $trx_id, Schema:$db, Table: $table, DML: INSERT, AT: $gsn"; print_r($row);    
    }
    
    static function delete($row, $db, $table, $trx_id, $gsn) {
        echo "TRX_ID: $trx_id, Schema:$db, Table: $table, DML: DELETE, AT: $gsn"; print_r($row);    
    }
    
    static function update_before($row, $db, $table, $trx_id, $gsn) {
        echo "TRX_ID: $trx_id, Schema:$db, Table: $table, DML: UPDATE (OLD), AT: $gsn"; print_r($row);    
    }
 
    static function update_after($row, $db, $table, $trx_id, $gsn) {
        echo "TRX_ID: $trx_id, Schema:$db, Table: $table, DML: UPDATE (NEW), AT: $gsn"; print_r($row);    
    }
}

Важное примечание: Вы должны определить все семь из этих функций в вашем плагине, даже если вы не планируете иметь действия для каждого из обратных вызовов — просто оставьте тело функции пустым, чтобы не делать никаких действий (вызов в этом случае просто noop). ) Обратите внимание, что функции плагина должны быть объявлены  STATIC . Это связано с тем, что FlexCDC вызывает функции.

Обратные вызовы состояния транзакции
Есть три функции обратного вызова, которые уведомляют плагин об изменениях состояния транзакции . Прежде чем перейти к тому, что они делают, я хочу отметить параметры $ trx_id и $ gsn, которые присутствуют в каждом обратном вызове. Каждой транзакции назначается монотонно увеличивающийся идентификатор транзакции. Эта последовательность однозначно идентифицирует каждую транзакцию, которую обрабатывает FlexCDC. Кроме того, каждому изменению строки назначается уникальный порядковый номер, который FlexCDC называет общим порядковым номером (или GSN).

Как видите, обратному вызову start_trx (…) (вызываемому при запуске транзакции) передается как новый номер транзакции, так и самый высокий GSN, использованный в предыдущей транзакции. Это называется высшей отметкой GSN (GSNHWM). При фиксации транзакции вызывается обратный вызов commit_trx (…), а идентификатор транзакции и последний GSN, назначенный в транзакции, передаются в обратный вызов. Это же значение будет отображаться как GSNHWM в следующем обратном вызове start_trx (…). Наконец, при откате все порядковые номера, назначенные в этой транзакции, будут использованы повторно, поэтому GSN не будет передан обратному вызову отката, а есть идентификатор транзакции, который позволяет точно определить, какая транзакция откатывается.

Обратные вызовы изменения строки

Каждая из четырех функций обратного вызова изменения строки фиксирует конкретное изменение набора данных. Каждая из функций принимает пять параметров. Первый ($ row) — это массив, который содержит строку, с которой выполняется действие. Вторым ($ db) является схема, которая содержит строку. Третий ($ table) — это таблица, содержащая строку. Каждый обратный вызов также получает идентификатор транзакции, и, конечно, каждому изменению строки назначается уникальный GSN.

Например:
Обновление будет запускать обратные вызовы update_before (…) и update_after (…) с изображениями строк до и после изменения, соответственно. Пример этого есть в конце поста.

Настройка FlexCDC для использования плагина

FlexCDC по умолчанию использует файл конфигурации с именем consumer.ini. В раздел [flexcdc] добавьте:
plugin = plugin_file.php

Плагин должен находиться в каталоге FlexCDC include /  . Вы найдете example_plugin.php в этом каталоге, чтобы служить примером.

Как это работает

Flexviews использует mysqlbinlog для декодирования двоичного журнала с исходного сервера. Он использует опцию –decode-row = ROWS для декодирования RBR в формат, который может быть проанализирован внешней утилитой. FlexCDC собирает информацию о каждой транзакции и изменениях строк, происходящих в базе данных (что означает, что для этого требуется использование двоичного ведения журнала на основе ROW). Когда определяется плагин, обычные действия, используемые FlexCDC, переопределяются функциями обратного вызова.

Вот выходные данные примера плагина для обновления, которое затронуло 3 строки (update test.t3 set c1 = c1 — 1):

START TRANSACTION: trx_id: 44, Prev GSN: 107
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (OLD), AT: 108
Array
(
    [c1] => -3
    [c2] => 1
)
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (NEW), AT: 109
Array
(
    [c1] => -4
    [c2] => 1
)
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (OLD), AT: 110
Array
(
    [c1] => -5
    [c2] => 2
)
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (NEW), AT: 111
Array
(
    [c1] => -6
    [c2] => 2
)
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (OLD), AT: 112
Array
(
    [c1] => -5
    [c2] => 2
)
TRX_ID: 44, Schema:test, Table: t3, DML: UPDATE (NEW), AT: 113
Array
(
    [c1] => -6
    [c2] => 2
)
COMMIT: trx_id: 44, Last GSN: 113

Вы должны заметить, что FlexCDC предоставляет имена столбцов для данных, поступающих из двоичного журнала. Это потому, что таблица журналов существует в экземпляре dest, и FlexCDC может получить список имен столбцов оттуда. Когда вы используете другие инструменты CDC, такие как C binlog API, вы не получаете имена столбцов.