В моей предыдущей статье я представил модель Map / Reduce как мощную модель параллелизма . Однако, хотя Map / Reduce является простым, мощным и предоставляет хорошую возможность для распараллеливания алгоритма, он основан на жесткой процедурной структуре, которая требует внедрения пользовательского пользовательского кода, и поэтому трудно понять общую картину с высокого уровня. , Вам необходимо углубиться в код реализации карты и сократить количество функций, чтобы выяснить, что происходит.
Желательно иметь декларативный язык более высокого уровня, который описывает модель параллельной обработки данных. Это похоже на идею SQL-запроса, где пользователь указывает «что» и оставляет «как» базовому процессору обработки. В этом посте мы рассмотрим возможность такого декларативного языка. Мы начнем с модели Map / Reduce и посмотрим, как ее можно обобщить в «Модель параллельной обработки данных».
Во-первых, давайте вернемся к Map / Reduce в более абстрактном смысле.
Модель обработки Map / Reduce включает следующие шаги …
- Из многих распределенных хранилищ данных InputReader извлекает кортежи данных A = <a1, a2, …> и случайным образом передает их во многие задачи Map.
- Для каждого кортежа A задача Map генерирует ноль для множества кортежей A ‘
- Выход A ‘будет отсортирован по его ключу, A’ с тем же ключом достигнет той же задачи Сокращения
- Задача Reduce агрегируется по группе кортежей A ‘(с тем же ключом), а затем превращает их в кортеж B = lower (array <A’>)
- OutputWriter сохраняет кортеж данных B в распределенном хранилище данных.
Распараллеливание более сложного алгоритма обычно включает несколько этапов Map / Reduce, каждый этап может иметь свою задачу Map и Reduce.
Глядя на абстрактную модель Map / Reduce, есть некоторые сходства с моделью запросов SQL. Мы можем выразить вышеупомянутую модель Map / Reduce, используя SQL-подобный язык запросов.
INSERT INTO A FROM InputReader("dfs:/data/myInput")
INSERT INTO A'
SELECT flatten(map(*)) FROM A
INSERT INTO B
SELECT reduce(*) FROM A' GROUP BY A'.key
INSERT INTO "dfs:/data/myOutput" FROM B
Аналогично, SQL-запросы также могут быть выражены различными формами функций map () и redu (). Давайте посмотрим на пару типичных примеров SQL-запросов.
Простой запрос
SELECT a1, a2 FROM A
WHERE a3 > 5 AND a4 < 6
Вот соответствующая функция Map и Reduce
def map(tuple)
/* tuple is implemented as a map, key by attribute name */
if (tuple["a3"] > 5 && tuple["a4"] < 6)
key = random()
emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end
end
def reduce(tuples)
tuples.each do |tuple|
store tuple
end
end
Запрос с группировкой
SELECT sum(a1), avg(a2) FROM A
GROUP BY a3, a4
HAVING count() < 10
Вот основная функция Map и Reduce
def map(tuple)
key = [tuple["a3"], tuple["a4"]]
emit key, "a1" => tuple["a1"], "a2" => tuple["a2"]
end
def reduce(tuples)
sums = {"a1" => 0, "a2" => 0}
count = 0
tuples.each do |tuple|
count += 1
sums.each_key do |attr|
sums[attr] += tuple[attr]
end
end
if count < 10
/* omit denominator check for simplcity */
store {"type" => B, "b1" => sums["a1"], "b2" => sums["a2"] / count}
end
end
Запрос с Join
SELECT a2, p2 FROM A JOIN P ON A.a1 = P.p1
Вот соответствующая функция Map и Reduce
def map(tuple)
if (tuple["type"] == A)
key = tuple["a1"]
emit key, "a2" => tuple["a2"]
elsif (tuple["type"] == P)
key = tuple["p1"]
emit key, "p2" => tuple["p2"]
end
end
def reduce(tuples)
all_A_tuples = []
all_P_tuples = []
tuples.each do |tuple|
if (tuple["type"] == A)
all_A_tuples.add(tuple)
all_P_tuples.each do |p_tuple|
joined_tuple = p_tuple.merge(tuple)
joined_tuple["type"] = B
store joined_tuple
end
elsif (tuple["type"] == P)
/* do similar things */
end
end
end
Как видите, преобразование SQL-запроса в функцию Map / Reduce довольно просто. Мы помещаем следующую логику в функцию map ()
- Выберите столбцы, которые появляются в предложении SELECT
- Оцените предложение WHERE и отфильтруйте кортежи, которые не соответствуют условию
- Вычислить ключ для предложения JOIN или предложения GROUP
- Выбросить кортеж
С другой стороны, мы помещаем следующую логику в функцию redu ()
- Вычислить совокупное значение столбцов появляется в предложении SELECT
- Оцените предложение HAVING и отфильтруйте вещи
- Вычислить декартово произведение предложения JOIN
- Сохраните финальный кортеж
Поскольку мы увидели потенциальную возможность использовать «SQL-подобный» декларативный язык для выражения параллельной обработки данных и использовать модель Map / Reduce для ее выполнения, сообщество Hadoop с открытым исходным кодом работает над проектом Pig, чтобы разработать такой язык.
PIG похож на SQL следующим образом.
- Кортеж PIG такой же, как запись SQL, содержащая несколько полей
- Свинья имеет свой собственный набор
- Подобно оптимизатору SQL, который компилирует запрос в план выполнения, компилятор PIG компилирует свой запрос в задачу Map / Reduce.
Однако между PIG (в его нынешнем виде) и языком SQL есть ряд важных отличий.
- Хотя поля в записи SQL должны быть атомарными (содержать одно единственное значение), поля в кортеже PIG могут быть многозначными, например, коллекция других кортежей PIG или карта с ключом — это атомарные данные, а значение — что угодно.
- В отличие от реляционной модели, где каждая запись БД должна иметь уникальную комбинацию полей данных, кортеж PIG не требует уникальности.
- В отличие от SQL-запроса, когда входные данные необходимо физически загрузить в таблицы БД, PIG извлекает данные из своих исходных источников данных непосредственно во время выполнения.
- Свинья лениво выполняется. Он использует механизм обратного отслеживания из своего оператора «store», чтобы определить, какой оператор должен быть выполнен.
- PIG процедурный, а SQL декларативный. Фактически, PIG очень похож на план выполнения SQL-запросов.
- PIG позволяет легко подключить пользовательские функции
Для более подробной информации, пожалуйста, обратитесь к
сайту проекта PIG .