Статьи

Pivotal Hadoop Distribution и HAWQ Realtime Query Engine

Вступление

SQL на Hadoop и поддержка интерактивных специальных запросов в Hadoop пользуются все большим спросом, и все поставщики предоставляют свои ответы на эти требования. В мире открытого кода Impala, Apache Drill от Cloudera (при поддержке MapR), инициативы Stinger компании Hortonworks конкурируют на этом рынке, если упомянуть лишь нескольких ключевых игроков. Существуют также сильные предложения от поставщиков BI и аналитики, таких как Pivotal (HAWQ), Teradata (SQL-H) или IBM (BigSQL). В этой статье мы рассмотрим Pivotal Hadoop Distribution (Pivotal HD) и HAWQ, интерактивный механизм распределенных SQL-запросов Pivotal.

Начало работы с Pivotal HD

Pivotal HD содержит наиболее известные компоненты с открытым исходным кодом, такие как HDFS, MapReduce, YARN, Hive, Pig, HBase, Flume, Sqoop и Mahout. Также доступны дополнительные компоненты, такие как Командный центр, Unified Storage Services, Data Loader, Spring и HAWQ в качестве дополнения. (У Pivotal есть предложение под названием GemFire ​​XD, которое представляет собой распределенную сетку данных в памяти, но это выходит за рамки нашего текущего обсуждения).PivotalHD_ArchitectDiagram

Давайте рассмотрим пример использования Pivotal HD для ответа на следующий вопрос: какова была самая высокая цена акций Apple, Google и Nokia и когда эти акции достигли пиковой стоимости? Сначала мы разработаем алгоритм MapReduce для вычисления этих значений, а затем запустим SQL-запросы в HAWQ, чтобы получить тот же результат. Наша тестовая среда основана на виртуальной машине Pivotal HD Single Node, работающей на VMWare VMPlayer, и использует 64-битный дистрибутив CentOS 6.4. Виртуальная машина Pivotal HD не содержит Eclipse, поэтому нам пришлось загружать ее отдельно с eclipse.org . После того, как мы установили среду, следующим шагом будет создание проекта maven.

$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=highest_stock_price -DartifactId=highest_stock_price

Эта команда создаст pom.xml, где у нас есть основные настройки проекта и junit, добавленные в качестве зависимости. Затем нам нужно отредактировать pom.xml и добавить другие соответствующие зависимости и настройки сборки. После этого мы можем начать писать наше приложение Hadoop в Eclipse. Код также загружен в
Github (https://github.com/iszegedi/Pivotal-HD-and-HAWQ-blog) для ознакомления.
поворотное-затмение-DEV

Ключевыми классами Java являются HighestStockPriceDriver.java, который является основным файлом драйвера для нашего приложения MapReduce, HighestStockPriceMapper.java, который содержит функцию map () и HighestStockPriceReducer.java, который выполняет функцию redu (). Затем мы можем скомпилировать код и упаковать его в файл jar:

$ mvn clean compile
$ mvn -DskipTests package

Следующим шагом является копирование наших наборов данных в каталог Hadoop HDFS.

$ hadoop fs -mkdir /stock_demo/input
$ hadoop fs -put *.csv /stock_demo/input/
$ hadoop fs -ls /stock_demo/input/
Found 3 items
-rw-r--r--   1 gpadmin hadoop     403395 2013-12-31 00:25 /stock_demo/input/apple.csv
-rw-r--r--   1 gpadmin hadoop     134696 2013-12-31 00:25 /stock_demo/input/google.csv
-rw-r--r--   1 gpadmin hadoop     248405 2013-12-31 00:25 /stock_demo/input/nokia.csv

Формат файлов (apple.csv, nokia.csv, google.csv) выглядит следующим образом (столбцы: Символ, Дата, Открыть, Высокий, Низкий, Закрыть, Объем, Адж Закрыть):

$ head -5 apple.csv
AAPL,2013-09-06,498.44,499.38,489.95,498.22,12788700,498.22
AAPL,2013-09-05,500.25,500.68,493.64,495.27,8402100,495.27
AAPL,2013-09-04,499.56,502.24,496.28,498.69,12299300,498.69
AAPL,2013-09-03,493.10,500.60,487.35,488.58,11854600,488.58
AAPL,2013-08-30,492.00,492.95,486.50,487.22,9724900,487.22

Теперь мы готовы запустить наш алгоритм MapReduce для наборов данных:

$ hadoop jar target/highest_stock_price-1.0.jar highest_stock_price/HighestStockPriceDriver /stock_demo/input/ /stock_demo/output/

$ hadoop fs -cat /stock_demo/output/part*
AAPL:	2012-09-19	685.76
GOOG:	2013-07-15	924.69
NOK:	2000-06-19	42.24

Мы можем проверить состояние задания Hadoop с помощью следующей команды:

$ hadoop job -status job_1388420266428_0001
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
13/12/31 00:31:17 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
Job: job_1388420266428_0001
Job File: hdfs://pivhdsne:8020/user/history/done/2013/12/31/000000/job_1388420266428_0001_conf.xml
Job Tracking URL : http://localhost:19888/jobhistory/job/job_1388420266428_0001
Uber job : false
Number of maps: 3
Number of reduces: 1
map() completion: 1.0
reduce() completion: 1.0
Job state: SUCCEEDED
....
....

Это покажет нам, что было 3 маппера и 1 редуктор. Он также покажет количество входных и выходных записей и байтов.

Интерактивный распределенный механизм запросов HAWQ

Распространенные жалобы в отношении классических алгоритмов Hadoop MapReduce заключаются в том, что они требуют довольно обширного опыта работы с Java и достаточно приспособлены для пакетной обработки данных, они не очень подходят для предварительного анализа данных с использованием специальных интерактивных запросов. Вот где HAWQ может прийти на помощь. HAWQ — это массивно параллельный механизм SQL-запросов. Базовый механизм основан на PostgreSQL (версия 8.2.15, на момент написания этой статьи), поэтому он может поддерживать стандартные операторы SQL из коробки. Ключевыми компонентами архитектуры являются ведущий HAWQ, сегменты HAWQ, хранилище HAWQ и соединение HAWQ.HAWQ-архитектура

Мастер HAWQ отвечает за прием соединений от клиентов, а также управляет системными таблицами, содержащими метаданные о самом HAWQ (однако никакие пользовательские данные не хранятся на мастере). Затем мастер анализирует и оптимизирует запросы и разрабатывает план выполнения, который затем отправляется сегментам. Сегменты HAWQ являются единицами обработки, они отвечают за выполнение операций локальной базы данных на своих собственных наборах данных.HAWQ-запрос-исполнение

HAWQ хранит все пользовательские данные в HDFS. HAWQ interconnect относится к межпроцессному обмену данными между сегментами на основе UDP. Теперь давайте посмотрим, как мы можем ответить на тот же вопрос о ценах на акции, который мы сделали с нашей работой MapReduce. Сначала нам нужно войти в систему нашего клиента (psql, того же клиента, которого мы хорошо знаем из баз данных PostgeSQL) и создать нашу схему и таблицу:

$ psql
psql (8.2.15)
Type "help" for help.

gpadmin=# create schema stock_demo;
gpadmin=# create table stock_demo.stock
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# with (appendonly=true) distributed randomly;

Следующим шагом является загрузка данных в эту таблицу HAWQ. Для этого мы можем использовать следующие команды:

$ cat google.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat nokia.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat apple.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"

Теперь мы можем войти снова в наш psql-клиент и выполнить SQL-запросы:

gpadmin=# select count(*) from stock_demo.stock;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock where adjclose in
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock 
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

Эти SQL-запросы основывались на внутренней таблице HAWQ, поэтому нам пришлось загружать в нее данные из нашей локальной файловой системы. HAWQ также поддерживает понятие внешних таблиц с использованием PXF (Pivotal eXtension Framework). Это внешний табличный интерфейс в HAWQ, который позволяет считывать данные непосредственно из каталогов HDFS. У него есть концепция фрагментаторов, средств доступа и распознавателей, которые используются для разделения файлов данных на более мелкие куски и считывания их в HAWQ без необходимости явной загрузки их во внутренние таблицы HAWQ. Если мы хотим использовать внешнюю таблицу, нам нужно создать ее, используя следующую инструкцию SQL:

gpadmin=# create external table stock_demo.stock_pxf
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# location ('pxf://pivhdsne:50070/stock_demo/input/*.csv?Fragmenter=HdfsDataFragmenter&Accessor=TextFileAccessor&Resolver=TextResolver')
gpadmin-# format 'TEXT' (delimiter = E'\,');

Затем мы можем выполнить те же запросы к внешней таблице, что и раньше:

gpadmin=# select count(*) from stock_demo.stock_pxf;
 count 
-------
 14296
(1 row)

gpadmin=# select symbol, date, adjclose from stock_demo.stock_pxf where adjclose in 
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock_pxf
gpadmin(#   group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose 
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)

Вывод

SQL на Hadoop набирает обороты, растет спрос на возможность выполнения специальных интерактивных запросов, а также пакетной обработки данных поверх Hadoop. Большинство ключевых игроков в мире больших данных начали предлагать решения для удовлетворения этих потребностей. 2014 год, кажется, интересный, чтобы увидеть, как эти предложения будут развиваться.