Статьи

Hadoop на практике


Hadoop на практике

Алекс Холмс

Работа с простыми форматами данных, такими как файлы журналов, проста и поддерживается в MapReduce. В этой статье, основанной на главе 3 Hadoop на практике, автор Алекс Холмс показывает, как работать с повсеместно распространенными форматами сериализации данных, такими как XML и JSON. 

Обработка общих форматов сериализации

XML и JSON являются стандартными форматами обмена данными. Их повсеместное распространение в нашей отрасли подтверждается их широким внедрением в области хранения и обмена данными. XML существует с 1998 года как механизм представления данных, которые могут читать как машины, так и люди. Он стал универсальным языком для обмена данными между системами и сегодня используется многими стандартами, такими как SOAP и RSS, и используется в качестве открытого формата данных для таких продуктов, как Microsoft Office.

Техника 1: MapReduce и XML

Наша цель — использовать XML в качестве источника данных для задания MapReduce. Мы собираемся предположить, что XML-документы, которые должны быть обработаны, имеют большой размер, и в результате мы хотим иметь возможность обрабатывать их параллельно с несколькими сопоставителями, работающими над одним и тем же входным файлом.

проблема

Параллельно работать над одним XML-файлом в MapReduce сложно, поскольку XML не содержит маркера синхронизации в своем формате данных. Следовательно, как мы работаем с форматом файла, который по своей природе не разделяем как XML?

Решение

MapReduce не содержит встроенной поддержки XML, поэтому мы должны обратиться к другому проекту Apache, Mahout, системе машинного обучения, которая предоставляет XML InputFormat. Чтобы продемонстрировать XML InputFormat, давайте напишем задание MapReduce, которое использует XML-формат Mahout для чтения имен и значений свойств из Hadoop.

конфигурационные файлы. Нашим первым шагом является настройка конфигурации работы.

conf.set("xmlinput.start", "");            #1 
conf.set("xmlinput.end", "");             #2 
job.setInputFormatClass(XmlInputFormat.class);       #3

# 1 Определяет строковую форму начального тега XML. Наша работа состоит в том, чтобы принимать файлы конфигурации Hadoop в качестве входных данных, где каждая запись конфигурации использует тег «property».
# 2 Определяет строковую форму конечного тега XML.
# 3 Устанавливает класс формата ввода Mahout XML.

Из кода видно, что XML InputFormat Mahout является зачаточным; ты должен сказать

это точная последовательность начальных и конечных тегов XML, которые будут искать в файле. Глядя на источник

InputFormat подтверждает это:

private boolean next(LongWritable key, Text value)
    throws IOException {
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        key.set(fsin.getPos());
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}

 

Далее нам нужно написать Mapper для использования входного формата Mahout XML. Мы предоставляем элемент XML в

Текстовая форма, поэтому нам нужно использовать синтаксический анализатор XML для извлечения содержимого из XML.

public static class Map extends Mapper<LongWritable, Text,
    Text, Text> {
  @Override
  protected void map(LongWritable key, Text value,
                     Mapper.Context context)
      throws
      IOException, InterruptedException {
    String document = value.toString();
    System.out.println("‘" + document + "‘");
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(new
              ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      context.write(propertyName.trim(), propertyValue.trim());
    } catch (Exception e) {
      log.error("Error processing ‘" + document + "‘", e);
    }
  }
}

Нашей карте дан экземпляр Text, который содержит строковое представление данных между начальным и конечным тегами. В нашем коде мы используем встроенный в Java синтаксический анализатор Streaming API for XML (StAX), чтобы извлечь ключ и значение для каждого свойства и вывести их. Если мы запустим нашу работу MapReduce для core-site.xml Cloudera и отследим вывод, мы увидим вывод, который вы увидите ниже.

$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

$ bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \
  core-site.xml output

$ hadoop fs -cat output/part*
fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts *
hadoop.proxyuser.oozie.groups *

 

Этот вывод показывает, что мы успешно работали с XML как форматом сериализации ввода с MapReduce! Мало того, мы можем поддерживать огромные файлы XML, так как InputFormat поддерживает разделение XML.

ПИСЬМО XML

Успешно прочитав XML, следующий вопрос будет, как мы пишем XML? В нашем редукторе у нас есть обратные вызовы

которые происходят до и после вызова нашего основного метода Reduce, который мы можем использовать для создания начального и конечного тега

public static class Reduce
    extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(
      Context context)
      throws IOException, InterruptedException {
    context.write(new Text("<configuration>"), null);            #1
  }

  @Override
  protected void cleanup(
      Context context)
      throws IOException, InterruptedException {
    context.write(new Text("</configuration>"), null);           #2
  }

  private Text outputKey = new Text();
  public void reduce(Text key, Iterable<Text> values,
                     Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));           #3
      context.write(outputKey, null);                            #4
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
        .append("</name><value>").append(value)     
        .append("</value></property>");
    return sb.toString();
  }
}

# 1 Использует метод установки для записи начального тега корневого элемента.
# 2 Использует метод очистки для записи конечного тега корневого элемента.
# 3 Создает дочерний элемент XML для каждой комбинации ключ / значение, которую мы получаем в Reducer. # 4 Издает элемент XML.

Это также может быть встроено в OutputFormat.

PIG

Если вы хотите работать с XML в Pig, библиотека Piggybank (пользовательская библиотека полезного кода Pig) содержит XMLLoader. Он работает аналогично нашей технике и захватывает весь контент между начальным и конечным тегом и предоставляет его в виде единого поля байтового массива в кортеже Pig.

Улей

В настоящее время, похоже, нет способа работать с XML в Hive. Вы должны написать собственный SerDe [1] .

обсуждение

XML InputFormat Mahout, безусловно, поможет вам работать с XML. Однако он очень чувствителен к точному совпадению строк с именами начального и конечного элементов. Если тег элемента может содержать атрибуты со значениями переменных или генерация элемента не может контролироваться и может привести к использованию квалификаторов пространства имен XML, тогда этот подход может не сработать. Также проблематичными будут ситуации, когда указанное вами имя элемента используется в качестве дочернего элемента-потомка.

Если у вас есть контроль над тем, как XML представлен во входных данных, это упражнение можно упростить, если в каждой строке будет один элемент XML. Это позволит вам использовать встроенные текстовые входные форматы MapReduce (такие как TextInputFormat), которые обрабатывают каждую строку как запись и разделяются соответствующим образом, чтобы сохранить это разграничение.

Другой вариант, который стоит рассмотреть, — это этап предварительной обработки, где вы можете преобразовать исходный XML в отдельную строку для каждого элемента XML или преобразовать его в совершенно другой формат данных, такой как SequenceFile или Avro, оба из которых решают проблему разделения для вы.

Существует потоковый класс StreamXmlRecordReader, который позволяет вам работать с XML в вашем потоковом коде.

У нас есть сведения о том, как работать с XML, поэтому давайте перейдем к другому популярному формату сериализации, JSON. JSON разделяет машинно-читабельные черты XML и существует с начала 2000-х годов. Он менее многословен, чем XML, и не обладает богатыми возможностями ввода и проверки, доступными в XML.

Техника 2: MapReduce и JSON

Наша методика описывает, как вы можете работать с JSON в MapReduce. Мы также рассмотрим метод, с помощью которого файл JSON может быть разделен для одновременного чтения.

проблема

На рисунке 1 показана проблема использования JSON в MapReduce. Если вы работаете с большими файлами JSON, вам необходимо разделить их. Но, учитывая случайное смещение в файле, как мы можем определить начало следующего элемента JSON, особенно при работе с JSON, который имеет несколько иерархий, как в примере ниже?

Рисунок 1 Пример проблемы с JSON и несколькими входными разбиениями

Решение

JSON сложнее разбить на отдельные сегменты, чем формат, такой как XML, потому что у JSON нет токена (например, конечного тега в XML) для обозначения начала или конца записи.

ElephantBird [2] , проект с открытым исходным кодом, содержащий некоторые полезные утилиты для работы со сжатием LZO, имеет LzoJsonInputFormat, который может читать JSON, но требует, чтобы входной файл был сжат LZOP. Мы будем использовать этот код в качестве шаблона для нашего собственного JSON InputFormat, у которого нет требования сжатия LZOP.

 

Мы обманываем наше решение, поскольку предполагаем, что каждая запись JSON находится на отдельной строке. Наш JsonRecordFormat прост и ничего не делает, кроме как создает и возвращает JsonRecordReader, поэтому мы пропустим этот код. JsonRecordReader отправляет пары ключ / значение LongWritable, MapWritable в Mapper, где Map — это карта имен элементов JSON и их значений. Давайте посмотрим, как работает этот RecordReader. Он использует LineRecordReader, который является встроенным считывателем MapReduce, который генерирует запись для каждой строки. Чтобы преобразовать строку в MapWritable, он использует следующий метод.

public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject)parser.parse(line.toString());
    for (Object key: jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }

      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}

Он использует анализатор json-simple
[3] для анализа строки в объекте JSON, а затем перебирает ключи и помещает ключи и значения в MapWritable. Mapper получает данные JSON в парах LongWritable, MapWriable и может соответствующим образом обрабатывать данные. Код для задания MapReduce очень прост. Мы собираемся продемонстрировать код, используя JSON ниже.

{
  "results" :
    [
      {
        "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
        "from_user" : "grep_alex",
        "text" : "RT @kevinweil: After a lot of hard work by ..."
      },
      {
        "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
        "from_user" : "grep_alex",
        "text" : "@miguno pull request has been merged, thanks again!"
      }
    ]
}

Поскольку наша технология предполагает использование объекта JSON на строку, фактический файл JSON, с которым мы будем работать, показан ниже.

{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...

Мы скопируем файл JSON в HDFS и запустим наш код MapReduce. Наш код MapReduce просто записывает каждый JSON

ключ / значение к выходу.

$ hadoop fs -put test-data/ch3/singleline-tweets.json \
  singleline-tweets.json

$ bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \
  singleline-tweets.json output

$ fs -cat output/part*
text RT @kevinweil: After a lot of hard work by ...
from_user grep_alex
created_at Thu, 29 Dec 2011 21:46:01 +0000
text @miguno pull request has been merged, thanks again!
from_user grep_alex
created_at Mon, 26 Dec 2011 21:18:37 +0000

 

ПИСЬМО JSON

Подход, аналогичный тому, который мы рассмотрели для написания XML, также может быть использован для написания JSON.

 

PIG

ElephantBird содержит JsonLoader и LzoJsonLoader, которые можно использовать для работы с JSON в Pig. Это также работает для JSON, который основан на строке. Каждый кортеж Pig содержит поле для каждого элемента JSON в строке в виде chararray.

 

Улей

Hive содержит DelimitedJSONSerDe, который может сериализовать JSON, но, к сожалению, не десериализовать его, поэтому вы не можете загружать данные в Hive с помощью этого SerDe.

 

обсуждение

Наше решение работает с предположением, что входные данные JSON структурированы со строкой для каждого объекта JSON. Как бы мы работали с объектами JSON, которые находятся в нескольких строках? Авторы имеют экспериментальный проект на GitHub [4] , который работает с несколькими входными разбиениями в одном файле JSON. Ключом к этому подходу является поиск определенного члена JSON и извлечение содержащего его объекта. Есть проект Google Code под названием hive-json-serde [5] , который может поддерживать сериализацию и десериализацию.

Резюме

Как видите, использование XML и JSON в MapReduce не очень удобно и имеет жесткие требования к тому, как выкладываются ваши данные. Поддержка их в MapReduce сложна и подвержена ошибкам, так как они не поддаются естественному расщеплению. Альтернативные форматы файлов, такие как Avro и SequenceFiles, имеют встроенную поддержку разделяемости.

Если вы хотите приобрести Hadoop на практике, участники DZone могут получить скидку 38%, введя промо-код: dzone38 во время оформления заказа на Manning.com .

[1] SerDe — это сокращенная форма Serializer / Deserializer, механизма, позволяющего Hive считывать и записывать данные в HDFS.

[2] https://github.com/kevinweil/elephant-bird

[3] http://code.google.com/p/json-simple/

[4] Многострочный JSON InputFormat. https://github.com/alexholmes/json-mapreduce.

[5] http://code.google.com/p/hive-json-serde/

Источник: http://www.manning.com/holmes/