Статьи

Использование Apache Hadoop и Impala вместе с MySQL для анализа данных

Этот пост был изначально написан Александром Рубином для блога MySQL .

Apache Hadoop обычно используется для анализа данных. Это быстро для загрузки данных и масштабируемым. В предыдущем посте я показал, как
интегрировать MySQL с Hadoop . В этом посте я покажу, как экспортировать таблицу из MySQL в Hadoop, загрузить данные в Cloudera Impala (столбчатый формат) и запустить отчет в дополнение к этому. В приведенных ниже примерах я буду использовать данные о «своевременности полета» из моего предыдущего поста (
Повышение производительности MySQL с параллельным выполнением запросов ). Я использовал
Cloudera Manager v.4 установить Apache Hadoop и Impala. Для этого теста я (намеренно) использовал старое оборудование (серверы с 2006 года), чтобы показать, что Hadoop может использовать старое оборудование и продолжать масштабироваться. Тестовый кластер состоит из 6 датододов. Ниже приведены спецификации:

Цель Спецификации сервера
Наменоде, Улей Метастор и т. Д. + Датододы 2x PowerEdge 2950, ​​2 процессора L5335 с частотой 2,00 ГГц, 8 ядер, 16 ГБ ОЗУ, RAID 10 с 8 дисками SAS
Только датододы 4x PowerEdge SC1425, 2 процессора Xeon @ 3,00 ГГц, 2 ядра, 8 ГБ ОЗУ, один диск 4 ТБ

Как видите, это довольно старые серверы; единственное, что я изменил, — это добавление диска емкостью 4 ТБ, чтобы можно было хранить больше данных. Hadoop обеспечивает избыточность на уровне сервера (он записывает 3 копии одного и того же блока во все датододы), поэтому нам не нужен RAID для данных датодов (необходима избыточность для наменодов).

Экспорт данных

Есть несколько способов экспортировать данные из MySQL в Hadoop. Для этого теста я просто экспортировал таблицу времени в текстовый файл с:

select * into outfile '/tmp/ontime.psv' 
FIELDS TERMINATED BY ','
from ontime;

(вы можете использовать «|» или любой другой символ в качестве разделителя). Кроме того, вы можете загрузить данные непосредственно с  сайта www.transtats.bts.gov, используя этот простой скрипт:

for y in {1988..2013}
do
        for i in {1..12}
        do
                u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip"
                wget $u -o ontime.log  
                unzip On_Time_On_Time_Performance_${y}_${i}.zip
        done
done

Загрузить в Hadoop HDFS

Первое, что нам нужно сделать, это загрузить данные в HDFS в виде набора файлов. Hive или Impala будет работать с каталогом, в который вы импортировали свои данные, и объединит все файлы в этом каталоге. В нашем случае легко просто скопировать все наши файлы в каталог внутри HDFS.

$ hdfs dfs -mkdir /data/ontime/
$ hdfs -v dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/

Создать внешний стол в Impala

Теперь, когда у нас загружены все файлы данных, мы можем создать внешнюю таблицу:

CREATE EXTERNAL TABLE ontime_csv (
YearD int ,
Quarter tinyint ,
MonthD tinyint ,
DayofMonth tinyint ,
DayOfWeek tinyint ,
FlightDate string ,
UniqueCarrier string ,
AirlineID int ,
Carrier string ,
TailNum string ,
FlightNum string ,
OriginAirportID int ,
OriginAirportSeqID int ,
OriginCityMarketID int ,
Origin string ,
OriginCityName string ,
OriginState string ,
OriginStateFips string ,
OriginStateName string ,
OriginWac int ,
DestAirportID int ,
DestAirportSeqID int ,
DestCityMarketID int ,
Dest string ,
...
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE 
LOCATION '/data/ontime';

Обратите внимание на ключевое слово «EXTERNAL» и LOCATION (LOCATION указывает на каталог внутри HDFS, а не на файл). Импала будет создавать только метаинформацию (не будет изменять таблицу). Мы можем запросить эту таблицу прямо сейчас, однако Impala необходимо будет сканировать все файлы (полное сканирование) на предмет запросов.

Пример:

[d30.local:21000] > select yeard, count(*) from ontime_psv  group by yeard;
Query: select yeard, count(*) from ontime_psv  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
| 2002  | 5271359  |
| 2004  | 7129270  |
| 1997  | 5411843  |
| 2012  | 6096762  |
| 2005  | 7140596  |
| 1999  | 5527884  |
| 2007  | 7455458  |
| 1994  | 5180048  |
| 2008  | 7009726  |
| 1988  | 5202096  |
| 2003  | 6488540  |
| 1996  | 5351983  |
| 1989  | 5041200  |
| 2011  | 6085281  |
| 1998  | 5384721  |
| 1991  | 5076925  |
| 2006  | 7141922  |
| 1993  | 5070501  |
| 2001  | 5967780  |
| 1995  | 5327435  |
| 1990  | 5270893  |
| 1992  | 5092157  |
| 2000  | 5683047  |
+-------+----------+
Returned 26 row(s) in 131.38s

(Обратите внимание, что «сгруппировать по» не будет сортировать строки, в отличие от MySQL. Для сортировки нам потребуется добавить «ORDER BY yeard»)

Объясните план:

Query: explain select yeard, count(*) from ontime_csv  group by yeard
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   4:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: HASH_PARTITIONED: yeard                      |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  group by: yeard                                      |
|   |                                                       |
|   2:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 2                                        |
|     HASH_PARTITIONED: yeard                               |
|                                                           |
|   1:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  group by: yeard                                      |
|   |                                                       |
|   0:SCAN HDFS                                             |
|      table=ontime.ontime_csv #partitions=1/1 size=45.68GB |
+-----------------------------------------------------------+
Returned 31 row(s) in 0.13s

Как мы видим, он будет сканировать 45 ГБ данных.

Импала с колонным форматом и сжатием

Большое преимущество Impala заключается в том, что он поддерживает колоночный формат и сжатие. Я пробовал новый формат «паркет» с «сжатым» кодеком сжатия . Так как наша таблица очень широка (и не нормализована), это поможет использовать столбчатый формат . Чтобы воспользоваться преимуществами формата «паркет», нам нужно загрузить в него данные, что легко сделать, если у нас уже есть таблица внутри импалы и файлы внутри HDFS:

[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy;
[d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_parquet_snappy STORED AS PARQUET;
[d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv;
Query: insert into ontime_parquet_snappy select * from ontime_csv
Inserted 152657276 rows in 729.76s

Затем мы можем проверить наш запрос по новой таблице:

Query: explain select yeard, count(*) from ontime_parquet_snappy  group by yeard
+---------------------------------------------------------------------+
| Explain String                                                      |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                     |
|   PARTITION: UNPARTITIONED                                          |
|                                                                     |
|   4:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 1                                                     |
|   PARTITION: HASH_PARTITIONED: yeard                                |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 4                                                  |
|     UNPARTITIONED                                                   |
|                                                                     |
|   3:AGGREGATE (merge finalize)                                      |
|   |  output: SUM(COUNT(*))                                          |
|   |  group by: yeard                                                |
|   |                                                                 |
|   2:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 2                                                     |
|   PARTITION: RANDOM                                                 |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 2                                                  |
|     HASH_PARTITIONED: yeard                                         |
|                                                                     |
|   1:AGGREGATE                                                       |
|   |  output: COUNT(*)                                               |
|   |  group by: yeard                                                |
|   |                                                                 |
|   0:SCAN HDFS                                                       |
|      table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB |
+---------------------------------------------------------------------+
Returned 31 row(s) in 0.02s

Как мы видим, он будет сканировать гораздо меньший объем данных: 3,95 (со сжатием) по сравнению с 45 ГБ.

Полученные результаты:

Query: select yeard, count(*) from ontime_parquet_snappy  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
...
Returned 26 row(s) in 4.17s

И время отклика также намного лучше.

Пример сложного запроса Impala

Я использовал сложный запрос из моего предыдущего поста . Мне пришлось адаптировать его для использования с Impala: он не поддерживает нотацию «sum (ArrDelayMinutes> 30)», но «sum (if (ArrDelayMinutes> 30, 1, 0)» работает нормально.

select
   min(yeard), max(yeard), Carrier, count(*) as cnt,
   sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
   round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_parquet_snappy
WHERE
   DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
   and DestState not in ('AK', 'HI', 'PR', 'VI')
   and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC
LIMIT 1000;

Запрос специально разработан таким образом, чтобы он не использовал индексы: большинство условий отфильтровывают менее 30% данных.

Импала результаты:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
| 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
| 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
| 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
| 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
| 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
| 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
| 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
| 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
| 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
| 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
| 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
| 1988       | 2009       | AA      | 10600509 | 1185343         | 0.11 |
| 1988       | 2001       | TW      | 2659963  | 280741          | 0.11 |
| 1988       | 2009       | CO      | 6029149  | 673863          | 0.11 |
| 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
| 1988       | 2009       | US      | 10276941 | 991016          | 0.10 |
| 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
| 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
| 1988       | 2009       | DL      | 11869471 | 1156267         | 0.10 |
| 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
| 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
| 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
| 1988       | 1991       | PA      | 206841   | 19465           | 0.09 |
| 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
+------------+------------+---------+----------+-----------------+------+
Returned 24 row(s) in 15.28s

15,28 секунды значительно быстрее, чем исходные результаты MySQL (15 минут 56,40 секунд без параллельного выполнения и 5 минут 47 с параллельным выполнением). Однако это не «сравнение яблок с яблоками»:

  • MySQL будет сканировать 45 ГБ данных, а Impala с паркетом будет сканировать только 3,5 ГБ.
  • MySQL будет работать на одном сервере, Hadoop + Impala будет работать параллельно на 6 серверах.

Тем не менее, Hadoop + Implala демонстрирует впечатляющую производительность и возможность масштабирования, что может помочь при анализе большого объема данных.

Вывод

Hadoop + Impala предоставит нам простой способ анализа больших наборов данных с использованием SQL с возможностью масштабирования даже на старом оборудовании.