Статьи

MapReduce в файлах данных Avro

В этом посте мы собираемся написать программу MapReduce, которая будет использовать входные данные Avro, а также создавать данные в формате Avro.

Мы напишем программу для расчета среднего балла студентов.

Подготовка данных

Схема для записей:

student.avsc
{
  "type" : "record",
  "name" : "student_marks",
  "namespace" : "com.rishav.avro",
  "fields" : [ {
  "name" : "student_id",
  "type" : "int"
  }, {
  "name" : "subject_id",
  "type" : "int"
  }, {
  "name" : "marks",
  "type" : "int"
  } ]
}

И некоторые примеры записей:

student.json
{"student_id":1,"subject_id":63,"marks":19}
{"student_id":2,"subject_id":64,"marks":74}
{"student_id":3,"subject_id":10,"marks":94}
{"student_id":4,"subject_id":79,"marks":27}
{"student_id":1,"subject_id":52,"marks":95}
{"student_id":2,"subject_id":34,"marks":16}
{"student_id":3,"subject_id":81,"marks":17}
{"student_id":4,"subject_id":60,"marks":52}
{"student_id":1,"subject_id":11,"marks":66}
{"student_id":2,"subject_id":84,"marks":39}
{"student_id":3,"subject_id":24,"marks":39}
{"student_id":4,"subject_id":16,"marks":0}
{"student_id":1,"subject_id":65,"marks":75}
{"student_id":2,"subject_id":5,"marks":52}
{"student_id":3,"subject_id":86,"marks":50}
{"student_id":4,"subject_id":55,"marks":42}
{"student_id":1,"subject_id":30,"marks":21}

Теперь мы преобразуем приведенные выше примеры записей в формат avro и загрузим файл данных avro в HDFS:

java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro
hadoop fs -put student.avro student.avro

Программа Avro MapReduce

В моей программе я использовал класс Avro Java для схемы student_marks. Чтобы создать класс Java из файла схемы, используйте команду ниже:

     java -jar avro-tools-1.7.5.jar compile schema student.avsc .

Затем добавьте сгенерированный класс Java в IDE.

Я написал программу MapReduce, которая читает файл данных Avro student.avro (передается в качестве аргумента), вычисляет средние оценки для каждого учащегося и сохраняет результаты также в формате Avro. Программа приведена ниже:

package com.rishav.avro.mapreduce;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.rishav.avro.IntPair;
import com.rishav.avro.student_marks;

public class AvroAverageDriver extends Configured implements Tool{

	public static class AvroAverageMapper extends 
	Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> {
		protected void map(AvroKey<student_marks> key, NullWritable value, Context context) 
				throws IOException, InterruptedException {
			IntWritable s_id = new IntWritable(key.datum().getStudentId());
			IntPair marks_one = new IntPair(key.datum().getMarks(), 1);
			context.write(s_id, marks_one);
		}
	} // end of mapper class

	public static class AvroAverageCombiner extends 
	Reducer<IntWritable, IntPair, IntWritable, IntPair> {
		IntPair p_sum_count = new IntPair();
		Integer p_sum = new Integer(0);
		Integer p_count = new Integer(0);
		protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) 
				throws IOException, InterruptedException {
			p_sum = 0;
			p_count = 0;
			for (IntPair value : values) {
				p_sum += value.getFirstInt();
				p_count += value.getSecondInt();
			}
			p_sum_count.set(p_sum, p_count);
			context.write(key, p_sum_count);
		}
	} // end of combiner class 

	public static class AvroAverageReducer extends 
	Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> {
		Integer f_sum = 0;
		Integer f_count = 0;
		
		protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) 
				throws IOException, InterruptedException {
			f_sum = 0;
			f_count = 0;
			for (IntPair value : values) {
				f_sum += value.getFirstInt();
				f_count += value.getSecondInt();
			}
			Float average = (float)f_sum/f_count;
			Integer s_id = new Integer(key.toString());
			context.write(new AvroKey<Integer>(s_id), new AvroValue<Float>(average));
		}
	} // end of reducer class 

	@Override
	public int run(String[] rawArgs) throws Exception {
		if (rawArgs.length != 2) {
			System.err.printf("Usage: %s [generic options] <input> <output>\n",
					getClass().getName());
			ToolRunner.printGenericCommandUsage(System.err);
			return -1;
		}
		
		Job job = new Job(super.getConf());
		job.setJarByClass(AvroAverageDriver.class);
		job.setJobName("Avro Average");
		
		String[] args = new GenericOptionsParser(rawArgs).getRemainingArgs();
		Path inPath = new Path(args[0]);
		Path outPath = new Path(args[1]);

		FileInputFormat.setInputPaths(job, inPath);
		FileOutputFormat.setOutputPath(job, outPath);
		outPath.getFileSystem(super.getConf()).delete(outPath, true);

		job.setInputFormatClass(AvroKeyInputFormat.class);
		job.setMapperClass(AvroAverageMapper.class);
		AvroJob.setInputKeySchema(job, student_marks.getClassSchema());
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntPair.class);
		
		job.setCombinerClass(AvroAverageCombiner.class);
		
		job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
		job.setReducerClass(AvroAverageReducer.class);
		AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
		AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

		return (job.waitForCompletion(true) ? 0 : 1);
	}

	public static void main(String[] args) throws Exception {
		int result = ToolRunner.run(new AvroAverageDriver(), args);
		System.exit(result);
	}
}
  • В программе ключом ввода для картографа является AvroKey <student_marks>, а вводимое значение равно нулю. Выходной ключ метода карты — student_id, а выходное значение — IntPair, имеющее метки и 1.
  • У нас также есть объединитель, который собирает частичные суммы для каждого student_id.
  • Наконец, редуктор берет student_id и частичные суммы и считает их и использует их для вычисления среднего значения для каждого student_id. Редуктор записывает вывод в формате Avro.

Для настройки работы Avro мы добавили следующие свойства:

// set InputFormatClass to AvroKeyInputFormat and define input schema
  job.setInputFormatClass(AvroKeyInputFormat.class);
  AvroJob.setInputKeySchema(job, student_marks.getClassSchema());

// set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type
  job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
  AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
  AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));

Выполнение работы

Мы упаковываем нашу Java-программу в avro_mr.jar и добавляем Avro-jar-файлы в libjars и hadoop classpath, используя следующие команды:

export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output

Вы можете проверить вывод, используя команду avro-tool.

Чтобы включить сжатие snappy для вывода, добавьте следующие строки в метод run и добавьте snappy-java jar в libjars и hadoop classpath:

FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);