Apache Hadoop обычно используется для анализа данных. Это быстро для загрузки данных и масштабируемым. В предыдущем посте я показал, как
интегрировать MySQL с Hadoop . В этом посте я покажу, как экспортировать таблицу из MySQL в Hadoop, загрузить данные в Cloudera Impala (столбчатый формат) и запустить отчет в дополнение к этому. В приведенных ниже примерах я буду использовать данные о «своевременности полета» из моего предыдущего поста (
Повышение производительности MySQL с параллельным выполнением запросов ). Я использовал
Cloudera Manager v.4 установить Apache Hadoop и Impala. Для этого теста я (намеренно) использовал старое оборудование (серверы с 2006 года), чтобы показать, что Hadoop может использовать старое оборудование и продолжать масштабироваться. Тестовый кластер состоит из 6 датододов. Ниже приведены спецификации:
Цель | Спецификации сервера |
---|---|
Наменоде, Улей Метастор и т. Д. + Датододы | 2x PowerEdge 2950, 2 процессора L5335 с частотой 2,00 ГГц, 8 ядер, 16 ГБ ОЗУ, RAID 10 с 8 дисками SAS |
Только датододы | 4x PowerEdge SC1425, 2 процессора Xeon @ 3,00 ГГц, 2 ядра, 8 ГБ ОЗУ, один диск 4 ТБ |
Как видите, это довольно старые серверы; единственное, что я изменил, — это добавление диска емкостью 4 ТБ, чтобы можно было хранить больше данных. Hadoop обеспечивает избыточность на уровне сервера (он записывает 3 копии одного и того же блока во все датододы), поэтому нам не нужен RAID для данных датодов (необходима избыточность для наменодов).
Экспорт данных
Есть несколько способов экспортировать данные из MySQL в Hadoop. Для этого теста я просто экспортировал таблицу времени в текстовый файл с:
select * into outfile '/tmp/ontime.psv' FIELDS TERMINATED BY ',' from ontime;
(вы можете использовать «|» или любой другой символ в качестве разделителя). Кроме того, вы можете загрузить данные непосредственно с сайта www.transtats.bts.gov, используя этот простой скрипт:
for y in {1988..2013} do for i in {1..12} do u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip" wget $u -o ontime.log unzip On_Time_On_Time_Performance_${y}_${i}.zip done done
Загрузить в Hadoop HDFS
Первое, что нам нужно сделать, это загрузить данные в HDFS в виде набора файлов. Hive или Impala будет работать с каталогом, в который вы импортировали свои данные, и объединит все файлы в этом каталоге. В нашем случае легко просто скопировать все наши файлы в каталог внутри HDFS.
$ hdfs dfs -mkdir /data/ontime/ $ hdfs -v dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/
Создать внешний стол в Impala
Теперь, когда у нас загружены все файлы данных, мы можем создать внешнюю таблицу:
CREATE EXTERNAL TABLE ontime_csv ( YearD int , Quarter tinyint , MonthD tinyint , DayofMonth tinyint , DayOfWeek tinyint , FlightDate string , UniqueCarrier string , AirlineID int , Carrier string , TailNum string , FlightNum string , OriginAirportID int , OriginAirportSeqID int , OriginCityMarketID int , Origin string , OriginCityName string , OriginState string , OriginStateFips string , OriginStateName string , OriginWac int , DestAirportID int , DestAirportSeqID int , DestCityMarketID int , Dest string , ... ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/data/ontime';
Обратите внимание на ключевое слово «EXTERNAL» и LOCATION (LOCATION указывает на каталог внутри HDFS, а не на файл). Импала будет создавать только метаинформацию (не будет изменять таблицу). Мы можем запросить эту таблицу прямо сейчас, однако Impala необходимо будет сканировать все файлы (полное сканирование) на предмет запросов.
Пример:
[d30.local:21000] > select yeard, count(*) from ontime_psv group by yeard; Query: select yeard, count(*) from ontime_psv group by yeard +-------+----------+ | yeard | count(*) | +-------+----------+ | 2010 | 6450117 | | 2013 | 5349447 | | 2009 | 6450285 | | 2002 | 5271359 | | 2004 | 7129270 | | 1997 | 5411843 | | 2012 | 6096762 | | 2005 | 7140596 | | 1999 | 5527884 | | 2007 | 7455458 | | 1994 | 5180048 | | 2008 | 7009726 | | 1988 | 5202096 | | 2003 | 6488540 | | 1996 | 5351983 | | 1989 | 5041200 | | 2011 | 6085281 | | 1998 | 5384721 | | 1991 | 5076925 | | 2006 | 7141922 | | 1993 | 5070501 | | 2001 | 5967780 | | 1995 | 5327435 | | 1990 | 5270893 | | 1992 | 5092157 | | 2000 | 5683047 | +-------+----------+ Returned 26 row(s) in 131.38s
(Обратите внимание, что «сгруппировать по» не будет сортировать строки, в отличие от MySQL. Для сортировки нам потребуется добавить «ORDER BY yeard»)
Объясните план:
Query: explain select yeard, count(*) from ontime_csv group by yeard +-----------------------------------------------------------+ | Explain String | +-----------------------------------------------------------+ | PLAN FRAGMENT 0 | | PARTITION: UNPARTITIONED | | | | 4:EXCHANGE | | | | PLAN FRAGMENT 1 | | PARTITION: HASH_PARTITIONED: yeard | | | | STREAM DATA SINK | | EXCHANGE ID: 4 | | UNPARTITIONED | | | | 3:AGGREGATE (merge finalize) | | | output: SUM(COUNT(*)) | | | group by: yeard | | | | | 2:EXCHANGE | | | | PLAN FRAGMENT 2 | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 2 | | HASH_PARTITIONED: yeard | | | | 1:AGGREGATE | | | output: COUNT(*) | | | group by: yeard | | | | | 0:SCAN HDFS | | table=ontime.ontime_csv #partitions=1/1 size=45.68GB | +-----------------------------------------------------------+ Returned 31 row(s) in 0.13s
Как мы видим, он будет сканировать 45 ГБ данных.
Импала с колонным форматом и сжатием
Большое преимущество Impala заключается в том, что он поддерживает колоночный формат и сжатие. Я пробовал новый формат «паркет» с «сжатым» кодеком сжатия . Так как наша таблица очень широка (и не нормализована), это поможет использовать столбчатый формат . Чтобы воспользоваться преимуществами формата «паркет», нам нужно загрузить в него данные, что легко сделать, если у нас уже есть таблица внутри импалы и файлы внутри HDFS:
[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy; [d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_parquet_snappy STORED AS PARQUET; [d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv; Query: insert into ontime_parquet_snappy select * from ontime_csv Inserted 152657276 rows in 729.76s
Затем мы можем проверить наш запрос по новой таблице:
Query: explain select yeard, count(*) from ontime_parquet_snappy group by yeard +---------------------------------------------------------------------+ | Explain String | +---------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | PARTITION: UNPARTITIONED | | | | 4:EXCHANGE | | | | PLAN FRAGMENT 1 | | PARTITION: HASH_PARTITIONED: yeard | | | | STREAM DATA SINK | | EXCHANGE ID: 4 | | UNPARTITIONED | | | | 3:AGGREGATE (merge finalize) | | | output: SUM(COUNT(*)) | | | group by: yeard | | | | | 2:EXCHANGE | | | | PLAN FRAGMENT 2 | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 2 | | HASH_PARTITIONED: yeard | | | | 1:AGGREGATE | | | output: COUNT(*) | | | group by: yeard | | | | | 0:SCAN HDFS | | table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB | +---------------------------------------------------------------------+ Returned 31 row(s) in 0.02s
Как мы видим, он будет сканировать гораздо меньший объем данных: 3,95 (со сжатием) по сравнению с 45 ГБ.
Полученные результаты:
Query: select yeard, count(*) from ontime_parquet_snappy group by yeard +-------+----------+ | yeard | count(*) | +-------+----------+ | 2010 | 6450117 | | 2013 | 5349447 | | 2009 | 6450285 | ... Returned 26 row(s) in 4.17s
И время отклика также намного лучше.
Пример сложного запроса Impala
Я использовал сложный запрос из моего предыдущего поста . Мне пришлось адаптировать его для использования с Impala: он не поддерживает нотацию «sum (ArrDelayMinutes> 30)», но «sum (if (ArrDelayMinutes> 30, 1, 0)» работает нормально.
select min(yeard), max(yeard), Carrier, count(*) as cnt, sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed, round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate FROM ontime_parquet_snappy WHERE DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI') and DestState not in ('AK', 'HI', 'PR', 'VI') and flightdate < '2010-01-01' GROUP by carrier HAVING cnt > 100000 and max(yeard) > 1990 ORDER by rate DESC LIMIT 1000;
Запрос специально разработан таким образом, чтобы он не использовал индексы: большинство условий отфильтровывают менее 30% данных.
Импала результаты:
+------------+------------+---------+----------+-----------------+------+ | min(yeard) | max(yeard) | carrier | cnt | flights_delayed | rate | +------------+------------+---------+----------+-----------------+------+ | 2003 | 2009 | EV | 1454777 | 237698 | 0.16 | | 2003 | 2009 | FL | 1082489 | 158748 | 0.15 | | 2006 | 2009 | XE | 1016010 | 152431 | 0.15 | | 2003 | 2009 | B6 | 683874 | 103677 | 0.15 | | 2006 | 2009 | YV | 740608 | 110389 | 0.15 | | 2003 | 2005 | DH | 501056 | 69833 | 0.14 | | 2001 | 2009 | MQ | 3238137 | 448037 | 0.14 | | 2004 | 2009 | OH | 1195868 | 160071 | 0.13 | | 2003 | 2006 | RU | 1007248 | 126733 | 0.13 | | 2003 | 2006 | TZ | 136735 | 16496 | 0.12 | | 1988 | 2009 | UA | 9593284 | 1197053 | 0.12 | | 1988 | 2009 | AA | 10600509 | 1185343 | 0.11 | | 1988 | 2001 | TW | 2659963 | 280741 | 0.11 | | 1988 | 2009 | CO | 6029149 | 673863 | 0.11 | | 2007 | 2009 | 9E | 577244 | 59440 | 0.10 | | 1988 | 2009 | US | 10276941 | 991016 | 0.10 | | 2003 | 2009 | OO | 2654259 | 257069 | 0.10 | | 1988 | 2009 | NW | 7601727 | 725460 | 0.10 | | 1988 | 2009 | DL | 11869471 | 1156267 | 0.10 | | 1988 | 2009 | AS | 1506003 | 146920 | 0.10 | | 1988 | 2005 | HP | 2607603 | 235675 | 0.09 | | 2005 | 2009 | F9 | 307569 | 28679 | 0.09 | | 1988 | 1991 | PA | 206841 | 19465 | 0.09 | | 1988 | 2009 | WN | 12722174 | 1107840 | 0.09 | +------------+------------+---------+----------+-----------------+------+ Returned 24 row(s) in 15.28s
15,28 секунды значительно быстрее, чем исходные результаты MySQL (15 минут 56,40 секунд без параллельного выполнения и 5 минут 47 с параллельным выполнением). Однако это не «сравнение яблок с яблоками»:
- MySQL будет сканировать 45 ГБ данных, а Impala с паркетом будет сканировать только 3,5 ГБ.
- MySQL будет работать на одном сервере, Hadoop + Impala будет работать параллельно на 6 серверах.
Тем не менее, Hadoop + Implala демонстрирует впечатляющую производительность и возможность масштабирования, что может помочь при анализе большого объема данных.
Вывод
Hadoop + Impala предоставит нам простой способ анализа больших наборов данных с использованием SQL с возможностью масштабирования даже на старом оборудовании.