Статьи

In-Mapper Combiner Program для расчета среднего

В моем предыдущем посте я описал, как мы можем использовать in-mapper combiner, чтобы сделать нашу M / R-программу более эффективной. В посте мы также увидели оба алгоритма M / R для расчета среднего с / без использования оптимизации объединителя в картографе.

В этом посте я публикую коды для обоих алгоритмов.

Программа M / R для расчета среднего значения без комбайнера в картографе приведена ниже:

package com.hadoop.imcdp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgDriver extends Configured implements Tool{

	public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
		
		String record;
		
		protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
			record = value.toString();
			String[] fields = record.split(",");
			
			Integer s_id = new Integer(fields[0]);
			Integer marks = new Integer(fields[2]);
			context.write(new IntWritable(s_id), new IntWritable(marks));
		} // end of map method
	} // end of mapper class
	

	public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {
		
		protected void reduce(IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException {
			Integer s_id = key.get();
			Integer sum = 0;
			Integer cnt = 0;
			
			for (IntWritable value:values) {
				sum = sum + value.get();
				cnt = cnt + 1;
			}
			
			Float avg_m = (float) (sum/cnt);
			context.write(new IntWritable(s_id), new FloatWritable(avg_m));
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		args = new GenericOptionsParser(conf, args).getRemainingArgs();
		String input = args[0];
		String output = args[1];
		
		Job job = new Job(conf, "Avg");
		job.setJarByClass(ImcdpMap.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(ImcdpMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setReducerClass(ImcdpReduce.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(FloatWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(input));
		Path outPath = new Path(output);
		FileOutputFormat.setOutputPath(job, outPath);
		outPath.getFileSystem(conf).delete(outPath, true);
		
		job.waitForCompletion(true);
		return (job.waitForCompletion(true) ? 0 : 1);
	}
	
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgDriver(), args);
        System.exit(exitCode);
    }
}

Программа M / R для расчета среднего значения с помощью сумматора в картографе приведена ниже:

package com.hadoop.imcdp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ImcdpAvgDriver extends Configured implements Tool{

	public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntPair> {
		
		String record;
		Map partial_sum = new HashMap<Integer, Integer>();
		Map record_count = new HashMap<Integer, Integer>();
		
		protected void map(LongWritable key, Text value, Mapper.Context context) {
			record = value.toString();
			String[] fields = record.split(",");
			
			Integer s_id = new Integer(fields[0]);
			Integer marks = new Integer(fields[2]);
			
			if (partial_sum.containsKey(s_id)) {
				Integer sum = (Integer) partial_sum.get(s_id) + marks;
				partial_sum.put(s_id, sum);
			} else {
				partial_sum.put(s_id, marks);
			}
			
			if (record_count.containsKey(s_id)) {
				Integer count = (Integer) record_count.get(s_id) + 1;
				record_count.put(s_id, count);
			} else {
				record_count.put(s_id, 1);
			}
		} // end of map method
		
		protected void cleanup(Context context) throws IOException, InterruptedException {
			Iterator<Map.Entry<Integer, Integer>> itr1 = partial_sum.entrySet().iterator();
			
			while (itr1.hasNext()) {
				Entry<Integer, Integer> entry1 = itr1.next();
				Set record_count_set = record_count.entrySet();
				Integer s_id_1 = entry1.getKey();
				Integer partial_sum_1 = entry1.getValue();
				Integer record_count_1 = (Integer) record_count.get(s_id_1);

				context.write(new IntWritable(s_id_1), new IntPair(partial_sum_1, record_count_1));
				System.out.println(s_id_1+","+partial_sum_1+","+record_count_1);
			}
		} // end of cleanup
	} // end of mapper class
	

	public static class ImcdpReduce extends Reducer<IntWritable, IntPair, IntWritable, FloatWritable> {
		
		protected void reduce(IntWritable key, Iterable<IntPair> values, Reducer<IntWritable, IntPair, IntWritable, FloatWritable>.Context context) throws IOException, InterruptedException {
			Integer s_id = key.get();
			Integer sum = 0;
			Integer cnt = 0;
			System.out.println(key+","+values);
			for (IntPair value:values) {
				sum = sum + value.getFirstInt();
				cnt = cnt + value.getSecondInt();
			}
			
			System.out.println(sum+","+cnt);
			Float avg_m = (float) (sum/cnt);
			context.write(new IntWritable(s_id), new FloatWritable(avg_m));
		}
	}
	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		args = new GenericOptionsParser(conf, args).getRemainingArgs();
		String input = args[0];
		String output = args[1];
		
		Job job = new Job(conf, "IMCDP");
		job.setJarByClass(ImcdpMap.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapperClass(ImcdpMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntPair.class);
		
		job.setReducerClass(ImcdpReduce.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(FloatWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(input));
		Path outPath = new Path(output);
		FileOutputFormat.setOutputPath(job, outPath);
		outPath.getFileSystem(conf).delete(outPath, true);
		
		job.waitForCompletion(true);
		return (job.waitForCompletion(true) ? 0 : 1);
	}
	
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ImcdpAvgDriver(), args);
        System.exit(exitCode);
    }
}

Для выполнения программ на моем ноутбуке потребовалось 56 секунд и 42 секунды соответственно для 10 миллионов записей. Таким образом, мы можем увидеть улучшение на 33% во времени при использовании программы объединения в картографе.