Статьи

Счетчики в Apache Pig

При работе с огромным количеством необработанных данных ежедневно могут быть сценарии, в которых мы можем быть заинтересованы в сборе информации о данных, которые мы анализируем, например, сколько типов записей было обработано, сколько недействительных записей было найдено во время выполнения задания, и т. д. Эта функциональность обеспечивается счетчиками в Hadoop, которые являются легкими объектами, помогающими нам отслеживать прогресс Map и сокращать количество задач. Поскольку Apache Pig широко используется для потоков ETL в организациях, а Apache Pig создает задания mapreduce, мы можем использовать эти счетчики hadoop в Apache Pig для удовлетворения наших потребностей. В этом посте мы увидим, как мы можем собирать информацию о данных, с которыми мы хотим иметь дело, и использовать функциональность счетчиков hadoop в Pig и сохранять эти метрики счетчиков в постоянном хранилище, таком как файл, база данных и т. Д.,так что эта информация может быть позже просмотрена, чтобы понять больше о наших данных.   

Чтобы использовать счетчики hadoop, нам нужно написать pigudf и выполнить все связанные операции в udf. Наконец, когда все проверки завершены, мы можем сохранить эти метрики в постоянном хранилище, таком как файл или база данных, в этом посте мы будем хранить метрики счетчика в файле в HDFS.

Утилита Twitters Elephant Bird предоставляет методы для использования счетчиков Hadoop с Pig. Нам нужны две банки elephant-bird-pig и elephant-bird-hadoop-compat для размещения в classpath проектов. Их можно скачать из репозитория Maven. Я добавил ссылки для скачивания этих банок ниже.

скачать банку слона 

Давайте рассмотрим пример использования, в котором мы хотим проверить набор данных, содержащий имя пользователя, номер телефона, почту, род занятий, возраст и пол.

Из этого набора данных нам нравится понимать, сколько пользователей не предоставили свою информацию должным образом. Если какое-либо из полей не заполнено, мы будем увеличивать счетчик соответственно для этого поля, т.е. если пользователь не предоставил свой возраст, мы будем увеличивать счетчик, называемый » missing_age «, если пол пуст, то» missing_gender «и другие соответственно. Мы также проверим значения, например, возраст> 150 — недопустимые данные, и, аналогично, недопустимы значения поля пола, отличные от мужских или женских. Мы запишем все это и сохраним статистику в файле hadoop.  

Предположим, что это формат нашего набора данных.

Структура набора данных: имя, номер телефона, почта, род занятий, возраст, пол

Это наш образец набора данных

  Joe,1122334455,joe@xzy.com,student,20,male
  Jim,2211334455,jim@abc.com,engineer,26,male
  Lisa,3311224455,lisa@xyz.com,musician,25,female
  John,,john@abc.com,writer,120,mafe

В этом наборе данных в последней записи мы можем обнаружить, что номер телефона пользователя отсутствует, а его возраст указан неверно, и даже его пол неверен.

В следующем UDF мы проверили, отсутствуют ли какие-либо другие поля, например, имя, возраст, пол и т. Д., А также проверили, если возраст не превышает 100, и если поле пола имеет значения как мужские, так и женские. Если есть какие-либо пропущенные поля, то мы увеличиваем счетчик, скажем, если возраст отсутствует, то мы увеличиваем счетчик «MISSING_AGE» на 1, скажем, если возраст недействителен, тогда мы увеличиваем счетчик «INVALID_AGE», и аналогично мы делаем то же самое для других поля.

В следующем фрагменте кода вы можете заметить константу COUNTER_GROUP_NAME, в Hadoop все COUNTERS заполняются в COUNTER GROUP. Здесь мы создаем нашу собственную группу счетчиков клиентов под названием CUSTOMCOUNTERS, и все наши счетчики заполнены этой группой счетчиков. Я покажу это на скриншоте после запуска pigscript.

Вышеупомянутый UDF необходимо скомпилировать и упаковать в виде jar-файла и зарегистрировать с нашим pigscript

package com.dzone.pig;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import com.twitter.elephantbird.pig.util.PigCounterHelper;

public class PigCounters extends EvalFunc<String> {

private final String COUNTER_GROUP_NAME = "CUSTOMCOUNTERS";
PigCounterHelper counterHelper = new PigCounterHelper();
private final long incrValue = 1;
private final String COUNTERDELIMITER = "_";

public static enum COUNTERS {
MISSING, INVALID
};

public PigCounters() {};
public void incrementCounters(String counterName) {
counterHelper.incrCounter(COUNTER_GROUP_NAME, counterName, incrValue);
}

@Override
public String exec(Tuple input) throws IOException {
String name = (String) input.get(0);
String phone = (String) input.get(1);
String mail = (String) input.get(2);
String occupation = (String) input.get(3);
String age = (String) input.get(4);
String gender = (String) input.get(5);

if (name == null || name.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER + "NAME");
}
if (phone == null || phone.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER + "PHONE");
} else if (phone.length() != 10)
incrementCounters(COUNTERS.INVALID + COUNTERDELIMITER + "PHONE");
if (mail == null || mail.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER + "MAIL");
}
if (occupation == null || occupation.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER+ "OCCUPATION");
}
if (age == null || age.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER + "AGE");
} else if (Integer.parseInt(age) > 100)
incrementCounters(COUNTERS.INVALID + COUNTERDELIMITER + "AGE");
if (gender == null || gender.isEmpty()) {
incrementCounters(COUNTERS.MISSING + COUNTERDELIMITER + "GENDER");
} else if (!(gender.equalsIgnoreCase("male") || (gender
.equalsIgnoreCase("female"))))
incrementCounters(COUNTERS.INVALID + COUNTERDELIMITER + "GENDER");
return "validated";
}
}

 С помощью вышеупомянутого UDF мы проверяем наши данные и счетчики приращений. Эти счетчики будут отображаться на странице СЧЕТЧИКИ для нашей работы под именем группы «CUSTOMCOUNTERS». Но наша цель — сохранить счетчики в файле hadoop.

Для этого нам нужно написать PIGLISTENER, который будет слушать нашу работу со свиньями и сохранять собранные счетчики, как только работа будет завершена.

Для этого Apache Pig предоставляет интерфейс с именем PIGPROCESSLISTENER, который прослушивает события, например, когда наша работа запускается, заканчивается, отправляется и вызывает эти методы соответствующим образом. Вы можете понять больше об этом, взглянув на следующий фрагмент.

В следующем фрагменте мы собираем все счетчики в группе «CUSTOMCOUNTERS» и сохраняем их в файле hadoop.

package com.dzone.pig;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PigListener implements PigProgressNotificationListener {

private final Logger log = LoggerFactory.getLogger(PigListener.class);
private final String COUNTER_GROUP_NAME = "CUSTOMCOUNTERS";
private HashMap<String, Long> countersToStore = new HashMap<String, Long>(0);
boolean isThereAnyCountersToStore = false;

public void initialPlanNotification(String scriptId, MROperPlan plan) {}
public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}
public void jobStartedNotification(String scriptId, String assignedJobId) {}
//This method is called after completion of each mapreduce job spawned by pig script
public void jobFinishedNotification(String scriptId, JobStats jobStats) {
Group counterGroup = jobStats.getHadoopCounters().getGroup(COUNTER_GROUP_NAME);
Iterator<Counter> countersInGroup = counterGroup.iterator();
if (counterGroup.size() > 0)
isThereAnyCountersToStore = true;
while (countersInGroup.hasNext()) {
Counter currentCounter = countersInGroup.next();
log.info("CounterName: " + currentCounter.getDisplayName() + "  "+ "CounterValue:" + currentCounter.getValue());
countersToStore.put(currentCounter.getDisplayName(),currentCounter.getValue());
}
List<InputStats> inputStatsList = jobStats.getInputs();
if (inputStatsList.size() > 0)
isThereAnyCountersToStore = true;
Iterator<InputStats> inputStatsIterator = inputStatsList.iterator();
while (inputStatsIterator.hasNext()) {
InputStats inputStats = inputStatsIterator.next();
countersToStore.put("TOTAL_INPUT_RECORDS",inputStats.getNumberRecords());
}
}
public void jobFailedNotification(String scriptId, JobStats jobStats) {}
public void outputCompletedNotification(String scriptId,OutputStats outputStats) {}
public void progressUpdatedNotification(String scriptId, int progress) {}
//This method is called once our pigscript completes
public void launchCompletedNotification(String scriptId,int numJobsSucceeded) {
if (isThereAnyCountersToStore)
try {
storeCountersInFile();
} catch (IOException e) {
e.printStackTrace();
} finally {
isThereAnyCountersToStore = false;
}
}
//This method stores the collected counters  in  file in hdfs.
private void storeCountersInFile() throws IOException {
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
Path filenamePath = new Path("/tmp/output.txt");
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true);
}
FSDataOutputStream fout = fs.create(filenamePath);
for (Map.Entry<String, Long> entry : countersToStore.entrySet()) {
fout.writeBytes(entry.getKey() + "," + entry.getValue());
fout.writeBytes("\n");
}
} finally {
fs.close();
}
}
}

Давайте позвоним нашему pigscript pigcounter_example.pig

Перед запуском скрипта не забудьте скопировать банки с птицами-слонами и наш UDF-файл, содержащий слушателя и валидатора, в папку библиотеки pig. т.е. для моей установки это / usr / lib / pig / lib.

После копирования необходимых jar-файлов наш скрипт может быть запущен с помощью следующей команды.

pig -Dpig.notification.listener=com.dzone.pig.PigListener -f pigcounter_example.pig

Свойство pig.notification.listener регистрирует слушателя с помощью нашего pigscript и вызывает методы, такие как jobfinishednotification, launchcompletednotification и т. д., когда наше задание pig завершается.

 Ниже приведен наш сценарий свиньи.

  REGISTER /home/deva/deva/xoanon/dzone/pig/PigCounters-0.0.1-SNAPSHOT.jar;
  REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-hadoop-compat-4.5.jar;
  REGISTER /home/deva/deva/xoanon/dzone/pig/elephant-bird-pig-4.5.jar;
  DEFINE Validator com.dzone.pig.PigCounters();

  input_data  = LOAD '/tmp/sampledata.csv' USING PigStorage(',') AS(name:chararray,phonenumber:chararray,mail:chararray,occupation:chararray,age:chararray,gender:chararray);

  --calling our UDF here to validate and increment counters accordingly

  validated_data = FOREACH input_data GENERATE Validator(*) AS validated;
  dump validated_data;

Здесь я приложил скриншот, чтобы показать вам, как наши счетчики отображаются на странице счетчиков нашей работы. После завершения задания вы можете увидеть выходной файл /tmp/output.txt в ваших hdf-файлах.