
MapReduce в файлах данных Avro

В этом посте мы собираемся написать программу MapReduce, которая будет использовать входные данные Avro, а также создавать данные в формате Avro.

Мы напишем программу для расчета среднего балла студентов.

Подготовка данных

Схема для записей:

  "type" : "record",
  "name" : "student_marks",
  "namespace" : "com.rishav.avro",
  "fields" : [ {
  "name" : "student_id",
  "type" : "int"
  }, {
  "name" : "subject_id",
  "type" : "int"
  }, {
  "name" : "marks",
  "type" : "int"
  } ]

И некоторые примеры записей:


Теперь мы преобразуем приведенные выше примеры записей в формат avro и загрузим файл данных avro в HDFS:

java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro
hadoop fs -put student.avro student.avro

Программа Avro MapReduce

В моей программе я использовал класс Avro Java для схемы student_marks. Чтобы создать класс Java из файла схемы, используйте команду ниже:

     java -jar avro-tools-1.7.5.jar compile schema student.avsc .

Затем добавьте сгенерированный класс Java в IDE.

Я написал программу MapReduce, которая читает файл данных Avro student.avro (передается в качестве аргумента), вычисляет средние оценки для каждого учащегося и сохраняет результаты также в формате Avro. Программа приведена ниже:

package com.rishav.avro.mapreduce;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.rishav.avro.IntPair;
import com.rishav.avro.student_marks;

public class AvroAverageDriver extends Configured implements Tool{

	public static class AvroAverageMapper extends 
	Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> {
		protected void map(AvroKey<student_marks> key, NullWritable value, Context context) 
				throws IOException, InterruptedException {
			IntWritable s_id = new IntWritable(key.datum().getStudentId());
			IntPair marks_one = new IntPair(key.datum().getMarks(), 1);
			context.write(s_id, marks_one);
	} // end of mapper class

	public static class AvroAverageCombiner extends 
	Reducer<IntWritable, IntPair, IntWritable, IntPair> {
		IntPair p_sum_count = new IntPair();
		Integer p_sum = new Integer(0);
		Integer p_count = new Integer(0);
		protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) 
				throws IOException, InterruptedException {
			p_sum = 0;
			p_count = 0;
			for (IntPair value : values) {
				p_sum += value.getFirstInt();
				p_count += value.getSecondInt();
			p_sum_count.set(p_sum, p_count);
			context.write(key, p_sum_count);
	} // end of combiner class 

	public static class AvroAverageReducer extends 
	Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> {
		Integer f_sum = 0;
		Integer f_count = 0;
		protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) 
				throws IOException, InterruptedException {
			f_sum = 0;
			f_count = 0;
			for (IntPair value : values) {
				f_sum += value.getFirstInt();
				f_count += value.getSecondInt();
			Float average = (float)f_sum/f_count;
			Integer s_id = new Integer(key.toString());
			context.write(new AvroKey<Integer>(s_id), new AvroValue<Float>(average));
	} // end of reducer class 

	public int run(String[] rawArgs) throws Exception {
		if (rawArgs.length != 2) {
			System.err.printf("Usage: %s [generic options] <input> <output>\n",
			return -1;
		Job job = new Job(super.getConf());
		job.setJobName("Avro Average");
		String[] args = new GenericOptionsParser(rawArgs).getRemainingArgs();
		Path inPath = new Path(args[0]);
		Path outPath = new Path(args[1]);

		FileInputFormat.setInputPaths(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		outPath.getFileSystem(super.getConf()).delete(outPath, true);

		AvroJob.setInputKeySchema(job, student_marks.getClassSchema());
		AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
		AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

		return (job.waitForCompletion(true) ? 0 : 1);

	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new AvroAverageDriver(), args);
  • В программе ключом ввода для картографа является AvroKey <student_marks>, а вводимое значение равно нулю. Выходной ключ метода карты — student_id, а выходное значение — IntPair, имеющее метки и 1.
  • У нас также есть объединитель, который собирает частичные суммы для каждого student_id.
  • Наконец, редуктор берет student_id и частичные суммы и считает их и использует их для вычисления среднего значения для каждого student_id. Редуктор записывает вывод в формате Avro.

Для настройки работы Avro мы добавили следующие свойства:

// set InputFormatClass to AvroKeyInputFormat and define input schema
  AvroJob.setInputKeySchema(job, student_marks.getClassSchema());

// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
  AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
  AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

Выполнение работы

Мы упаковываем нашу Java-программу в avro_mr.jar и добавляем Avro-jar-файлы в libjars и hadoop classpath, используя следующие команды:

export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output

Вы можете проверить вывод, используя команду avro-tool.

Чтобы включить сжатие snappy для вывода, добавьте следующие строки в метод run и добавьте snappy-java jar в libjars и hadoop classpath:

FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);