Статьи

Apache PIG: язык обработки для Map / Reduce

В моей предыдущей статье я представил модель Map / Reduce как мощную модель параллелизма . Однако, хотя Map / Reduce является простым, мощным и предоставляет хорошую возможность для распараллеливания алгоритма, он основан на жесткой процедурной структуре, которая требует внедрения пользовательского пользовательского кода, и поэтому трудно понять общую картину с высокого уровня. , Вам необходимо углубиться в код реализации карты и сократить количество функций, чтобы выяснить, что происходит.

Желательно иметь декларативный язык более высокого уровня, который описывает модель параллельной обработки данных. Это похоже на идею SQL-запроса, где пользователь указывает «что» и оставляет «как» базовому процессору обработки. В этом посте мы рассмотрим возможность такого декларативного языка. Мы начнем с модели Map / Reduce и посмотрим, как ее можно обобщить в «Модель параллельной обработки данных».

Во-первых, давайте вернемся к Map / Reduce в более абстрактном смысле.

 

 

Модель обработки Map / Reduce включает следующие шаги …

  • Из многих распределенных хранилищ данных InputReader извлекает кортежи данных A = <a1, a2, …> и случайным образом передает их во многие задачи Map.
  • Для каждого кортежа A задача Map генерирует ноль для множества кортежей A ‘
  • Выход A ‘будет отсортирован по его ключу, A’ с тем же ключом достигнет той же задачи Сокращения
  • Задача Reduce агрегируется по группе кортежей A ‘(с тем же ключом), а затем превращает их в кортеж B = lower (array <A’>)
  • OutputWriter сохраняет кортеж данных B в распределенном хранилище данных.

Распараллеливание более сложного алгоритма обычно включает несколько этапов Map / Reduce, каждый этап может иметь свою задачу Map и Reduce.

 

Глядя на абстрактную модель Map / Reduce, есть некоторые сходства с моделью запросов SQL. Мы можем выразить вышеупомянутую модель Map / Reduce, используя SQL-подобный язык запросов.

INSERT INTO A FROM InputReader("dfs:/data/myInput")
INSERT INTO A'
SELECT flatten(map(*)) FROM A
INSERT INTO B
SELECT reduce(*) FROM A' GROUP BY A'.key
INSERT INTO "dfs:/data/myOutput" FROM B

 Аналогично, SQL-запросы также могут быть выражены различными формами функций map () и redu (). Давайте посмотрим на пару типичных примеров SQL-запросов.

 Простой запрос

SELECT a1, a2 FROM A
WHERE a3 > 5 AND a4 < 6

 Вот соответствующая функция Map и Reduce 

def map(tuple)
/* tuple is implemented as a map, key by attribute name */
if (tuple["a3"] > 5 && tuple["a4"] < 6)
key = random()
emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end
end
def reduce(tuples)
tuples.each do |tuple|
store tuple
end
end

Запрос с группировкой

SELECT sum(a1), avg(a2) FROM A
GROUP BY a3, a4
HAVING count() < 10

Вот основная функция Map и Reduce 

def map(tuple)  
key = [tuple["a3"], tuple["a4"]]
emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end
def reduce(tuples)  
sums = {"a1" => 0, "a2" => 0}
count = 0
tuples.each do |tuple|
count += 1
sums.each_key do |attr|
sums[attr] += tuple[attr]
end
end
if count < 10
/* omit denominator check for simplcity */
store {"type" => B, "b1" => sums["a1"], "b2" => sums["a2"] / count}
end
end

Запрос с Join

SELECT a2, p2   FROM A JOIN P   ON A.a1 = P.p1

Вот соответствующая функция Map и Reduce   

def map(tuple)  
if (tuple["type"] == A)
key = tuple["a1"]
emit key, "a2" => tuple["a2"]
elsif (tuple["type"] == P)
key = tuple["p1"]
emit key, "p2" => tuple["p2"]
end
end
def reduce(tuples)  
all_A_tuples = []
all_P_tuples = []
tuples.each do |tuple|
if (tuple["type"] == A)
all_A_tuples.add(tuple)
all_P_tuples.each do |p_tuple|
joined_tuple = p_tuple.merge(tuple)
joined_tuple["type"] = B
store joined_tuple
end
elsif (tuple["type"] == P)
/* do similar things */
end
end
end

Как видите, преобразование SQL-запроса в функцию Map / Reduce довольно просто. Мы помещаем следующую логику в функцию map ()

  • Выберите столбцы, которые появляются в предложении SELECT
  • Оцените предложение WHERE и отфильтруйте кортежи, которые не соответствуют условию
  • Вычислить ключ для предложения JOIN или предложения GROUP
  • Выбросить кортеж

С другой стороны, мы помещаем следующую логику в функцию redu ()

  • Вычислить совокупное значение столбцов появляется в предложении SELECT
  • Оцените предложение HAVING и отфильтруйте вещи
  • Вычислить декартово произведение предложения JOIN
  • Сохраните финальный кортеж

Поскольку мы увидели потенциальную возможность использовать «SQL-подобный» декларативный язык для выражения параллельной обработки данных и использовать модель Map / Reduce для ее выполнения, сообщество Hadoop с открытым исходным кодом работает над проектом Pig, чтобы разработать такой язык.

PIG похож на SQL следующим образом.

  • Кортеж PIG такой же, как запись SQL, содержащая несколько полей
  • Свинья имеет свой собственный набор
  • Подобно оптимизатору SQL, который компилирует запрос в план выполнения, компилятор PIG компилирует свой запрос в задачу Map / Reduce.

 

Однако между PIG (в его нынешнем виде) и языком SQL есть ряд важных отличий.

  • Хотя поля в записи SQL должны быть атомарными (содержать одно единственное значение), поля в кортеже PIG могут быть многозначными, например, коллекция других кортежей PIG или карта с ключом — это атомарные данные, а значение — что угодно.
  • В отличие от реляционной модели, где каждая запись БД должна иметь уникальную комбинацию полей данных, кортеж PIG не требует уникальности.
  • В отличие от SQL-запроса, когда входные данные необходимо физически загрузить в таблицы БД, PIG извлекает данные из своих исходных источников данных непосредственно во время выполнения.
  • Свинья лениво выполняется. Он использует механизм обратного отслеживания из своего оператора «store», чтобы определить, какой оператор должен быть выполнен.
  • PIG процедурный, а SQL декларативный. Фактически, PIG очень похож на план выполнения SQL-запросов.
  • PIG позволяет легко подключить пользовательские функции

Для более подробной информации, пожалуйста, обратитесь к
сайту проекта PIG .