При работе с огромным количеством необработанных данных ежедневно могут быть сценарии, в которых мы можем быть заинтересованы в сборе информации о данных, которые мы анализируем, например, сколько типов записей было обработано, сколько недействительных записей было найдено во время выполнения задания, и т. д. Эта функциональность обеспечивается счетчиками в Hadoop, которые являются легкими объектами, помогающими нам отслеживать прогресс Map и сокращать количество задач. Поскольку Apache Pig широко используется для потоков ETL в организациях, а Apache Pig создает задания mapreduce, мы можем использовать эти счетчики hadoop в Apache Pig для удовлетворения наших потребностей. В этом посте мы увидим, как мы можем собирать информацию о данных, с которыми мы хотим иметь дело, и использовать функциональность счетчиков hadoop в Pig и сохранять эти метрики счетчиков в постоянном хранилище, таком как файл, база данных и т. Д.,так что эта информация может быть позже просмотрена, чтобы понять больше о наших данных.
Чтобы использовать счетчики hadoop, нам нужно написать pigudf и выполнить все связанные операции в udf. Наконец, когда все проверки завершены, мы можем сохранить эти метрики в постоянном хранилище, таком как файл или база данных, в этом посте мы будем хранить метрики счетчика в файле в HDFS.
Утилита Twitters Elephant Bird предоставляет методы для использования счетчиков Hadoop с Pig. Нам нужны две банки elephant-bird-pig и elephant-bird-hadoop-compat для размещения в classpath проектов. Их можно скачать из репозитория Maven. Я добавил ссылки для скачивания этих банок ниже.
Давайте рассмотрим пример использования, в котором мы хотим проверить набор данных, содержащий имя пользователя, номер телефона, почту, род занятий, возраст и пол.
Из этого набора данных нам нравится понимать, сколько пользователей не предоставили свою информацию должным образом. Если какое-либо из полей не заполнено, мы будем увеличивать счетчик соответственно для этого поля, т.е. если пользователь не предоставил свой возраст, мы будем увеличивать счетчик, называемый » missing_age «, если пол пуст, то» missing_gender «и другие соответственно. Мы также проверим значения, например, возраст> 150 — недопустимые данные, и, аналогично, недопустимы значения поля пола, отличные от мужских или женских. Мы запишем все это и сохраним статистику в файле hadoop.
Предположим, что это формат нашего набора данных.
Структура набора данных: имя, номер телефона, почта, род занятий, возраст, пол
Это наш образец набора данных
Joe,1122334455,[email protected],student,20,male
Jim,2211334455,[email protected],engineer,26,male
Lisa,3311224455,[email protected],musician,25,female
John,,[email protected],writer,120,mafe
В этом наборе данных в последней записи мы можем обнаружить, что номер телефона пользователя отсутствует, а его возраст указан неверно, и даже его пол неверен.
В следующем UDF мы проверили, отсутствуют ли какие-либо другие поля, например, имя, возраст, пол и т. Д., А также проверили, если возраст не превышает 100, и если поле пола имеет значения как мужские, так и женские. Если есть какие-либо пропущенные поля, то мы увеличиваем счетчик, скажем, если возраст отсутствует, то мы увеличиваем счетчик «MISSING_AGE» на 1, скажем, если возраст недействителен, тогда мы увеличиваем счетчик «INVALID_AGE», и аналогично мы делаем то же самое для других поля.
В следующем фрагменте кода вы можете заметить константу COUNTER_GROUP_NAME, в Hadoop все COUNTERS заполняются в COUNTER GROUP. Здесь мы создаем нашу собственную группу счетчиков клиентов под названием CUSTOMCOUNTERS, и все наши счетчики заполнены этой группой счетчиков. Я покажу это на скриншоте после запуска pigscript.
package com.dzone.pig;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PigListener implements PigProgressNotificationListener {
private final Logger log = LoggerFactory.getLogger(PigListener.class);
private final String COUNTER_GROUP_NAME = "CUSTOMCOUNTERS";
private HashMap<String, Long> countersToStore = new HashMap<String, Long>(0);
boolean isThereAnyCountersToStore = false;
public void initialPlanNotification(String scriptId, MROperPlan plan) {}
public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}
public void jobStartedNotification(String scriptId, String assignedJobId) {}
//This method is called after completion of each mapreduce job spawned by pig script
public void jobFinishedNotification(String scriptId, JobStats jobStats) {
Group counterGroup = jobStats.getHadoopCounters().getGroup(COUNTER_GROUP_NAME);
Iterator<Counter> countersInGroup = counterGroup.iterator();
if (counterGroup.size() > 0)
isThereAnyCountersToStore = true;
while (countersInGroup.hasNext()) {
Counter currentCounter = countersInGroup.next();
log.info("CounterName: " + currentCounter.getDisplayName() + " "+ "CounterValue:" + currentCounter.getValue());
countersToStore.put(currentCounter.getDisplayName(),currentCounter.getValue());
}
List<InputStats> inputStatsList = jobStats.getInputs();
if (inputStatsList.size() > 0)
isThereAnyCountersToStore = true;
Iterator<InputStats> inputStatsIterator = inputStatsList.iterator();
while (inputStatsIterator.hasNext()) {
InputStats inputStats = inputStatsIterator.next();
countersToStore.put("TOTAL_INPUT_RECORDS",inputStats.getNumberRecords());
}
}
public void jobFailedNotification(String scriptId, JobStats jobStats) {}
public void outputCompletedNotification(String scriptId,OutputStats outputStats) {}
public void progressUpdatedNotification(String scriptId, int progress) {}
//This method is called once our pigscript completes
public void launchCompletedNotification(String scriptId,int numJobsSucceeded) {
if (isThereAnyCountersToStore)
try {
storeCountersInFile();
} catch (IOException e) {
e.printStackTrace();
} finally {
isThereAnyCountersToStore = false;
}
}
//This method stores the collected counters in file in hdfs.
private void storeCountersInFile() throws IOException {
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
Path filenamePath = new Path("/tmp/output.txt");
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true);
}
FSDataOutputStream fout = fs.create(filenamePath);
for (Map.Entry<String, Long> entry : countersToStore.entrySet()) {
fout.writeBytes(entry.getKey() + "," + entry.getValue());
fout.writeBytes("\n");
}
} finally {
fs.close();
}
}
}
Давайте позвоним нашему pigscript pigcounter_example.pig
Перед запуском скрипта не забудьте скопировать банки с птицами-слонами и наш UDF-файл, содержащий слушателя и валидатора, в папку библиотеки pig. т.е. для моей установки это / usr / lib / pig / lib.
После копирования необходимых jar-файлов наш скрипт может быть запущен с помощью следующей команды.
pig -Dpig.notification.listener=com.dzone.pig.PigListener -f pigcounter_example.pig
Свойство pig.notification.listener регистрирует слушателя с помощью нашего pigscript и вызывает методы, такие как jobfinishednotification, launchcompletednotification и т. д., когда наше задание pig завершается.
Ниже приведен наш сценарий свиньи.
REGISTER /home/deva/deva/xoanon/dzone/pig/PigCounters-0.0.1-SNAPSHOT.jar;
REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-hadoop-compat-4.5.jar;
REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-pig-4.5.jar;
DEFINE Validator com.dzone.pig.PigCounters();
input_data = LOAD '/tmp/sampledata.csv' USING PigStorage(',') AS(name:chararray,phonenumber:chararray,mail:chararray,occupation:chararray,age:chararray,gender:chararray);
--calling our UDF here to validate and increment counters accordingly
validated_data = FOREACH input_data GENERATE Validator(*) AS validated;
dump validated_data;
Здесь я приложил скриншот, чтобы показать вам, как наши счетчики отображаются на странице счетчиков нашей работы. После завершения задания вы можете увидеть выходной файл /tmp/output.txt в ваших hdf-файлах.