Статьи

Киба: ETL сделано правильно

Снимок экрана 2015-06-25 12.57.05

Я «консультант» или «подрядчик», которые являются двумя из различных слов, которые начинаются с «con», как меня называли. По сути, у моих клиентов есть зуд, который они хотят поцарапать с помощью технологии, и они нанимают мою компанию для выполнения этой работы.

Я не в восторге от термина «консультант», но это отличная шутка , если вы ее еще не слышали.

Как нанятая цифровая пушка, я часто занимаюсь перемещением приложений и систем с одной платформы на другую. Из ASP в Ruby или из SQL Server в PostgreSQL. Во многих из этих случаев существуют данные, которые живут в «старой» системе, которые необходимо переместить / обновить / преобразовать в «новую» систему. Канонический термин «миграция данных», и он вызывает страх в сердцах тех, у кого был проект, просроченный на месяцы или годы из-за его осложнений. Это был последний раз, когда Scope of Work (SOW) требовал переноса данных:

kiba1

Даже на первый взгляд простые миграции могут быть ужасными. Это те, которые я собираюсь посмотреть в сегодняшнем посте. Простые миграции, получение данных из одного источника, небольшая трансформация, а затем сохранение этих данных в новом месте назначения. Мне нужно перенести информацию о пользователях из одной системы в новую систему, которую мы создаем. Сама идея миграции заставляла меня стучать зубами, пока я не нашел Кибу.

Киба на помощь

Kiba — это «облегченная среда Ruby ETL». ETL, для тех, кто не знает, расшифровывается как E ract, T transform и L oad. ETL — это еще один способ сказать «миграция данных», но он немного ближе к линии фронта. ETL обычно означает, что есть известный источник или источники, наряду с известным назначением (ями). Когда вы будете готовы говорить об ETL, вы говорите о том, как извлечь (извлечь) данные из источника (ов), изменить (преобразовать) их в соответствии с требованиями, а затем записать (загрузить) их в новое место назначения. ). Вот и все. Три простых шага, которые могут съесть график вашего проекта, как я ем блины (я люблю блины).

kiba2

Киба стремится решить эту проблему, и я должен сказать, что я был очень рад ее найти. Посмотрев на различные решения ETL в течение многих лет, я выполнял эту работу, я был настроен скептически. Я в восторге, чтобы сказать, Киба проста и гибка.

В репозитории Kiba Github есть отличный обзор того, как работает Kiba, который охватывает гораздо больше деталей, чем я буду здесь. Прочитав этот пост, вы сможете получить больше информации о Кибе.

Kiba была создана Thibaut Barrere , бывшим разработчиком камня activewarehouse-etl.

Обработка данных как конвейер

В отличном сообщении в блоге Thibaut рассказывает, как писать код для обработки данных. Вывод заключается в том, что он должен быть похож на конвейер , где источник подает данные в один конец, данные обрабатываются различными преобразованиями, а затем выходит на другой конец конвейера в пункт назначения.

kibapipeline

Изображение выше должно привести точку домой. Источник (MySQL) подает первый канал. Вы можете связать столько трансформируемых каналов вместе, чтобы составить ваш конвейер. Выше средняя труба может добавить что-то сверху или отфильтровать вещи снизу. Эти каналы могут, конечно, добавлять / удалять / изменять данные, но они также могут быть условными. Ваши преобразования могут реагировать на поступающие данные, делая то, что вам нужно, когда вам это нужно.

Кроме того, Kiba позволяет создавать многоразовые преобразования, то есть каналы, которые вы можете взять где угодно. Эти преобразования могут быть протестированы сами по себе, что принесет в ваш ETL полностью зрелый цикл разработки.

Помимо источников, трансформаций и мест назначения, Киба предлагает и другие вкусности. Проект ETL может включать в себя пре- и постпроцессоры. Предпроцессор запускается до доступа к источникам, а постпроцессор запускается после записи всех строк в место назначения.

Вот пример преобразования из документов Кибы:

# declare a ruby method here, for quick reusable logic def parse_french_date(date) Date.strptime(date, '%d/%m/%Y') end # or better, include a ruby file which loads reusable assets # eg: commonly used sources / destinations / transforms, under unit-test require_relative 'common' # declare a pre-processor: a block called before the first row is read pre_process do # do something end # declare a source where to take data from (you implement it - see notes below) source MyCsvSource, 'input.csv' # declare a row transform to process a given field transform do |row| row[:birth_date] = parse_french_date(row[:birth_date]) # return to keep in the pipeline row end # declare another row transform, dismissing rows conditionally by returning nil transform do |row| row[:birth_date].year < 2000 ? row : nil end # declare a row transform as a class, which can be tested properly transform ComplianceCheckTransform, eula: 2015 # before declaring a definition, maybe you'll want to retrieve credentials config = YAML.load(IO.read('config.yml')) # declare a destination - like source, you implement it (see below) destination MyDatabaseDestination, config['my_database'] # declare a post-processor: a block called after all rows are successfully processed post_process do # do something end 

За этим проектом ETL очень легко следовать, и он должен дать вам представление о том, как работает Kiba.

Закончено извлечение настроек, время трансформировать этот пост и загрузить демо

На этом этапе ваше понимание Кибы должно послужить вам хорошо, когда я прохожу свой проект ETL. Как я упоминал ранее, мне нужно перенести учетные записи пользователей из одной системы в другую. Вот разбивка:

Киба Роль Выполнено
Источник MySQL
преобразование Игнорировать аккаунты с плохой криптографией
преобразование Получить электронную почту, пароль и соль
Место назначения PostgreSQL

Это довольно просто. ETL извлечет пользователей из таблицы MySQL, отфильтрует их, а затем запишет в PostgreSQL.

Источник

Источник Киба — это класс, который:

  • Имеет конструктор, который устанавливает любые потребности, специфичные для источника. Для источника CSV это может быть имя файла.
  • Реализует each , приводя строки к предоставленному блоку.

Такая конструкция делает источники легко выделяемыми и невероятно проверяемыми. На самом деле они настолько просты, что вам не нужно много тестировать.

Мой источник базы данных MySQL и выглядит так:

Библиотека / user_source.rb

 require 'mysql2' require 'uri' class UserSource # connect_url should look like; # mysql://user:password@localhost/dbname def initialize(connect_url) @mysql = Mysql2::Client.new(connect_hash(connect_url)) end def each results = @mysql.query('select * from accounts', as: :hash, symbolize_keys: true) results.each do |row| yield(row) end end private def connect_hash(url) u = URI.parse(url) { host: u.host, username: u.user, password: u.password, port: u.port, database: u.path[1..-1] } end end 

Я инициализирую источник с URI MySQL. each метод делает простой запрос для получения учетных записей, а затем возвращает каждую строку в блок. Серьезно, насколько это легко? Худшая часть — это разбор URI.

Приведенный выше источник прост, чтобы сделать его более (я надеюсь) обучаемым. Вы можете сделать его гораздо более общим (например, MySQLSource ), передав запрос в конструктор. Также для больших нагрузок вы можете использовать потоковую передачу MySQL. Дело в том, что это легко сделать и решать только вам.

Место назначения

Kiba Destination — это класс, который:

  • Имеет конструктор по той же причине, что и источник. Любые потребности инициализации адресата могут быть переданы здесь.
  • Реализует метод write(row) для записи строки в место назначения.
  • Реализует метод close для очистки после себя.

Назначением в этом проекте является база данных PostgreSQL:

Библиотека / user_destination.rb

 require 'pg' class UserDestination # connect_url should look like; # mysql://user:pass@localhost/dbname def initialize(connect_url) @conn = PG.connect(connect_url) @conn.prepare('insert_user_stmt', 'insert into users (email, password_digest, created_at, updated_at) values ($1, $2, $3, $4)') end def write(row) time = Time.now @conn.exec_prepared('insert_user_stmt', [ row[:email], row[:password], row[:date_created], time ]) rescue PG::Error => ex puts "ERROR for #{row[:email]}" puts ex.message # Maybe, write to db table or file end def close @conn.close @conn = nil end end 

Как видите, при инициализации места назначения я создаю готовое утверждение. Это своего рода оптимизация PostgreSQL, и она показывает, как конструктор можно использовать для выполнения настройки для конкретного места назначения.

each простая функция записывает строку, выполняя подготовленный оператор. row — это хеш с ключами для нужных нам атрибутов. В таблице назначения также есть столбцы меток времени ( updated_at , created_at ), поэтому я отвечаю за них.

Хорошей идеей будет write в операторе пункт rescue , чтобы зафиксировать ошибки и отследить, какие записи умерли. Кроме того, если вы не обнаружите ошибки, миграция остановится на первой ошибке, что часто нежелательно.

Как я уже говорил о UserSource , этот класс можно сделать более универсальным во многих отношениях.

Теперь пришло время сделать наши трубы.

Трубопровод

Это очень простой (но очень реальный) пример ETL. transform просто берет атрибуты из источника и сопоставляет их с атрибутами пункта назначения, а затем возвращает новую строку. yawwwwns

 transform do |row| newrow = {} newrow[:email] = row[:Email] newrow[:password] = row[:Password] newrow end 

Kiba предлагает множество способов реализации преобразований, и вы можете их связывать. Цепные преобразования будут выполняться в том порядке, в котором они существуют в скрипте (который мы вскоре рассмотрим). Например, вышеупомянутое преобразование может быть изменено на это:

 transform do |row| row[:crypt] == "bcrypt" ? row : nil end transform do |row| newrow = {} newrow[:email] = row[:Email] newrow[:password] = row[:Password] newrow end 

В этом случае мы удаляем исходные строки из конвейера на основе атрибута. Ухоженная.

Также возможно создать класс Ruby, который выполняет преобразование. Вот так:

 class FilterMD5 def initialize(value_to_filter) @value = value_to_filter end def process(row) row[:crypt] == value_to_filter ? row : nil end end 

Затем преобразование изменяется на:

 transform FilterMD5, "md5" transform do |row| newrow = {} newrow[:email] = row[:Email] newrow[:password] = row[:Password] newrow end 

Опять же, FilterMD5 можно сделать более общим, возможно, приняв блок, который позволил бы использовать разные фильтры. Считайте, что домашнее задание.

Возможность поместить преобразование в класс делает написание спецификаций / тестов для этого преобразования тривиальным. Киба позволяет лучшие практики. держит руку пять

Пре- и постпроцессоры

Последний фрагмент Кибы, который я хотел бы охватить, — это пре- и постпроцессоры. Как вы можете себе представить, эти элементы запускаются до того, как какие-либо записи будут получены из источника, и после того, как все записи будут записаны в место назначения, соответственно. Любая общая настройка или очистка обрабатываются этими процессорами. В примере я использую его для сообщения о прогрессе:

 start_time = Time.now pre_process do puts "*** Start ACCOUNT MIGRATION #{start_time}***" end ... source, transforms, destination ... post_process do end_time = Time.now duration_in_minutes = (end_time - start_time)/60 puts "*** End ACCOUNT MIGRATION #{end_time}***" puts "*** Duration (min): #{duration_in_minutes.round(2)}" end 

Там. Теперь у меня есть хороший опыт работы с UX для сценариста, которым я являюсь. Путь, я.

Очевидно, мои примеры тривиальны, но вы можете представить себе случаи использования, когда реальная работа идет в эти блоки. Правильно?

Полный Монти

Вот мой сценарий Киба:

migrate_users.rb

 #!/usr/bin/env ruby $:.unshift "#{File.dirname(__FILE__)}/lib" require 'user_source' require 'user_destination' require 'filter_md5' mysql_url = ENV["MYSQL_URL"] source UserSource, mysql_url start_time = Time.now pre_process do puts "*** START ACCOUNT MIGRATION #{start_time}***" end transform FilterMD5 transform do |row| puts "OK" newrow = {} newrow[:email] = row[:Email] newrow[:password] = row[:Password] newrow[:salt] = row[:Salt] newrow[:old_id] = row[:AccountId] newrow end post_process do end_time = Time.now duration_in_minutes = (end_time - start_time)/60 puts "*** End ACCOUNT MIGRATION #{end_time}***" puts "*** Duration (min): #{duration_in_minutes.round(2)}" end destination UserDestination, ENV["PG_URL"] 

Исходный, целевой и файлы преобразования находятся в каталоге lib в моем проекте.

Запустить его

Запуск скрипта Kiba состоит из вызова исполняемого файла kiba и передачи имени вашего скрипта:

 kiba migrate_users.rb 

И пошло-поехало!

 *** START ACCOUNT MIGRATION 2015-06-25 12:53:10 -0400*** ....SOME ERRORS... *** End ACCOUNT MIGRATION 2015-06-25 12:54:11 -0400*** *** 11433 records moved in 1.02 minutes 

LOL для ETL DSL

Киба подарил мне много любви за создание ETL в Ruby. На мой взгляд, это очень хорошо продуманный, мощный DSL, заполняющий пустоту в экосистеме Ruby. Я призываю вас пройтись по сайту Kiba и узнать больше о библиотеке. Документы хороши, и есть сообщения в блоге, связанные с ETL и Kiba. Если вы в конечном итоге используете его, дайте знать Тибо. Я сделал, и он использовал меня в качестве отзыва на сайте. 🙂

идет на кухню, делает блины