В моем предыдущем посте я описал, как мы можем использовать in-mapper combiner, чтобы сделать нашу M / R-программу более эффективной. В посте мы также увидели оба алгоритма M / R для расчета среднего с / без использования оптимизации объединителя в картографе.
В этом посте я публикую коды для обоих алгоритмов.
Программа M / R для расчета среднего значения без комбайнера в картографе приведена ниже:
package com.hadoop.imcdp; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AvgDriver extends Configured implements Tool{ public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> { String record; protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { record = value.toString(); String[] fields = record.split(","); Integer s_id = new Integer(fields[0]); Integer marks = new Integer(fields[2]); context.write(new IntWritable(s_id), new IntWritable(marks)); } // end of map method } // end of mapper class public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> { protected void reduce(IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException { Integer s_id = key.get(); Integer sum = 0; Integer cnt = 0; for (IntWritable value:values) { sum = sum + value.get(); cnt = cnt + 1; } Float avg_m = (float) (sum/cnt); context.write(new IntWritable(s_id), new FloatWritable(avg_m)); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); args = new GenericOptionsParser(conf, args).getRemainingArgs(); String input = args[0]; String output = args[1]; Job job = new Job(conf, "Avg"); job.setJarByClass(ImcdpMap.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(ImcdpMap.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(ImcdpReduce.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(FloatWritable.class); FileInputFormat.setInputPaths(job, new Path(input)); Path outPath = new Path(output); FileOutputFormat.setOutputPath(job, outPath); outPath.getFileSystem(conf).delete(outPath, true); job.waitForCompletion(true); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new AvgDriver(), args); System.exit(exitCode); } }
Программа M / R для расчета среднего значения с помощью сумматора в картографе приведена ниже:
package com.hadoop.imcdp; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ImcdpAvgDriver extends Configured implements Tool{ public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntPair> { String record; Map partial_sum = new HashMap<Integer, Integer>(); Map record_count = new HashMap<Integer, Integer>(); protected void map(LongWritable key, Text value, Mapper.Context context) { record = value.toString(); String[] fields = record.split(","); Integer s_id = new Integer(fields[0]); Integer marks = new Integer(fields[2]); if (partial_sum.containsKey(s_id)) { Integer sum = (Integer) partial_sum.get(s_id) + marks; partial_sum.put(s_id, sum); } else { partial_sum.put(s_id, marks); } if (record_count.containsKey(s_id)) { Integer count = (Integer) record_count.get(s_id) + 1; record_count.put(s_id, count); } else { record_count.put(s_id, 1); } } // end of map method protected void cleanup(Context context) throws IOException, InterruptedException { Iterator<Map.Entry<Integer, Integer>> itr1 = partial_sum.entrySet().iterator(); while (itr1.hasNext()) { Entry<Integer, Integer> entry1 = itr1.next(); Set record_count_set = record_count.entrySet(); Integer s_id_1 = entry1.getKey(); Integer partial_sum_1 = entry1.getValue(); Integer record_count_1 = (Integer) record_count.get(s_id_1); context.write(new IntWritable(s_id_1), new IntPair(partial_sum_1, record_count_1)); System.out.println(s_id_1+","+partial_sum_1+","+record_count_1); } } // end of cleanup } // end of mapper class public static class ImcdpReduce extends Reducer<IntWritable, IntPair, IntWritable, FloatWritable> { protected void reduce(IntWritable key, Iterable<IntPair> values, Reducer<IntWritable, IntPair, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException { Integer s_id = key.get(); Integer sum = 0; Integer cnt = 0; System.out.println(key+","+values); for (IntPair value:values) { sum = sum + value.getFirstInt(); cnt = cnt + value.getSecondInt(); } System.out.println(sum+","+cnt); Float avg_m = (float) (sum/cnt); context.write(new IntWritable(s_id), new FloatWritable(avg_m)); } } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); args = new GenericOptionsParser(conf, args).getRemainingArgs(); String input = args[0]; String output = args[1]; Job job = new Job(conf, "IMCDP"); job.setJarByClass(ImcdpMap.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(ImcdpMap.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntPair.class); job.setReducerClass(ImcdpReduce.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(FloatWritable.class); FileInputFormat.setInputPaths(job, new Path(input)); Path outPath = new Path(output); FileOutputFormat.setOutputPath(job, outPath); outPath.getFileSystem(conf).delete(outPath, true); job.waitForCompletion(true); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new ImcdpAvgDriver(), args); System.exit(exitCode); } }
Для выполнения программ на моем ноутбуке потребовалось 56 секунд и 42 секунды соответственно для 10 миллионов записей. Таким образом, мы можем увидеть улучшение на 33% во времени при использовании программы объединения в картографе.