Учебники

MapReduce — Partitioner

Секционер работает как условие при обработке входного набора данных. Фаза разбиения происходит после фазы Map и до фазы Reduce.

Количество секционеров равно количеству редукторов. Это означает, что разделитель разделит данные в соответствии с числом редукторов. Следовательно, данные, передаваемые с одного разделителя, обрабатываются одним редуктором.

Разметка

Разделитель разделяет пары ключ-значение промежуточных выходов Map. Он разделяет данные, используя пользовательское условие, которое работает как хэш-функция. Общее количество разделов совпадает с количеством заданий Reducer для задания. Давайте рассмотрим пример, чтобы понять, как работает разделитель.

Внедрение MapReduce Partitioner

Для удобства предположим, что у нас есть небольшая таблица Employee со следующими данными. Мы будем использовать этот пример данных в качестве нашего входного набора данных, чтобы продемонстрировать, как работает разделитель.

Я бы название Возраст Пол Оплата труда
1201 Гопал 45 мужчина 50000
1202 Маниша 40 женский 50000
1203 Халил 34 мужчина 30000
1204 Prasanth 30 мужчина 30000
1205 Киран 20 мужчина 40000
1206 Лакшми 25 женский 35000
1207 Бхавайя 20 женский 15000
1208 Reshma 19 женский 15000
1209 kranthi 22 мужчина 22000
+1210 Сатиш 24 мужчина 25000
1211 Кришна 25 мужчина 25000
1212 Аршад 28 мужчина 20000
1213 Лаванья 18 женский 8000

Мы должны написать заявление для обработки входного набора данных, чтобы найти работника с наибольшим окладом по полу в разных возрастных группах (например, ниже 20 лет, от 21 до 30 лет, выше 30 лет).

Входные данные

Приведенные выше данные сохраняются в файле input.txt в каталоге «/ home / hadoop / hadoopPartitioner» и передаются в качестве входных данных.

1201 Гопал 45 мужчина 50000
1202 Маниша 40 женский 51000
1203 Khaleel 34 мужчина 30000
1204 Prasanth 30 мужчина 31000
1205 Киран 20 мужчина 40000
1206 Лакшми 25 женский 35000
1207 Бхавайя 20 женский 15000
1208 Reshma 19 женский 14 000
1209 kranthi 22 мужчина 22000
+1210 Сатиш 24 мужчина 25000
1211 Кришна 25 мужчина 26000
1212 Аршад 28 мужчина 20000
1213 Лаванья 18 женский 8000

Основываясь на данном входе, ниже приводится алгоритмическое объяснение программы.

Задачи карты

Задача карты принимает пары ключ-значение в качестве входных данных, пока у нас есть текстовые данные в текстовом файле. Входные данные для этой задачи карты следующие:

Ввод — ключом будет шаблон, такой как «любая специальная клавиша + имя файла + номер строки» (пример: ключ = @ input1), а значением будут данные в этой строке (пример: значение = 1201 \ t gopal \ t 45 \ т мужской \ т 50000).

Метод — Работа этой задачи карты заключается в следующем —

  • Прочитайте значение (запись данных), которое поступает в качестве входного значения из списка аргументов в строке.

  • Используя функцию split, разделите пол и сохраните в строковую переменную.

Прочитайте значение (запись данных), которое поступает в качестве входного значения из списка аргументов в строке.

Используя функцию split, разделите пол и сохраните в строковую переменную.

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Отправьте информацию о поле и значение записи в виде пары выходной ключ-значение из задачи сопоставления в задачу разбиения .

Отправьте информацию о поле и значение записи в виде пары выходной ключ-значение из задачи сопоставления в задачу разбиения .

context.write(new Text(gender), new Text(value));
  • Повторите все вышеперечисленные шаги для всех записей в текстовом файле.

Повторите все вышеперечисленные шаги для всех записей в текстовом файле.

Вывод. Вы получите половые данные и данные записи в виде пар ключ-значение.

Задача Partitioner

Задача секционера принимает пары ключ-значение из задачи карты в качестве входных данных. Разделение подразумевает разделение данных на сегменты. В соответствии с заданными условными критериями разделов входные парные данные ключ-значение могут быть разделены на три части на основе возрастных критериев.

Ввод — все данные в коллекции пар ключ-значение.

ключ = значение поля пола в записи.

значение = значение всей записи данных этого пола.

Метод . Процесс логики разбиения выполняется следующим образом.

  • Считайте значение поля возраста из пары ключ-значение.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Проверьте значение возраста с соблюдением следующих условий.

    • Возраст не более 20
    • Возраст больше 20 и меньше или равно 30.
    • Возраст старше 30.

Проверьте значение возраста с соблюдением следующих условий.

if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Вывод . Все данные пар ключ-значение сегментированы на три набора пар ключ-значение. Редуктор работает индивидуально на каждую коллекцию.

Уменьшить задачи

Количество задач секционирования равно количеству задач редуктора. Здесь у нас есть три задачи секционирования, и, следовательно, у нас есть три задачи редуктора, которые нужно выполнить.

Ввод — Редуктор будет выполняться три раза с различным набором пар ключ-значение.

ключ = значение поля пола в записи.

значение = все данные записи этого пола.

Метод — следующая логика будет применяться к каждой коллекции.

  • Прочитайте значение поля Зарплата каждой записи.
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • Проверьте зарплату с помощью переменной max. Если str [4] является максимальной зарплатой, присвойте str [4] значение max, в противном случае пропустите шаг.

Проверьте зарплату с помощью переменной max. Если str [4] является максимальной зарплатой, присвойте str [4] значение max, в противном случае пропустите шаг.

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Повторите шаги 1 и 2 для каждой коллекции ключей (мужские и женские — ключевые коллекции). Выполнив эти три шага, вы найдете одну максимальную зарплату из коллекции ключей для мужчин и одну максимальную зарплату из коллекции ключей для женщин.

Повторите шаги 1 и 2 для каждой коллекции ключей (мужские и женские — ключевые коллекции). Выполнив эти три шага, вы найдете одну максимальную зарплату из коллекции ключей для мужчин и одну максимальную зарплату из коллекции ключей для женщин.

context.write(new Text(key), new IntWritable(max));

Вывод. Наконец, вы получите набор данных пары ключ-значение в трех коллекциях разных возрастных групп. Он содержит максимальную зарплату из коллекции мужчин и максимальную зарплату из коллекции женщин в каждой возрастной группе соответственно.

После выполнения задач Map, Partitioner и Reduce три набора данных пары ключ-значение сохраняются в трех разных файлах в качестве выходных данных.

Все три задачи рассматриваются как задания MapReduce. Следующие требования и спецификации этих заданий должны быть указаны в Конфигурациях —

  • Название работы
  • Форматы ввода и вывода ключей и значений
  • Отдельные классы для задач Map, Reduce и Partitioner
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Пример программы

Следующая программа показывает, как реализовать разделители для заданных критериев в программе MapReduce.

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

Сохраните приведенный выше код как PartitionerExample.java в «/ home / hadoop / hadoopPartitioner». Компиляция и выполнение программы приведены ниже.

Компиляция и выполнение

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить вышеуказанную программу.

Шаг 1 — Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и запуска программы MapReduce. Вы можете скачать банку с mvnrepository.com .

Допустим, загруженная папка имеет вид «/ home / hadoop / hadoopPartitioner»

Шаг 2 — Следующие команды используются для компиляции программы PartitionerExample.java и создания jar для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

Шаг 3 — Используйте следующую команду для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Шаг 4 — Используйте следующую команду, чтобы скопировать входной файл с именем input.txt во входной каталог HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Шаг 5 — Используйте следующую команду, чтобы проверить файлы во входном каталоге.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Шаг 6 — Используйте следующую команду, чтобы запустить приложение Top salary, взяв входные файлы из входного каталога.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Подождите некоторое время, пока файл не запустится. После выполнения выходные данные содержат несколько входных разбиений, задач сопоставления и задач редуктора.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

Шаг 7 — Используйте следующую команду для проверки результирующих файлов в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Вы найдете выходные данные в трех файлах, потому что вы используете в своей программе три секционера и три редуктора.

Шаг 8 — Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00000 . Этот файл создан HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Выход в Part-00000

Female   15000
Male     40000

Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00001 .

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Выход в Part-00001

Female   35000
Male    31000

Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00002 .

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Выход в Part-00002