Статьи

Использование сервера InfiniDB MySQL с кластером Hadoop для анализа данных

Этот пост был изначально написан Александром Рубиным для MySQL Performance Blog .

В моем предыдущем посте о Hadoop и Impala  я оценивал производительность аналитических запросов в Impala.

На этот раз я попробовал  InfiniDB для Hadoop  (версия с открытым исходным кодом) на современном оборудовании с кластером Hadoop из 8 узлов. Одним из основных преимуществ (по крайней мере для меня) InifiniDB для Hadoop является то, что он хранит данные внутри кластера Hadoop, но использует сервер MySQL для выполнения запросов. Это позволяет легко «перенести» существующие аналитические инструменты. Результаты довольно интересные и многообещающие.

Быстрый How-To

Документация InfiniDB не очень понятна для пошаговых инструкций, поэтому я создал это краткое руководство:

  1. Установите кластер Hadoop (минимальная установка будет работать). Я использовал Cloudera Manager ( CDH5 ) для сравнения скорости InfiniDB с Cloudera Impala. Установите инструменты в разделах «Предварительные требования» руководства  InfiniDB для Hadoop
  2. Установите InfiniDB для двоичных файлов Hadoop  на 1 узел Hadoop (вы можете выбрать любой узел). Это установит InfiniDB и его версию MySQL (на основе MySQL 5.1).
  3. После установки вам будут предложены переменные для установки и запуска сценария postConfigure. Пример:
export JAVA_HOME=/usr/java/jdk1.6.0_31
export LD_LIBRARY_PATH=/usr/java/jdk1.6.0_31/jre/lib/amd64/server
. /root/setenv-hdfs-20
/usr/local/Calpont/bin/postConfigure

4. Сценарий postConfigure задаст вопросы. Пара неутомимых нот:

  • Обязательно используйте HDFS как «тип хранения данных».
  • Модуль производительности 1 (pm1) должен указывать на хост (имя хоста и IP), на котором выполняется скрипт postConfigure. Другие pm должны указывать на другие узлы Hadoop

Когда установка будет завершена, вы сможете войти на сервер MySQL, он использует скрипт ibdmysql, который будет вызывать mysql cli с правильными сокетом и портом. Убедитесь, что infiniDB включен, запустив «show engine», InfiniDB должен быть в списке.

Следующим шагом будет импорт данных.

Импорт данных

Сначала нам нужно создать таблицу MySQL с «engine = InfiniDB»:

CREATE TABLE `ontime` (
  `YearD` int(11) NOT NULL,
  `Quarter` tinyint(4) DEFAULT NULL,
  `MonthD` tinyint(4) DEFAULT NULL,
  `DayofMonth` tinyint(4) DEFAULT NULL,
  `DayOfWeek` tinyint(4) DEFAULT NULL,
  `FlightDate` date DEFAULT NULL,
...
) ENGINE=InfiniDB DEFAULT CHARSET=latin1

Во-вторых, я использовал cpimport для загрузки данных. Оказалось, что гораздо эффективнее и проще загружать 1 большой файл, чем 20 × 12 меньших файлов (исходные данные «времени выполнения» — 1 файл в месяц), поэтому я экспортировал данные  «время выполнения» из таблицы MySQL  и создал 1 большой файл «ontime.psv».

Я использовал следующую команду для экспорта данных в InfiniDB:

[root@n0 ontime]# /usr/local/Calpont/bin/cpimport -s '|' ontime ontime ontime.psv
2014-05-20 15:12:58 (18787) INFO : Running distributed import (mode 1) on all PMs...
2014-05-20 15:25:28 (18787) INFO : For table ontime.ontime: 155083620 rows processed and 155083620 rows inserted.
2014-05-20 15:25:28 (18787) INFO : Bulk load completed, total run time : 751.561 seconds

Данные хранятся в Hadoop:

[root@n0 ontime]# hdfs dfs -du -h /usr/local/Calpont
1.4 G /usr/local/Calpont/data1
1.4 G /usr/local/Calpont/data2
1.4 G /usr/local/Calpont/data3
1.4 G /usr/local/Calpont/data4
1.4 G /usr/local/Calpont/data5
1.4 G /usr/local/Calpont/data6
1.4 G /usr/local/Calpont/data7
1.4 G /usr/local/Calpont/data8

Общий размер данных составляет 8 × 1,4G = 11,2G (сжатый). Для сравнения размер того же набора данных в формате Impala Parquet составляет 3,6G. Оригинальный размер был ~ 60G.

[root@n0 ontime]# hdfs dfs -du -h /user/hive/warehouse
3.6 G /user/hive/warehouse/ontime_parquet_snappy

Теперь мы можем запустить 2 запроса, которые я тестировал ранее:

1. Простой групповой

mysql> select yeard, count(*) from ontime group by yeard order by yeard;
+-------+----------+
| yeard | count(*) |
+-------+----------+
|  1988 |  5202096 |
|  1989 |  5041200 |
|  1990 |  5270893 |
|  1991 |  5076925 |
|  1992 |  5092157 |
|  1993 |  5070501 |
|  1994 |  5180048 |
|  1995 |  5327435 |
|  1996 |  5351983 |
|  1997 |  5411843 |
|  1998 |  5384721 |
|  1999 |  5527884 |
|  2000 |  5683047 |
|  2001 |  5967780 |
|  2002 |  5271359 |
|  2003 |  6488540 |
|  2004 |  7129270 |
|  2005 |  7140596 |
|  2006 |  7141922 |
|  2007 |  7455458 |
|  2008 |  7009726 |
|  2009 |  6450285 |
|  2010 |  6450117 |
|  2011 |  6085281 |
|  2012 |  6096762 |
|  2013 |  6369482 |
|  2014 |  1406309 |
+-------+----------+
27 rows in set (0.22 sec)

2. Сложный запрос из моего оригинального поста:

mysql> select min(yeard), max(yeard), Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and flightdate < '2010-01-01' GROUP by carrier HAVING cnt > 100000 and max(yeard) > 1990 ORDER by rate DESC, cnt desc LIMIT  1000;
+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | Carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
|       2003 |       2009 | EV      |  1454777 |          237698 | 0.16 |
|       2003 |       2009 | FL      |  1082489 |          158748 | 0.15 |
|       2006 |       2009 | YV      |   740608 |          110389 | 0.15 |
|       2006 |       2009 | XE      |  1016010 |          152431 | 0.15 |
|       2003 |       2009 | B6      |   683874 |          103677 | 0.15 |
|       2001 |       2009 | MQ      |  3238137 |          448037 | 0.14 |
|       2003 |       2005 | DH      |   501056 |           69833 | 0.14 |
|       2004 |       2009 | OH      |  1195868 |          160071 | 0.13 |
|       2003 |       2006 | RU      |  1007248 |          126733 | 0.13 |
|       1988 |       2009 | UA      |  9593284 |         1197053 | 0.12 |
|       2003 |       2006 | TZ      |   136735 |           16496 | 0.12 |
|       1988 |       2001 | TW      |  2656286 |          280283 | 0.11 |
|       1988 |       2009 | AA      | 10568437 |         1183786 | 0.11 |
|       1988 |       2009 | CO      |  6023831 |          673354 | 0.11 |
|       1988 |       2009 | DL      | 11866515 |         1156048 | 0.10 |
|       2003 |       2009 | OO      |  2654259 |          257069 | 0.10 |
|       1988 |       2009 | AS      |  1506003 |          146920 | 0.10 |
|       2007 |       2009 | 9E      |   577244 |           59440 | 0.10 |
|       1988 |       2009 | US      | 10276862 |          990995 | 0.10 |
|       1988 |       2009 | NW      |  7601727 |          725460 | 0.10 |
|       1988 |       2005 | HP      |  2607603 |          235675 | 0.09 |
|       1988 |       2009 | WN      | 12722174 |         1107840 | 0.09 |
|       2005 |       2009 | F9      |   307569 |           28679 | 0.09 |
|       1988 |       1991 | PA      |   203401 |           19263 | 0.09 |
+------------+------------+---------+----------+-----------------+------+
24 rows in set (0.86 sec)

Тот же запрос в Impala (на том же оборудовании) выполняется в течение 7,18 секунд:

  [n8.local:21000] > select min(yeard), max(yeard), Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_parquet_snappy WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and flightdate < '2010-01-01' GROUP by carrier HAVING cnt > 100000 and max(yeard) > 1990 ORDER by rate DESC LIMIT  1000;
  
Query: select min(yeard), max(yeard), Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_parquet_snappy WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and flightdate < '2010-01-01' GROUP by carrier HAVING cnt > 100000 and max(yeard) > 1990 ORDER by rate DESC LIMIT  1000
  +------------+------------+---------+----------+-----------------+------+
  | min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
  +------------+------------+---------+----------+-----------------+------+
  | 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
  | 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
  | 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
  | 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
  | 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
  | 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
  | 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
  | 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
  | 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
  | 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
  | 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
  | 1988       | 2001       | TW      | 2656286  | 280283          | 0.11 |
  | 1988       | 2009       | CO      | 6023831  | 673354          | 0.11 |
  | 1988       | 2009       | AA      | 10568437 | 1183786         | 0.11 |
  | 1988       | 2009       | US      | 10276862 | 990995          | 0.10 |
  | 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
  | 1988       | 2009       | DL      | 11866515 | 1156048         | 0.10 |
  | 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
  | 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
  | 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
  | 1988       | 1991       | PA      | 203401   | 19263           | 0.09 |
  | 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
  | 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
  | 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
  +------------+------------+---------+----------+-----------------+------+
  Returned 24 row(s) in 7.18s

Вывод и графики

Для резюме я создал следующие диаграммы:

Простой запрос:

Как мы видим, InfiniDB выглядит здесь неплохо. Он также использует протокол MySQL, поэтому существующее приложение, использующее MySQL, сможет работать здесь без каких-либо дополнительных «соединителей».

Одно замечание, касающееся моего примера запроса: «сложный» запрос разработан таким образом, что затруднит использование какого-либо конкретного набора индексов; этот запрос должен будет просканировать> 70% таблицы для генерации набора результатов. Вот почему это так медленно в MySQL по сравнению с колоночными базами данных. Другая «проблема» заключается в том, что таблица очень широка, и большинство столбцов объявлены как varchar (таблица не нормализована), что делает ее большой в MySQL. Все это сделает его идеальным для хранения и сжатия столбцов. Другие случаи могут не показывать такую ​​огромную разницу.

Пока я проводил тестирование с небольшими данными (60G), я планирую запустить тест больших данных.

Этот пост был изначально написан Александром Рубиным для MySQL Performance Blog .