Статьи

Apache Pig: Начало

В последнее время я экспериментировал с использованием Pig для некоторых данных MBS Fannie-Mae. Хотя я не против написания программ MapReduce для обработки данных (особенно довольно простых задач, которые я сейчас выполняю), я действительно ценю то, что «волшебная» свинья делает под одеялом, вы могли бы сказать. Если вы не знаете, Pig, член экосистемы Hadoop (а теперь это первоклассный проект Apache на  pig.apache.org), является основой для анализа больших наборов данных. В этом мини-руководстве мы увидим, как Pig работает с Hadoop и HDFS, и насколько вы можете достичь всего лишь с помощью нескольких строк сценария. Я использую версию Pig 0.10.0 на Hadoop 1.1.0 (на Ubuntu 12.04, на VirtualBox 4.2.4, на Windows 7SP1, на третьем этаже трехуровневого уровня на 1728 м над уровнем моря, но это может измениться — увидеть  эту историю  про еще одну «СВИНЬЮ»). 

В этом уроке я предполагаю, что у вас установлены Hadoop и Pig и что вы запускаете Hadoop хотя бы в псевдораспределенном режиме. Если вы действительно новичок в обеих темах, я бы порекомендовал сначала взглянуть на соответствующие веб-сайты Apache, и для развертывания и запуска Hadoop он действительно не лучше, чем посты Майкла Нолла на эту тему. Для одноузлового кластера (которого достаточно для следования этому учебному пособию), см. Его пост  Запуск Hadoop в Ubuntu Linux (одноузловой кластер) . Я читаю Hadoop Тома Уайта  : Полное руководство , которое содержит очень полезную главу о свинье.

Во-первых, набор данных

Прежде чем мы начнем, давайте посмотрим на данные, которые мы будем анализировать. На веб-сайте Fannie Mae вы можете найти страницу самые последние проблемы с ипотечным пулом  (нажмите «Статистика новых проблем»). Файлы с разделителями по трубе доступны для каждого дня, для которого доступны данные о проблемах. На этой странице меня больше всего интересует статистика пула новых проблем, которую я Я сокращу NIPS. Эти файлы интересны тем, что они содержат записи в нескольких различных форматах (ссылка на приведенной выше странице относится к документу, описывающему различные форматы записей, найденные в файле NIPS). Таким образом, при разборе файла NIPS Вам нужно сначала просмотреть 2-й столбец данных, а затем обратиться к файлу описания формата файла, чтобы интерпретировать данные.

В качестве примера я смотрю на последние несколько строк файла NIPS от 9 ноября. Мы включили только строки для одного CUSIP, AQ7340:

AQ7340|01|3138MPEN1|11/01/2012|FNMS 02.5000 CI-AQ7340|$2,218,111.00|2.5||12/25/2012|U.S. BANK N.A.|U.S. BANK N.A.|9|247169.44|11/01/2027|||||||3.092|||1|180|179|92|779|0.0||0.0|CI  ||76.17|97AQ7340|02|MAX|375000.0|3.5|94.0|813|180|2|180
AQ7340|02|75%|317250.0|3.25|93.0|796|180|1|180
AQ7340|02|MED|241800.0|3.0|92.0|786|180|0|180
AQ7340|02|25%|209725.0|3.0|91.0|773|180|0|179
AQ7340|02|MIN|179500.0|2.875|90.0|697|180|0|178
AQ7340|03|REFINANCE|9|100.0|$2,218,111.30
AQ7340|04|1|9|100.0|$2,218,111.30
AQ7340|05|PRINCIPAL RESIDENCE|9|100.0|$2,218,111.30
AQ7340|08|2012|9|100.0|$2,218,111.30
AQ7340|09|GEORGIA|1|8.98|$199,118.84
AQ7340|09|ILLINOIS|1|9.52|$211,250.00
AQ7340|09|MICHIGAN|2|19.13|$424,312.98
AQ7340|09|MINNESOTA|2|24.93|$552,916.34
AQ7340|09|MISSOURI|1|10.82|$239,984.67
AQ7340|09|WASHINGTON|2|26.62|$590,528.47
AQ7340|10|U.S. BANK N.A.|9|100.0|$2,218,111.30
AQ7340|17|BROKER|1|16.91|$375,000.00
AQ7340|17|CORRESPONDENT|6|59.27|$1,314,611.30
AQ7340|17|RETAIL|2|23.83|$528,500.00

В этом руководстве мы будем обрабатывать файлы NIPS, чтобы сложить итоговые неоплаченные остатки (UPB) в расчете на каждого штата. Ссылаясь на
описание макета файла NIPS , я вижу, что мне нужно посмотреть записи, где поле № 2 — «09». Что мы хотим сделать с этими данными, так это накапливать сумму в долларах каждой UPB в ключе «состояния» по всему файлу NIPS или по набору файлов NIPS и выводить итоги по состоянию, когда мы закончим.

Это не огромные наборы данных, конечно. Но с целью создания интересного учебного пособия мы обработаем небольшой объем данных NIPS и посмотрим, в каких штатах наблюдается наибольшая активность по ипотечным кредитам (по крайней мере, в том, что касается новых проблем Fannie-Mae). Главное здесь — научиться обрабатывать данные и использовать возможности Pig.

Загрузка данных в HDFS

Я начну с загрузки всех доступных данных со страницы NIPS Fannie Mae в мою локальную файловую систему. Во время этого урока в него вошли данные с 23 августа (2012) по 21 ноября. Этот набор обеспечивает чуть менее 400K строк вывода. Следующим шагом является копирование из моего локального хранилища в HDFS:


$ bin / hadoop dfs -copyFromLocal / home / hduser / dev / pigExamples / nipsData / user / hduser / pigExample

Мы можем проверить передачу в HDFS с помощью:

$ bin / hadoop dfs -ls / user / hduser / pigExample

Изучение одного файла с помощью Pig

Мы начнем с загрузки одного файла и попыток отфильтровать строки с типом записи не «09».
Я предполагаю, что вы установили Pig, и он настроен для доступа к HDFS. Запустите интерпретатор Pig:
hduser@ubuntu:~$ pig
2012-11-24 16:56:19,305 [main] INFO  org.apache.pig.Main — Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-11-24 16:56:19,306 [main] INFO  org.apache.pig.Main — Logging error messages to: /home/hduser/pig_1353801379300.log
2012-11-24 16:56:19,518 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine — Connecting to hadoop file system at: hdfs://localhost:9000
2012-11-24 16:56:19,858 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine — Connecting to map-reduce job tracker at: localhost:9001
grunt> 
You can see from the output that Pig knows I’m running Hadoop in (pseudo-)distributed mode.  If you don’t see these, verify your PIG_CLASSPATH is set.  Next I’m going to load a single file from HDFS into Pig.  Pig assumes the field delimiter is a tab; since our file is pipe («|») delimited, we will use 
PigStorage to override the default:
grunt> NIPS_9Nov = load ‘pigExample/nips_11092012.txt’ using PigStorage(‘|’);
(Note that the path is relative to ‘/user/hduser’).  This load will not occur until the data is required; for example, right now a «dump» would cause the file to be loaded.  In fact, if you type
grunt> dump NIPS_9Nov;
you will see a flurry of activity, related to the Hadoop MapReduce task(s) being created on your behalf, culminating with the actual output of the parsed-on-pipe-symbol text, of which the last few lines look like the following:
(AQ7340,09,GEORGIA,1,8.98,$199,118.84)
(AQ7340,09,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,09,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,09,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,09,MISSOURI,1,10.82,$239,984.67)
(AQ7340,09,WASHINGTON,2,26.62,$590,528.47)
(AQ7340,10,U.S. BANK N.A.,9,100.0,$2,218,111.30)
(AQ7340,17,BROKER,1,16.91,$375,000.00)
(AQ7340,17,CORRESPONDENT,6,59.27,$1,314,611.30)
(AQ7340,17,RETAIL,2,23.83,$528,500.00)
This is good; this is what we want.  Next we’ll want to look only at the record-type=09 fields, then accumulate balances on a per-state level.  In a fresh Pig shell, enter the following:
grunt> nips_9nov = load ‘/user/hduser/pigExample/nips_11092012.txt’ using PigStorage(‘|’) as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
grunt> fr_9nov = filter nips_9nov by (recordType == 9);
grunt> dump fr_9nov;
This will produce the same output as before, only restricting it to the «record type = 09» fields.  Again, here is the tail of this output:
(AQ7337,9,WYOMING,1,2.7,$170,619.65)
(AQ7340,9,GEORGIA,1,8.98,$199,118.84)
(AQ7340,9,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,9,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,9,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,9,MISSOURI,1,10.82,$239,984.67)
(AQ7340,9,WASHINGTON,2,26.62,$590,528.47)
At this point I’m going to change direction a little and put the Pig statements into a script, so it is a little easier to catch the output.  Create a new file called «pigTest.pig» and add the following lines to it:
nips_9nov = load ‘/user/hduser/pigExample/nips_11092012.txt’ using PigStorage(‘|’) as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
dump fr_9nov;
Save the file and invoke it with:
pig -f pigTest.pig &> pigTest.log
Some of Pig’s output goes to stderr, so you’ll want to capture both stdout and stderr to your log file.  Open the log file and scroll down to:
 Job Stats (time in seconds):
and look at the next two lines, the first of which is a header.  Note that Pig only generated a Map job and no Reducejobs (Maps = 1, Reduces = 0, Feature = «MAP_ONLY»).  Since we are only loading records and filtering them based on a field characteristic, no Reduce job was necessary.
Next, we’ll want to parse the aggregate unpaid balances for each mortgage, sum them by state, and output the totals.  The aggregate UPB is in the form of a human-readable, not-much-fun-to-parse bytearray (e.g. $3,759,464.16).  To treat these as floats we’ll have to do a little cleanup.  This may not be terribly efficient, but I used a nested «REPLACE» function call:
fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, ‘\$’, »), ‘,’, ») as upbFloat;
Note that if you enter this expression in the Pig shell, you’ll need two additional escape («\») characters in front of the dollar sign (which, as in the java.lang.String.replaceAll()method, is interpreted as a regex). In a script, you’ll need to escape both the dollar sign and the backslash.  Trust me.  fr_clean will now contain cleaned-up unpaid balances that look like real floats.  In the Pig shell, you can verify the schema of the relation (but not that the data will parse, as this has not happened yet) with the following:
grunt> describe fr_clean;
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer — Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 1 time(s).
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer — Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
fr_clean: {poolNumber: bytearray,state: bytearray,numberOfLoans: int,percentageUpb: float,upbFloat: float}
The final steps to output the states (and the District of Columbia) with the total unpaid balances of all new issues (for this file, in millions of dollars) are:
grunt> byState = group fr_clean by state;
grunt> totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
grunt> dump totalUpb;
I’ve glossed over these steps, but basically you are grouping by state and summing the unpaid balances on a per-state basis, scaling the totals by one million.  After the dump call is completed, we get 51 lines of output, the last few of which are here:
(CALIFORNIA,1021.2734624101563)
(NEW JERSEY,103.9833925234375)
(NEW MEXICO,18.8126310078125)
(WASHINGTON,153.9220293671875)
(CONNECTICUT,33.7019688515625)
(MISSISSIPPI,24.3124981796875)
(NORTH DAKOTA,7.280279875)
(PENNSYLVANIA,147.6614224453125)
(RHODE ISLAND,15.24327924609375)
(SOUTH DAKOTA,10.51592517578125)
(MASSACHUSETTS,156.1397877109375)
(NEW HAMPSHIRE,16.52243540234375)
(WEST VIRGINIA,8.3394678828125)
(NORTH CAROLINA,129.42278906640624)
(SOUTH CAROLINA,70.23617646875)
(DISTRICT OF COLUMBIA,18.288814109375)
In other words, California totaled slightly more than one billion dollars for the pools issued on the 9th of November in 2012.  
To wrap things up a little, I’ll next run from a Pig script file.  I mentioned earlier we need to be a little careful about the escape character in the «REPLACE» call.  Here’s the script to process a single file:
nips_9nov = load ‘/user/hduser/pigExample/nips_11092012.txt’ using PigStorage(‘|’) as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, ‘\\\$’, »), ‘,’, ») as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
dump totalUpb;
Processing the entire dataset
There’s not much left to do here but run against the entire dataset, which in our case is about three months’ worth of new-issues files.  A slight modification to the script:
nips = load ‘/user/hduser/pigExample’ using PigStorage(‘|’) as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, ‘\\\$’, »), ‘,’, ») as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;
results in similar data (sorted in ascending order of total aggregate UPB), of course with only larger numbers.  For example, we see that during a three-month period starting in late August 2012, new Fannie-Mae pools representing $629M were issued for properties in Alaska.  You can also see from the output file that one Map and one Reduce job were created, and I have to admit, quite a number of records dropped (due to failure to parse):
2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher — Encountered Warning ACCESSING_NON_EXISTENT_FIELD 2396 time(s).
2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher — Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 30869 time(s).
On first inspection, it appears that 2396 «record type = 9» records actually didn’t have enough fields to provide an aggregate unpaid balance column, and that I failed to successfully convert quite a few balances.  I did not investigate these records; however, such records generally tell you that you need to modify your parse logic.  In other words — a good topic for another post!