В этом посте мы собираемся написать программу MapReduce, которая будет использовать входные данные Avro, а также создавать данные в формате Avro.
Мы напишем программу для расчета среднего балла студентов.
Подготовка данных
Схема для записей:
student.avsc
{
"type" : "record",
"name" : "student_marks",
"namespace" : "com.rishav.avro",
"fields" : [ {
"name" : "student_id",
"type" : "int"
}, {
"name" : "subject_id",
"type" : "int"
}, {
"name" : "marks",
"type" : "int"
} ]
}
И некоторые примеры записей:
student.json
{"student_id":1,"subject_id":63,"marks":19}
{"student_id":2,"subject_id":64,"marks":74}
{"student_id":3,"subject_id":10,"marks":94}
{"student_id":4,"subject_id":79,"marks":27}
{"student_id":1,"subject_id":52,"marks":95}
{"student_id":2,"subject_id":34,"marks":16}
{"student_id":3,"subject_id":81,"marks":17}
{"student_id":4,"subject_id":60,"marks":52}
{"student_id":1,"subject_id":11,"marks":66}
{"student_id":2,"subject_id":84,"marks":39}
{"student_id":3,"subject_id":24,"marks":39}
{"student_id":4,"subject_id":16,"marks":0}
{"student_id":1,"subject_id":65,"marks":75}
{"student_id":2,"subject_id":5,"marks":52}
{"student_id":3,"subject_id":86,"marks":50}
{"student_id":4,"subject_id":55,"marks":42}
{"student_id":1,"subject_id":30,"marks":21}
Теперь мы преобразуем приведенные выше примеры записей в формат avro и загрузим файл данных avro в HDFS:
java -jar avro-tools-1.7.5.jar fromjson student.json --schema-file student.avsc > student.avro hadoop fs -put student.avro student.avro
Программа Avro MapReduce
В моей программе я использовал класс Avro Java для схемы student_marks. Чтобы создать класс Java из файла схемы, используйте команду ниже:
java -jar avro-tools-1.7.5.jar compile schema student.avsc .
Затем добавьте сгенерированный класс Java в IDE.
Я написал программу MapReduce, которая читает файл данных Avro student.avro (передается в качестве аргумента), вычисляет средние оценки для каждого учащегося и сохраняет результаты также в формате Avro. Программа приведена ниже:
package com.rishav.avro.mapreduce;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.rishav.avro.IntPair;
import com.rishav.avro.student_marks;
public class AvroAverageDriver extends Configured implements Tool{
public static class AvroAverageMapper extends
Mapper<AvroKey<student_marks>, NullWritable, IntWritable, IntPair> {
protected void map(AvroKey<student_marks> key, NullWritable value, Context context)
throws IOException, InterruptedException {
IntWritable s_id = new IntWritable(key.datum().getStudentId());
IntPair marks_one = new IntPair(key.datum().getMarks(), 1);
context.write(s_id, marks_one);
}
} // end of mapper class
public static class AvroAverageCombiner extends
Reducer<IntWritable, IntPair, IntWritable, IntPair> {
IntPair p_sum_count = new IntPair();
Integer p_sum = new Integer(0);
Integer p_count = new Integer(0);
protected void reduce(IntWritable key, Iterable<IntPair> values, Context context)
throws IOException, InterruptedException {
p_sum = 0;
p_count = 0;
for (IntPair value : values) {
p_sum += value.getFirstInt();
p_count += value.getSecondInt();
}
p_sum_count.set(p_sum, p_count);
context.write(key, p_sum_count);
}
} // end of combiner class
public static class AvroAverageReducer extends
Reducer<IntWritable, IntPair, AvroKey<Integer>, AvroValue<Float>> {
Integer f_sum = 0;
Integer f_count = 0;
protected void reduce(IntWritable key, Iterable<IntPair> values, Context context)
throws IOException, InterruptedException {
f_sum = 0;
f_count = 0;
for (IntPair value : values) {
f_sum += value.getFirstInt();
f_count += value.getSecondInt();
}
Float average = (float)f_sum/f_count;
Integer s_id = new Integer(key.toString());
context.write(new AvroKey<Integer>(s_id), new AvroValue<Float>(average));
}
} // end of reducer class
@Override
public int run(String[] rawArgs) throws Exception {
if (rawArgs.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
Job job = new Job(super.getConf());
job.setJarByClass(AvroAverageDriver.class);
job.setJobName("Avro Average");
String[] args = new GenericOptionsParser(rawArgs).getRemainingArgs();
Path inPath = new Path(args[0]);
Path outPath = new Path(args[1]);
FileInputFormat.setInputPaths(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(super.getConf()).delete(outPath, true);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setMapperClass(AvroAverageMapper.class);
AvroJob.setInputKeySchema(job, student_marks.getClassSchema());
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntPair.class);
job.setCombinerClass(AvroAverageCombiner.class);
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
job.setReducerClass(AvroAverageReducer.class);
AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new AvroAverageDriver(), args);
System.exit(result);
}
}
- В программе ключом ввода для картографа является AvroKey <student_marks>, а вводимое значение равно нулю. Выходной ключ метода карты — student_id, а выходное значение — IntPair, имеющее метки и 1.
- У нас также есть объединитель, который собирает частичные суммы для каждого student_id.
- Наконец, редуктор берет student_id и частичные суммы и считает их и использует их для вычисления среднего значения для каждого student_id. Редуктор записывает вывод в формате Avro.
Для настройки работы Avro мы добавили следующие свойства:
// set InputFormatClass to AvroKeyInputFormat and define input schema job.setInputFormatClass(AvroKeyInputFormat.class); AvroJob.setInputKeySchema(job, student_marks.getClassSchema()); // set OutputFormatClass to AvroKeyValueOutputFormat and key as INT type and value as FLOAT type job.setOutputFormatClass(AvroKeyValueOutputFormat.class); AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.INT)); AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.FLOAT));
Выполнение работы
Мы упаковываем нашу Java-программу в avro_mr.jar и добавляем Avro-jar-файлы в libjars и hadoop classpath, используя следующие команды:
export LIBJARS=avro-1.7.5.jar,avro-mapred-1.7.5-hadoop1.jar,paranamer-2.6.jar
export HADOOP_CLASSPATH=avro-1.7.5.jar:avro-mapred-1.7.5-hadoop1.jar:paranamer-2.6.jar
hadoop jar avro_mr.jar com.rishav.avro.mapreduce.AvroAverageDriver -libjars ${LIBJARS} student.avro output
Вы можете проверить вывод, используя команду avro-tool.
Чтобы включить сжатие snappy для вывода, добавьте следующие строки в метод run и добавьте snappy-java jar в libjars и hadoop classpath:
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);