Секционер работает как условие при обработке входного набора данных. Фаза разбиения происходит после фазы Map и до фазы Reduce.
Количество секционеров равно количеству редукторов. Это означает, что разделитель разделит данные в соответствии с числом редукторов. Следовательно, данные, передаваемые с одного разделителя, обрабатываются одним редуктором.
Разметка
Разделитель разделяет пары ключ-значение промежуточных выходов Map. Он разделяет данные, используя пользовательское условие, которое работает как хэш-функция. Общее количество разделов совпадает с количеством заданий Reducer для задания. Давайте рассмотрим пример, чтобы понять, как работает разделитель.
Внедрение MapReduce Partitioner
Для удобства предположим, что у нас есть небольшая таблица Employee со следующими данными. Мы будем использовать этот пример данных в качестве нашего входного набора данных, чтобы продемонстрировать, как работает разделитель.
Я бы | название | Возраст | Пол | Оплата труда |
---|---|---|---|---|
1201 | Гопал | 45 | мужчина | 50000 |
1202 | Маниша | 40 | женский | 50000 |
1203 | Халил | 34 | мужчина | 30000 |
1204 | Prasanth | 30 | мужчина | 30000 |
1205 | Киран | 20 | мужчина | 40000 |
1206 | Лакшми | 25 | женский | 35000 |
1207 | Бхавайя | 20 | женский | 15000 |
1208 | Reshma | 19 | женский | 15000 |
1209 | kranthi | 22 | мужчина | 22000 |
+1210 | Сатиш | 24 | мужчина | 25000 |
1211 | Кришна | 25 | мужчина | 25000 |
1212 | Аршад | 28 | мужчина | 20000 |
1213 | Лаванья | 18 | женский | 8000 |
Мы должны написать заявление для обработки входного набора данных, чтобы найти работника с наибольшим окладом по полу в разных возрастных группах (например, ниже 20 лет, от 21 до 30 лет, выше 30 лет).
Входные данные
Приведенные выше данные сохраняются в файле input.txt в каталоге «/ home / hadoop / hadoopPartitioner» и передаются в качестве входных данных.
1201 | Гопал | 45 | мужчина | 50000 |
1202 | Маниша | 40 | женский | 51000 |
1203 | Khaleel | 34 | мужчина | 30000 |
1204 | Prasanth | 30 | мужчина | 31000 |
1205 | Киран | 20 | мужчина | 40000 |
1206 | Лакшми | 25 | женский | 35000 |
1207 | Бхавайя | 20 | женский | 15000 |
1208 | Reshma | 19 | женский | 14 000 |
1209 | kranthi | 22 | мужчина | 22000 |
+1210 | Сатиш | 24 | мужчина | 25000 |
1211 | Кришна | 25 | мужчина | 26000 |
1212 | Аршад | 28 | мужчина | 20000 |
1213 | Лаванья | 18 | женский | 8000 |
Основываясь на данном входе, ниже приводится алгоритмическое объяснение программы.
Задачи карты
Задача карты принимает пары ключ-значение в качестве входных данных, пока у нас есть текстовые данные в текстовом файле. Входные данные для этой задачи карты следующие:
Ввод — ключом будет шаблон, такой как «любая специальная клавиша + имя файла + номер строки» (пример: ключ = @ input1), а значением будут данные в этой строке (пример: значение = 1201 \ t gopal \ t 45 \ т мужской \ т 50000).
Метод — Работа этой задачи карты заключается в следующем —
-
Прочитайте значение (запись данных), которое поступает в качестве входного значения из списка аргументов в строке.
-
Используя функцию split, разделите пол и сохраните в строковую переменную.
Прочитайте значение (запись данных), которое поступает в качестве входного значения из списка аргументов в строке.
Используя функцию split, разделите пол и сохраните в строковую переменную.
String[] str = value.toString().split("\t", -3); String gender=str[3];
-
Отправьте информацию о поле и значение записи в виде пары выходной ключ-значение из задачи сопоставления в задачу разбиения .
Отправьте информацию о поле и значение записи в виде пары выходной ключ-значение из задачи сопоставления в задачу разбиения .
context.write(new Text(gender), new Text(value));
-
Повторите все вышеперечисленные шаги для всех записей в текстовом файле.
Повторите все вышеперечисленные шаги для всех записей в текстовом файле.
Вывод. Вы получите половые данные и данные записи в виде пар ключ-значение.
Задача Partitioner
Задача секционера принимает пары ключ-значение из задачи карты в качестве входных данных. Разделение подразумевает разделение данных на сегменты. В соответствии с заданными условными критериями разделов входные парные данные ключ-значение могут быть разделены на три части на основе возрастных критериев.
Ввод — все данные в коллекции пар ключ-значение.
ключ = значение поля пола в записи.
значение = значение всей записи данных этого пола.
Метод . Процесс логики разбиения выполняется следующим образом.
- Считайте значение поля возраста из пары ключ-значение.
String[] str = value.toString().split("\t"); int age = Integer.parseInt(str[2]);
-
Проверьте значение возраста с соблюдением следующих условий.
- Возраст не более 20
- Возраст больше 20 и меньше или равно 30.
- Возраст старше 30.
Проверьте значение возраста с соблюдением следующих условий.
if(age<=20) { return 0; } else if(age>20 && age<=30) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; }
Вывод . Все данные пар ключ-значение сегментированы на три набора пар ключ-значение. Редуктор работает индивидуально на каждую коллекцию.
Уменьшить задачи
Количество задач секционирования равно количеству задач редуктора. Здесь у нас есть три задачи секционирования, и, следовательно, у нас есть три задачи редуктора, которые нужно выполнить.
Ввод — Редуктор будет выполняться три раза с различным набором пар ключ-значение.
ключ = значение поля пола в записи.
значение = все данные записи этого пола.
Метод — следующая логика будет применяться к каждой коллекции.
- Прочитайте значение поля Зарплата каждой записи.
String [] str = val.toString().split("\t", -3); Note: str[4] have the salary field value.
-
Проверьте зарплату с помощью переменной max. Если str [4] является максимальной зарплатой, присвойте str [4] значение max, в противном случае пропустите шаг.
Проверьте зарплату с помощью переменной max. Если str [4] является максимальной зарплатой, присвойте str [4] значение max, в противном случае пропустите шаг.
if(Integer.parseInt(str[4])>max) { max=Integer.parseInt(str[4]); }
-
Повторите шаги 1 и 2 для каждой коллекции ключей (мужские и женские — ключевые коллекции). Выполнив эти три шага, вы найдете одну максимальную зарплату из коллекции ключей для мужчин и одну максимальную зарплату из коллекции ключей для женщин.
Повторите шаги 1 и 2 для каждой коллекции ключей (мужские и женские — ключевые коллекции). Выполнив эти три шага, вы найдете одну максимальную зарплату из коллекции ключей для мужчин и одну максимальную зарплату из коллекции ключей для женщин.
context.write(new Text(key), new IntWritable(max));
Вывод. Наконец, вы получите набор данных пары ключ-значение в трех коллекциях разных возрастных групп. Он содержит максимальную зарплату из коллекции мужчин и максимальную зарплату из коллекции женщин в каждой возрастной группе соответственно.
После выполнения задач Map, Partitioner и Reduce три набора данных пары ключ-значение сохраняются в трех разных файлах в качестве выходных данных.
Все три задачи рассматриваются как задания MapReduce. Следующие требования и спецификации этих заданий должны быть указаны в Конфигурациях —
- Название работы
- Форматы ввода и вывода ключей и значений
- Отдельные классы для задач Map, Reduce и Partitioner
Configuration conf = getConf(); //Create Job Job job = new Job(conf, "topsal"); job.setJarByClass(PartitionerExample.class); // File Input and Output paths FileInputFormat.setInputPaths(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job,new Path(arg[1])); //Set Mapper class and Output format for key-value pair. job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //set partitioner statement job.setPartitionerClass(CaderPartitioner.class); //Set Reducer class and Input/Output format for key-value pair. job.setReducerClass(ReduceClass.class); //Number of Reducer tasks. job.setNumReduceTasks(3); //Input and Output format for data job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
Пример программы
Следующая программа показывает, как реализовать разделители для заданных критериев в программе MapReduce.
package partitionerexample; import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class PartitionerExample extends Configured implements Tool { //Map class public static class MapClass extends Mapper<LongWritable,Text,Text,Text> { public void map(LongWritable key, Text value, Context context) { try{ String[] str = value.toString().split("\t", -3); String gender=str[3]; context.write(new Text(gender), new Text(value)); } catch(Exception e) { System.out.println(e.getMessage()); } } } //Reducer class public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable> { public int max = -1; public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException { max = -1; for (Text val : values) { String [] str = val.toString().split("\t", -3); if(Integer.parseInt(str[4])>max) max=Integer.parseInt(str[4]); } context.write(new Text(key), new IntWritable(max)); } } //Partitioner class public static class CaderPartitioner extends Partitioner < Text, Text > { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] str = value.toString().split("\t"); int age = Integer.parseInt(str[2]); if(numReduceTasks == 0) { return 0; } if(age<=20) { return 0; } else if(age>20 && age<=30) { return 1 % numReduceTasks; } else { return 2 % numReduceTasks; } } } @Override public int run(String[] arg) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "topsal"); job.setJarByClass(PartitionerExample.class); FileInputFormat.setInputPaths(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job,new Path(arg[1])); job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //set partitioner statement job.setPartitionerClass(CaderPartitioner.class); job.setReducerClass(ReduceClass.class); job.setNumReduceTasks(3); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)? 0 : 1); return 0; } public static void main(String ar[]) throws Exception { int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar); System.exit(0); } }
Сохраните приведенный выше код как PartitionerExample.java в «/ home / hadoop / hadoopPartitioner». Компиляция и выполнение программы приведены ниже.
Компиляция и выполнение
Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).
Следуйте инструкциям ниже, чтобы скомпилировать и выполнить вышеуказанную программу.
Шаг 1 — Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и запуска программы MapReduce. Вы можете скачать банку с mvnrepository.com .
Допустим, загруженная папка имеет вид «/ home / hadoop / hadoopPartitioner»
Шаг 2 — Следующие команды используются для компиляции программы PartitionerExample.java и создания jar для программы.
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .
Шаг 3 — Используйте следующую команду для создания входного каталога в HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Шаг 4 — Используйте следующую команду, чтобы скопировать входной файл с именем input.txt во входной каталог HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
Шаг 5 — Используйте следующую команду, чтобы проверить файлы во входном каталоге.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Шаг 6 — Используйте следующую команду, чтобы запустить приложение Top salary, взяв входные файлы из входного каталога.
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
Подождите некоторое время, пока файл не запустится. После выполнения выходные данные содержат несколько входных разбиений, задач сопоставления и задач редуктора.
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully 15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=467 FILE: Number of bytes written=426777 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=480 HDFS: Number of bytes written=72 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=6 Job Counters Launched map tasks=1 Launched reduce tasks=3 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=8212 Total time spent by all reduces in occupied slots (ms)=59858 Total time spent by all map tasks (ms)=8212 Total time spent by all reduce tasks (ms)=59858 Total vcore-seconds taken by all map tasks=8212 Total vcore-seconds taken by all reduce tasks=59858 Total megabyte-seconds taken by all map tasks=8409088 Total megabyte-seconds taken by all reduce tasks=61294592 Map-Reduce Framework Map input records=13 Map output records=13 Map output bytes=423 Map output materialized bytes=467 Input split bytes=119 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=467 Reduce input records=13 Reduce output records=6 Spilled Records=26 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=224 CPU time spent (ms)=3690 Physical memory (bytes) snapshot=553816064 Virtual memory (bytes) snapshot=3441266688 Total committed heap usage (bytes)=334102528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=361 File Output Format Counters Bytes Written=72
Шаг 7 — Используйте следующую команду для проверки результирующих файлов в выходной папке.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Вы найдете выходные данные в трех файлах, потому что вы используете в своей программе три секционера и три редуктора.
Шаг 8 — Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00000 . Этот файл создан HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Выход в Part-00000
Female 15000 Male 40000
Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00001 .
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Выход в Part-00001
Female 35000 Male 31000
Используйте следующую команду, чтобы увидеть выходные данные в файле Part-00002 .
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Выход в Part-00002