В этом посте мы собираемся написать программу MapReduce, которая будет использовать входные данные Avro, а также создавать данные в формате Avro.
Мы напишем программу для расчета среднего балла студентов.
Подготовка данных
Схема для записей:
student.avsc { "type" : "record", "name" : "student_marks", "namespace" : "com.rishav.avro", "fields" : [ { "name" : "student_id", "type" : "int" }, { "name" : "subject_id", "type" : "int" }, { "name" : "marks", "type" : "int" } ] }
И некоторые примеры записей:
student.json {"student_id":1,"subject_id":63,"marks":19} {"student_id":2,"subject_id":64,"marks":74} {"student_id":3,"subject_id":10,"marks":94} {"student_id":4,"subject_id":79,"marks":27} {"student_id":1,"subject_id":52,"marks":95} {"student_id":2,"subject_id":34,"marks":16} {"student_id":3,"subject_id":81,"marks":17} {"student_id":4,"subject_id":60,"marks":52} {"student_id":1,"subject_id":11,"marks":66} {"student_id":2,"subject_id":84,"marks":39} {"student_id":3,"subject_id":24,"marks":39} {"student_id":4,"subject_id":16,"marks":0} {"student_id":1,"subject_id":65,"marks":75} {"student_id":2,"subject_id":5,"marks":52} {"student_id":3,"subject_id":86,"marks":50} {"student_id":4,"subject_id":55,"marks":42} {"student_id":1,"subject_id":30,"marks":21}
Теперь мы преобразуем приведенные выше примеры записей в формат avro и загрузим файл данных avro в HDFS:
java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro hadoop fs -put student.avro student.avro
Программа Avro MapReduce
В моей программе я использовал класс Avro Java для схемы student_marks. Чтобы создать класс Java из файла схемы, используйте команду ниже:
java -jar avro-tools-1.7.5.jar compile schema student.avsc .
Затем добавьте сгенерированный класс Java в IDE.
Я написал программу MapReduce, которая читает файл данных Avro student.avro (передается в качестве аргумента), вычисляет средние оценки для каждого учащегося и сохраняет результаты также в формате Avro. Программа приведена ниже:
package com.rishav.avro.mapreduce; import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.avro.mapreduce.AvroKeyValueOutputFormat; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.rishav.avro.IntPair; import com.rishav.avro.student_marks; public class AvroAverageDriver extends Configured implements Tool{ public static class AvroAverageMapper extends Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> { protected void map(AvroKey<student_marks> key, NullWritable value, Context context) throws IOException, InterruptedException { IntWritable s_id = new IntWritable(key.datum().getStudentId()); IntPair marks_one = new IntPair(key.datum().getMarks(), 1); context.write(s_id, marks_one); } } // end of mapper class public static class AvroAverageCombiner extends Reducer<IntWritable, IntPair, IntWritable, IntPair> { IntPair p_sum_count = new IntPair(); Integer p_sum = new Integer(0); Integer p_count = new Integer(0); protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) throws IOException, InterruptedException { p_sum = 0; p_count = 0; for (IntPair value : values) { p_sum += value.getFirstInt(); p_count += value.getSecondInt(); } p_sum_count.set(p_sum, p_count); context.write(key, p_sum_count); } } // end of combiner class public static class AvroAverageReducer extends Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> { Integer f_sum = 0; Integer f_count = 0; protected void reduce(IntWritable key, Iterable<IntPair> values, Context context) throws IOException, InterruptedException { f_sum = 0; f_count = 0; for (IntPair value : values) { f_sum += value.getFirstInt(); f_count += value.getSecondInt(); } Float average = (float)f_sum/f_count; Integer s_id = new Integer(key.toString()); context.write(new AvroKey<Integer>(s_id), new AvroValue<Float>(average)); } } // end of reducer class @Override public int run(String[] rawArgs) throws Exception { if (rawArgs.length != 2) { System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Job job = new Job(super.getConf()); job.setJarByClass(AvroAverageDriver.class); job.setJobName("Avro Average"); String[] args = new GenericOptionsParser(rawArgs).getRemainingArgs(); Path inPath = new Path(args[0]); Path outPath = new Path(args[1]); FileInputFormat.setInputPaths(job, inPath); FileOutputFormat.setOutputPath(job, outPath); outPath.getFileSystem(super.getConf()).delete(outPath, true); job.setInputFormatClass(AvroKeyInputFormat.class); job.setMapperClass(AvroAverageMapper.class); AvroJob.setInputKeySchema(job, student_marks.getClassSchema()); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntPair.class); job.setCombinerClass(AvroAverageCombiner.class); job.setOutputFormatClass(AvroKeyValueOutputFormat.class); job.setReducerClass(AvroAverageReducer.class); AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT)); AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT)); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int result = ToolRunner.run(new AvroAverageDriver(), args); System.exit(result); } }
- В программе ключом ввода для картографа является AvroKey <student_marks>, а вводимое значение равно нулю. Выходной ключ метода карты — student_id, а выходное значение — IntPair, имеющее метки и 1.
- У нас также есть объединитель, который собирает частичные суммы для каждого student_id.
- Наконец, редуктор берет student_id и частичные суммы и считает их и использует их для вычисления среднего значения для каждого student_id. Редуктор записывает вывод в формате Avro.
Для настройки работы Avro мы добавили следующие свойства:
// set InputFormatClass to AvroKeyInputFormat and define input schema job.setInputFormatClass(AvroKeyInputFormat.class); AvroJob.setInputKeySchema(job, student_marks.getClassSchema()); // set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type job.setOutputFormatClass(AvroKeyValueOutputFormat.class); AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT)); AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));
Выполнение работы
Мы упаковываем нашу Java-программу в avro_mr.jar и добавляем Avro-jar-файлы в libjars и hadoop classpath, используя следующие команды:
export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output
Вы можете проверить вывод, используя команду avro-tool.
Чтобы включить сжатие snappy для вывода, добавьте следующие строки в метод run и добавьте snappy-java jar в libjars и hadoop classpath:
FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);