Учебники

HCatalog – Формат ввода-вывода

Интерфейсы HCatInputFormat и HCatOutputFormat используются для чтения данных из HDFS и после обработки записывают полученные данные в HDFS с помощью задания MapReduce. Давайте разработаем интерфейсы формата ввода и вывода.

HCatInputFormat

HCatInputFormat используется с заданиями MapReduce для чтения данных из таблиц, управляемых HCatalog. HCatInputFormat предоставляет Hadoop 0.20 MapReduce API для чтения данных, как если бы они были опубликованы в таблице.

Sr.No. Название и описание метода
1

public static HCatInputFormat setInput (Job job, String dbName, String tableName) выдает IOException

Установите входы для использования в работе. Он запрашивает метастаз с заданной входной спецификацией и сериализует соответствующие разделы в конфигурации задания для задач MapReduce.

2

public static HCatInputFormat setInput (Config конфигурации, String dbName, String tableName) выдает IOException

Установите входы для использования в работе. Он запрашивает метастаз с заданной входной спецификацией и сериализует соответствующие разделы в конфигурации задания для задач MapReduce.

3

public HCatInputFormat setFilter (String filter) выбрасывает IOException

Установите фильтр на входной таблице.

4

public HCatInputFormat setProperties (Свойства свойств) бросает IOException

Установите свойства для формата ввода.

public static HCatInputFormat setInput (Job job, String dbName, String tableName) выдает IOException

Установите входы для использования в работе. Он запрашивает метастаз с заданной входной спецификацией и сериализует соответствующие разделы в конфигурации задания для задач MapReduce.

public static HCatInputFormat setInput (Config конфигурации, String dbName, String tableName) выдает IOException

Установите входы для использования в работе. Он запрашивает метастаз с заданной входной спецификацией и сериализует соответствующие разделы в конфигурации задания для задач MapReduce.

public HCatInputFormat setFilter (String filter) выбрасывает IOException

Установите фильтр на входной таблице.

public HCatInputFormat setProperties (Свойства свойств) бросает IOException

Установите свойства для формата ввода.

API-интерфейс HCatInputFormat включает следующие методы:

  • setInput
  • setOutputSchema
  • getTableSchema

Чтобы использовать HCatInputFormat для чтения данных, сначала создайте экземпляр InputJobInfo с необходимой информацией из считываемой таблицы, а затем вызовите setInput с InputJobInfo .

Вы можете использовать метод setOutputSchema, чтобы включить схему проекции , чтобы указать поля вывода. Если схема не указана, будут возвращены все столбцы в таблице. Вы можете использовать метод getTableSchema, чтобы определить схему таблицы для указанной входной таблицы.

HCatOutputFormat

HCatOutputFormat используется с заданиями MapReduce для записи данных в таблицы, управляемые HCatalog. HCatOutputFormat предоставляет Hadoop 0.20 MapReduce API для записи данных в таблицу. Когда задание MapReduce использует HCatOutputFormat для записи вывода, используется выходной формат по умолчанию, настроенный для таблицы, и новый раздел публикуется в таблице после завершения задания.

Sr.No. Название и описание метода
1

public static void setOutput (Config конфигурации, учетные данные Credentials, OutputJobInfo outputJobInfo) выбрасывает IOException

Установите информацию о выходе для записи для работы. Он запрашивает сервер метаданных, чтобы найти StorageHandler для использования в таблице. Выдает ошибку, если раздел уже опубликован.

2

public static void setSchema (Config конфигурации, схема HCatSchema) выдает IOException

Установите схему для данных, записываемых в раздел. Схема таблицы используется по умолчанию для раздела, если это не вызывается.

3

public RecordWriter <WritableComparable <?>, HCatRecord> getRecordWriter (контекст TaskAttemptContext) выбрасывает IOException, InterruptedException

Получить запись писателя для работы. Он использует стандартный выходной формат StorageHandler для получения записи.

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) выбрасывает IOException, InterruptedException

Получите выходной коммиттер для этого выходного формата. Это гарантирует, что вывод фиксируется правильно.

public static void setOutput (Config конфигурации, учетные данные Credentials, OutputJobInfo outputJobInfo) выбрасывает IOException

Установите информацию о выходе для записи для работы. Он запрашивает сервер метаданных, чтобы найти StorageHandler для использования в таблице. Выдает ошибку, если раздел уже опубликован.

public static void setSchema (Config конфигурации, схема HCatSchema) выдает IOException

Установите схему для данных, записываемых в раздел. Схема таблицы используется по умолчанию для раздела, если это не вызывается.

public RecordWriter <WritableComparable <?>, HCatRecord> getRecordWriter (контекст TaskAttemptContext) выбрасывает IOException, InterruptedException

Получить запись писателя для работы. Он использует стандартный выходной формат StorageHandler для получения записи.

public OutputCommitter getOutputCommitter (TaskAttemptContext context) выбрасывает IOException, InterruptedException

Получите выходной коммиттер для этого выходного формата. Это гарантирует, что вывод фиксируется правильно.

API-интерфейс HCatOutputFormat включает следующие методы:

  • setOutput
  • setSchema
  • getTableSchema

Первый вызов в HCatOutputFormat должен быть установлен setOutput ; любой другой вызов выдаст исключение, сообщающее, что формат вывода не инициализирован.

Схема для записываемых данных указывается методом setSchema . Вы должны вызвать этот метод, предоставив схему данных, которые вы пишете. Если ваши данные имеют ту же схему, что и схема таблицы, вы можете использовать HCatOutputFormat.getTableSchema (), чтобы получить схему таблицы, а затем передать ее в setSchema () .

пример

Следующая программа MapReduce считывает данные из одной таблицы, которая, как предполагается, имеет целое число во втором столбце («столбец 1»), и подсчитывает, сколько экземпляров каждого отдельного значения она находит. То есть он делает эквивалент « выберите col1, count (*) из группы $ table по col1; ».

Например, если значения во втором столбце равны {1, 1, 1, 3, 3, 5}, программа выдаст следующий вывод значений и считает:

1, 3
3, 2
5, 1

Давайте теперь посмотрим на код программы –

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Перед компиляцией вышеуказанной программы вам необходимо скачать несколько jar- файлов и добавить их в classpath для этого приложения. Вам необходимо загрузить все банки Hive и HCatalog (HCatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Используйте следующие команды, чтобы скопировать эти файлы JAR из локального в HDFS и добавить их в путь к классам .

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

Используйте следующую команду, чтобы скомпилировать и выполнить данную программу.

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

Теперь проверьте выходной каталог (hdfs: user / tmp / hive) для вывода (part_0000, part_0001).