Статьи

От свиньи к искре: легкий путь к искре для разработчиков Apache Pig

Как аналитик данных, который в основном использовал Apache Pig в прошлом, мне в конечном итоге понадобилось программировать более сложные задания, для которых требовалось использование Apache Spark, более продвинутого и гибкого языка. Поначалу Spark может показаться немного пугающим, но этот пост в блоге покажет, что переход на Spark (особенно PySpark) довольно прост.

Однако я не сторонник того, чтобы вы переходили с Apache Pig на Spark во всех случаях. Свинья это замечательный язык. Это просто, но эффективно, когда речь идет о преобразовании данных с помощью проекций и агрегации, а производительность Pig не может быть лучше стандартных задач Map / Reduce.

Apache Pig обладает отличными функциями, но…

Мне нравится думать о Pig как о конвейере команд Map / Reduce высокого уровня. Как бывший программист SQL, я нахожу это довольно интуитивно понятным, и в моей организации наши задания Hadoop по-прежнему в основном разрабатываются в Pig.

Свинья обладает множеством качеств: она стабильна, очень хорошо масштабируется и изначально интегрируется с Has metastore HCatalog. Атомное описание каждого шага сводит к минимуму концептуальные ошибки, которые часто встречаются в сложном коде SQL.

Но иногда у Pig есть некоторые ограничения, которые делают его плохой парадигмой программирования для удовлетворения ваших потребностей. Три основных ограничения:

Pig — это конвейер, который не предлагает циклов или косвенных кодов (IF..THEN), которые иногда могут быть обязательными в вашем коде. Как прекрасно сказано в статье Джая Ранганатана и Матея Захария:


В то время как скриптовые платформы, такие как Apache Pig, также предоставляют много высокоуровневых операторов, Spark позволяет вам получать доступ к этим операторам в контексте полного языка программирования — таким образом, вы можете использовать операторы управления, функции и классы, как в обычном программировании. среда.

Наконец, третье ограничение Pig связано с форматами входных данных: в то время как Pig хорош для CSV и HCatalog, он кажется немного менее удобным для чтения и обработки некоторых других форматов данных, таких как JSON (через JsonLoader), тогда как Spark интегрирует их изначально.

Попробуйте Apache Spark

Время искупаться в Spark! Свинья и Искра имеют общую модель программирования, которая позволяет легко переходить от одного к другому. По сути, вы работаете через неизменные преобразования, идентифицируемые псевдонимом (Pig) или переменной RDD (Spark). Преобразования обычно представляют собой проекции (карты), фильтры или агрегаты, такие как GroupBy, сортировки и т. Д.

Этот общий подход к программированию означает, что для разработчика Pig кривая обучения Spark довольно быстрая.

PySpark — вполне естественный выбор для аналитика данных, у которого уже есть некоторые базовые навыки Python, но код будет похож на другой вариант Spark, такой как Java или Scala.

Полный пример

В качестве иллюстрации, давайте возьмем пример сценария Pig, который загружает файл журнала, фильтрует его за определенный день, вычисляет количество записей журнала, сгруппированных по элементу, и добавляет описание элемента из другого файла:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* load a log file of user sessions. Filter for a specific date and count entries per item
*/
  
f0 = LOAD 'logfile' using PigStorage('\t') AS (log_date:chararray, item_id:chararray, some_stuff:chararray);
  
f1 = FILTER f0 BY log_date == '20160515';
  
f2 = FOREACH f1 GENERATE item_id;
  
f3 = GROUP f2 BY item_id;
  
f4 = FOREACH f3 GENERATE group AS item_id, COUNT(f2) AS nb_entries;
  
/* add item name
*/
  
item1 = LOAD 'item' using PigStorage('\t') AS (item_id:chararray, item_name:chararray);
  
join1 = JOIN f4 BY item_id LEFT, item1 BY item_id;
  
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;
  
STORE result INTO 'result_file' USING PigStorage('\t');

Код довольно прост, и каждый шаг выполняет одно преобразование.

Теперь в Spark мы начнем с необработанного Spark, использующего низкоуровневые RDD, чтобы показать сходство с кодом Pig. В коде все подробно описывается по одному псевдониму за раз, но, очевидно, рабочий код будет более компактным.

Raw Spark (с использованием RDD)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
conf = SparkConf()
sc = SparkContext(conf=conf)
  
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
  
f1 = f0.filter(lambda x: x[0] == '20160515')
  
f3 = f1.groupBy(lambda (log_date, item_id, some_stuff): item_id)
f4 = f3.map (lambda (item_id, iterable): (item_id, len(iterable)))
  
# add item name
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
  
# no need to set the key item_id on both parts before performing the join,
# It's already on first place on each part.
  
join1 = f4.leftOuterJoin(item1)
  
result = join1.map(lambda (item_id, (nb_entries, item_name)): (item_id, item_name, str(nb_entries)))
  
# creating a line of tab separated fields, and save it in the result file
result_to_store = result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

Здесь мы видим похожую схему кода между Pig и Spark, которая облегчает разработчику Pig программирование в Spark. Однако один недостаток заключается в том, что для относительно простых операций, подобных этой, Pig все еще более продуктивен, чем Spark, даже если время выполнения лучше (но не на удивление лучше) со Spark.

Теперь, когда мы знакомимся с этим низкоуровневым СДР, код можно улучшить с помощью DataFrames и SparkSQL. Предыдущий код может быть переписан в более читаемой форме:

Spark с DataFrames и SparkSQL

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
conf = SparkConf()
sc = SparkContext(conf=conf)
  
sqlContext = SQLContext(sc)
  
f0 = sc.textFile('logfile').map(lambda x: x.split('\t'))
  
fpFields = [ \
   StructField('log_date', StringType(), True), \
   StructField('item_id', StringType(), True), \
   StructField('some_stuff', StringType(), True) \
]
  
fpSchema = StructType(fpFields)
df_f0 = sqlContext.createDataFrame(f0, fpSchema)
df_f0.registerTempTable('log')
  
f1_df = sqlContext.sql(
   "SELECT log.item_id, count(*) AS nb_entries \
      FROM log \
     WHERE log_date = '20160515'\
  GROUP BY item_id"
)
f1_df.registerTempTable('log_agg')
# items dataframe
  
item1 = sc.textFile('item').map(lambda x: x.split('\t'))
  
itemFields = [ \
   StructField('item_id', StringType(), True), \
   StructField('item_name', StringType(), True) \
]
  
itemSchema = StructType(itemFields)
df_item1 = sqlContext.createDataFrame(item1, itemSchema)
  
df_item1.registerTempTable('item')
  
result = sqlContext.sql(
   'SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
  LEFT OUTER JOIN item ON log_agg.item_id = item.item_id'
)
  
result_to_store = result.rdd \
     .map (lambda record : '\t'.join(record))
  
result_to_store.saveAsTextFile('result_file')

Я уверен, что есть еще более компактные и элегантные способы сделать это в Spark SQL, но это схема.

Теперь мы назвали поля, безопасность типов и компактный код SQL, который более читабелен для аналитика данных. Производительность увеличилась, и это лучшая альтернатива Свинья.

Недостатком является то, что каждый кусок SQL теперь является черным ящиком, который можно протестировать только целиком, что может оказаться сложным, если результат отличается от ожидаемого или если время выполнения медленное. Затем разработчик должен спроектировать шаги, которые все еще читаемы и могут быть выполнены как отдельные блоки кода.

Загрузка данных из Hive metastore HCatalog

Если бы наши данные были сохранены в Hive HCatalog, все метаданные DataFrame были бы унаследованы от метастаза, и код Spark был бы еще проще:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
  
f1_df = sqlContext.sql(
   "SELECT item_id, count(*) AS nb_entries \
   FROM my_db.log \
   WHERE log_date = '20160515' \
   GROUP BY item_id"
)
  
f1_df.registerTempTable('log_agg')
  
result = sqlContext.sql(
   "SELECT log_agg.item_id, item_name, format_number(nb_entries, 0) \
      FROM log_agg \
LEFT OUTER JOIN my_db.item item ON log_agg.item_id = item.item_id"
)
  
result_to_store = result.rdd \
   .map (lambda record : '\t'.join(record))
  
result_to_store.saveAsTextFile(outputFileName)

Теперь это более компактный и читабельный кусок кода 🙂

Теперь давайте добавим преимущество в пользу Spark: пользовательские функции.

Пользовательские функции

Как указывалось ранее, в Spark, очевидно, нет необходимости в UDF; вы бы просто написали функцию как метод Python:

в Свинья:

1
2
3
4
5
6
/* the function below has been written and deployed in a jar file */
DEFINE myFancyUdf com.mydomain.myfunction1;
  
...
  
log1 = FOREACH log0 GENERATE field1, myFancyUdf (field1l);

В Spark:

1
2
3
4
5
def myFancyUdf(f1):
   someStuff
   return result
  
log1 = log0.map (lambda field1: (field1, myFancyUdf(field1))

Более продвинутые темы

В этом разделе давайте рассмотрим более мощные функции Pig в Spark на двух примерах:

Объединения на стороне карты

Одной из удобных функций Pig является объединение на стороне карты, когда одна из таблиц для объединения достаточно мала, чтобы ее можно было отправить каждому работнику для участия в задании «Карта» (не требуя более дорогого задания «Уменьшить»). Это удобно выполнить с помощью «реплицированной» подсказки в JOIN.

Представьте, что в нашем предыдущем примере таблица ‘item’ достаточно мала, чтобы поместиться в памяти. Псевдоним join1 становится:

1
2
3
join1 = JOIN f4 BY item_id, item1 BY item_id USING ‘replicated;
  
result = FOREACH join1 GENERATE f4::item_id, item_name, nb_entries;

В Spark это выполняется довольно просто с помощью переменных трансляции :

01
02
03
04
05
06
07
08
09
10
# broadcast items
item_bc = sc.broadcast(item.collect())
  
 
'''
gets item name from its id
'''
 
def getItemName (item_id_to_match): # we know there will be only one result, so we take the first from the list
  (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]

Таблица предметов транслируется на каждом рабочем узле. Затем функция getItemName () находит в передаваемой таблице, какая запись содержит данный item_id, и возвращает ее имя. Эта функция вызывается на стороне карты задания Spark для каждой обрабатываемой записи.

Полный код теперь выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
'''
gets item name from its id
'''
 
 
def getItemName (item_id_to_match):
 # we know there will be only one result, so we take the first from the
   (id, name) = filter(lambda (id, name): id == item_id_to_match, item_bc.value)[0]
   return name
  
f1_df = sqlContext.sql(
  "SELECT item_id, count(*) AS nb_entries \
     FROM my_db.log \
    WHERE log_date = '20160515' \
   GROUP BY item_id"
)
  
item_df = sqlContext.sql(
   "SELECT item_id, item_name \
      FROM my_db.item"
)
  
item_bc = sc.broadcast(item_df.rdd.collect())
  
result = f1_df.rdd.map (lambda= result.map (lambda record : '\t'.join(record))
result_to_store.saveAsTextFile('result_file')

Функция окна: получить n первых вхождений отсортированного списка сгруппированных элементов

Иногда требуется найти первые-первые записи таблицы, сгруппированные по общему признаку. Из файлов журнала нашего примера давайте для каждого элемента получим 10 самых последних записей (в SQL это была бы функция управления окнами, такая как PARTITION BY).

В Pig это может быть выполнено с помощью такого кода:

1
2
3
4
5
6
7
8
9
f0 = LOAD ‘logfile’ using PigStorage('\t') AS (log_date:char array, item_id:chararray, some_stuff:chararray);
  
f1 = GROUP f0 BY item_id;
  
f2 = FOREACH f1 {
   o = ORDER f0 BY log_date DESC;
   l = LIMIT o 10;
   GENERATE FLATTEN(l) AS (log_date, item_id, some_stuff);
}

В Spark это также возможно либо с помощью низкоуровневого RDD, либо с возможностями SparkSQL Windowing .

Начнем с низкоуровневого решения RDD:

01
02
03
04
05
06
07
08
09
10
11
12
# create a tuple with the key for the GroupBy
f1 = f0.map (lambda (log_date, item_id, some_stuff): (item_id, (log_date, some_stuff)))
  
f2 = f1.groupByKey()
  
# result of the GroupBy is a tuple (item_id, iterable over grouped items)
# we sort the iterable according to log_date and retain only first 10 elements
f3 = f2.map (lambda (item_id, iter1): (item_id, sorted(list(iter1), key=lambda (log_date, item_id, some_stuff):log_date, reverse=True)[:10]))
  
# transform tuples of (item_id, [(log_date, item_id, some_stuff), ...]) into tuples of (log_date, item_id, some_stuff)
f4 = f3.flatMapValues(lambda x:x) \
.map (lambda (item_id, (log_date, some_stuff)):(log_date, item_id, some_stuff)

Это не очень элегантно, но делает работу.

Тогда решение SparkSQL:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
f1_df = sqlContext.sql(
'SELECT \
  log_date, \
  item_id,  \
  some_stuff  \
FROM (  \
  SELECT  \
  log_date, \
  item_id,  \
  some_stuff, \
  dense_rank() OVER (PARTITION BY item_id ORDER BY log_date DESC) as rank \
FROM my_db.log) tmp \
WHERE rank <= 10')
  
f2 = f1_df.rdd.map (lambda row: (row.log_date, row.item_id, row.some_stuff))

Намного лучше!

Вывод

Я добровольно исключил из этого блога некоторые интересные темы, такие как развертывание, отладка, мониторинг выполнения, динамическое распределение ресурсов, настройка размера раздела и разделения, выборка и т. Д. Цель этого конкретного поста в блоге — показать разработчикам Pig, как начать кодирование. в искре; Я надеюсь, что с этой точки зрения вы найдете это полезным. Если у вас есть какие-либо дополнительные вопросы, пожалуйста, задавайте их в разделе комментариев ниже.