Учебники

Hadoop — MapReduce

MapReduce — это фреймворк, с помощью которого мы можем писать приложения для параллельной обработки огромных объемов данных на больших кластерах стандартного оборудования.

Что такое MapReduce?

MapReduce — это технология обработки и программная модель для распределенных вычислений на основе Java. Алгоритм MapReduce содержит две важные задачи, а именно Map и Reduce. Карта берет набор данных и преобразует его в другой набор данных, где отдельные элементы разбиваются на кортежи (пары ключ / значение). Во-вторых, задача сокращения, которая принимает выходные данные карты в качестве входных данных и объединяет эти кортежи данных в меньший набор кортежей. Как следует из последовательности имени MapReduce, задача сокращения всегда выполняется после задания карты.

Основным преимуществом MapReduce является простота масштабирования обработки данных на нескольких вычислительных узлах. В модели MapReduce примитивы обработки данных называются преобразователями и преобразователями. Разложение приложения обработки данных на картографы и редукторы иногда бывает нетривиальным. Но как только мы напишем приложение в форме MapReduce, масштабирование приложения для запуска более сотни, тысяч или даже десятков тысяч машин в кластере — это просто изменение конфигурации. Эта простая масштабируемость привлекла многих программистов для использования модели MapReduce.

Алгоритм

  • Обычно парадигма MapReduce основана на отправке компьютера туда, где хранятся данные!

  • Программа MapReduce выполняется в три этапа, а именно: этап отображения, этап перемешивания и этап сокращения.

    • Этап карты — работа карты или картографа заключается в обработке входных данных. Обычно входные данные находятся в форме файла или каталога и хранятся в файловой системе Hadoop (HDFS). Входной файл передается в функцию картографа построчно. Картограф обрабатывает данные и создает несколько небольших порций данных.

    • Этап сокращения — этот этап является комбинацией этапа перемешивания и этапа уменьшения . Работа редуктора заключается в обработке данных, поступающих от картографа. После обработки он создает новый набор выходных данных, который будет храниться в HDFS.

  • Во время задания MapReduce Hadoop отправляет задачи Map и Reduce на соответствующие серверы в кластере.

  • Каркас управляет всеми деталями передачи данных, такими как выдача задач, проверка завершения задач и копирование данных вокруг кластера между узлами.

  • Большая часть вычислений происходит на узлах с данными на локальных дисках, что снижает сетевой трафик.

  • После выполнения данных задач кластер собирает и сокращает данные, чтобы сформировать соответствующий результат, и отправляет их обратно на сервер Hadoop.

Обычно парадигма MapReduce основана на отправке компьютера туда, где хранятся данные!

Программа MapReduce выполняется в три этапа, а именно: этап отображения, этап перемешивания и этап сокращения.

Этап карты — работа карты или картографа заключается в обработке входных данных. Обычно входные данные находятся в форме файла или каталога и хранятся в файловой системе Hadoop (HDFS). Входной файл передается в функцию картографа построчно. Картограф обрабатывает данные и создает несколько небольших порций данных.

Этап сокращения — этот этап является комбинацией этапа перемешивания и этапа уменьшения . Работа редуктора заключается в обработке данных, поступающих от картографа. После обработки он создает новый набор выходных данных, который будет храниться в HDFS.

Во время задания MapReduce Hadoop отправляет задачи Map и Reduce на соответствующие серверы в кластере.

Каркас управляет всеми деталями передачи данных, такими как выдача задач, проверка завершения задач и копирование данных вокруг кластера между узлами.

Большая часть вычислений происходит на узлах с данными на локальных дисках, что снижает сетевой трафик.

После выполнения данных задач кластер собирает и сокращает данные, чтобы сформировать соответствующий результат, и отправляет их обратно на сервер Hadoop.

Алгоритм MapReduce

Входы и выходы (перспектива Java)

Каркас MapReduce работает с парами <ключ, значение>, то есть каркас просматривает входные данные для задания в виде набора пар <ключ, значение> и создает набор пар <ключ, значение> в качестве выходных данных задания. , возможно, разных типов.

Классы ключей и значений должны быть сериализованы способом и, следовательно, должны реализовывать интерфейс Writable. Кроме того, ключевые классы должны реализовывать интерфейс Writable-Comparable для облегчения сортировки в рамках. Типы ввода и вывода задания MapReduce — (Входные данные) <k1, v1> → карта → <k2, v2> → уменьшить → <k3, v3> (Выходные данные).

вход Выход
карта <k1, v1> список (<k2, v2>)
уменьшить <k2, список (v2)> список (<k3, v3>)

терминология

  • PayLoad — приложения реализуют функции Map и Reduce и составляют основу работы.

  • Mapper — Mapper отображает пары ключ / значение на набор промежуточных пар ключ / значение.

  • NamedNode — узел, управляющий распределенной файловой системой Hadoop (HDFS).

  • DataNode — Узел, в котором данные представляются заранее, до начала какой-либо обработки.

  • MasterNode — узел, где работает JobTracker и который принимает запросы на работу от клиентов.

  • SlaveNode — узел, в котором работает программа Map and Reduce.

  • JobTracker — Планирует задания и отслеживает назначения заданий для трекера задач.

  • Task Tracker — отслеживает задачу и сообщает о статусе в JobTracker.

  • Задание — программа — это выполнение картографа и редуктора в наборе данных.

  • Задача — выполнение Mapper или Reducer на срезе данных.

  • Попытка задачи — конкретный случай попытки выполнить задачу на подчиненном узле.

PayLoad — приложения реализуют функции Map и Reduce и составляют основу работы.

Mapper — Mapper отображает пары ключ / значение на набор промежуточных пар ключ / значение.

NamedNode — узел, управляющий распределенной файловой системой Hadoop (HDFS).

DataNode — Узел, в котором данные представляются заранее, до начала какой-либо обработки.

MasterNode — узел, где работает JobTracker и который принимает запросы на работу от клиентов.

SlaveNode — узел, в котором работает программа Map and Reduce.

JobTracker — Планирует задания и отслеживает назначения заданий для трекера задач.

Task Tracker — отслеживает задачу и сообщает о статусе в JobTracker.

Задание — программа — это выполнение картографа и редуктора в наборе данных.

Задача — выполнение Mapper или Reducer на срезе данных.

Попытка задачи — конкретный случай попытки выполнить задачу на подчиненном узле.

Пример сценария

Ниже приведены данные о потреблении электроэнергии в организации. Он содержит ежемесячное потребление электроэнергии и среднегодовое значение за разные годы.

январь февраль март апрель май июнь июль август сентябрь октябрь ноябрь декабрь в среднем
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

Если вышеуказанные данные приведены в качестве входных данных, мы должны написать приложения для их обработки и получения таких результатов, как определение года максимального использования, года минимального использования и так далее. Это прогулка для программистов с конечным числом записей. Они просто напишут логику для получения требуемого вывода и передадут данные в написанное приложение.

Но подумайте о данных, представляющих потребление электроэнергии всеми крупными отраслями определенного государства, с момента его образования.

Когда мы пишем приложения для обработки таких массовых данных,

  • Они займут много времени, чтобы выполнить.

  • Будет большой сетевой трафик, когда мы перемещаем данные с источника на сетевой сервер и так далее.

Они займут много времени, чтобы выполнить.

Будет большой сетевой трафик, когда мы перемещаем данные с источника на сетевой сервер и так далее.

Для решения этих проблем у нас есть инфраструктура MapReduce.

Входные данные

Приведенные выше данные сохраняются как sample.txt и передаются в качестве входных данных. Входной файл выглядит так, как показано ниже.

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

Пример программы

Ниже приведена программа для образца данных с использованием инфраструктуры MapReduce.

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

Сохраните вышеуказанную программу как ProcessUnits.java. Компиляция и выполнение программы объяснены ниже.

Компиляция и выполнение программы Process Units

Предположим, мы находимся в домашнем каталоге пользователя Hadoop (например, / home / hadoop).

Следуйте инструкциям ниже, чтобы скомпилировать и выполнить вышеуказанную программу.

Шаг 1

Следующая команда должна создать каталог для хранения скомпилированных классов Java.

$ mkdir units 

Шаг 2

Загрузите Hadoop-core-1.2.1.jar, который используется для компиляции и запуска программы MapReduce. Посетите следующую ссылку mvnrepository.com, чтобы скачать банку. Давайте предположим, что загруженная папка — / home / hadoop /.

Шаг 3

Следующие команды используются для компиляции программы ProcessUnits.java и создания jar для программы.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

Шаг 4

Следующая команда используется для создания входного каталога в HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

Шаг 5

Следующая команда используется для копирования входного файла с именем sample.txt во входной каталог HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

Шаг 6

Следующая команда используется для проверки файлов во входном каталоге.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

Шаг 7

Следующая команда используется для запуска приложения Eleunit_max путем извлечения входных файлов из входного каталога.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

Подождите некоторое время, пока файл не будет выполнен. После выполнения, как показано ниже, выходные данные будут содержать количество входных разбиений, количество задач Map, количество задач редуктора и т. Д.

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40 

Шаг 8

Следующая команда используется для проверки результирующих файлов в выходной папке.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

Шаг 9

Следующая команда используется для просмотра выходных данных в файле Part-00000 . Этот файл создан HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

Ниже вывод, сгенерированный программой MapReduce.

1981    34 
1984    40 
1985    45 

Шаг 10

Следующая команда используется для копирования выходной папки из HDFS в локальную файловую систему для анализа.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

Важные команды

Все команды Hadoop вызываются командой $ HADOOP_HOME / bin / hadoop . При запуске сценария Hadoop без каких-либо аргументов выводится описание всех команд.

Использование — hadoop [—config confdir] КОМАНДА

В следующей таблице перечислены доступные параметры и их описание.

Sr.No. Вариант и описание
1

наменоде -формат

Форматирует файловую систему DFS.

2

secondarynamenode

Запускает вторичный наменод DFS.

3

NameNode

Запускает DFS наменоде.

4

DataNode

Запускает данные DFS.

5

dfsadmin

Запускает клиент администратора DFS.

6

mradmin

Запускает клиент администрирования Map-Reduce.

7

Fsck

Запускает утилиту проверки файловой системы DFS.

8

фс

Запускает универсальный пользовательский клиент файловой системы.

9

балансер

Запускает утилиту балансировки кластера.

10

OIV

Применяет офлайн-просмотрщик fsimage к fsimage.

11

fetchdt

Извлекает токен делегирования из NameNode.

12

JobTracker

Запускает узел отслеживания заданий MapReduce.

13

трубы

Запускает работу Pipes.

14

TaskTracker

Запускает узел отслеживания задач MapReduce.

15

historyserver

Запускает серверы истории заданий как автономный демон.

16

работа

Управляет заданиями MapReduce.

17

очередь

Получает информацию о JobQueues.

18

версия

Печатает версию.

19

баночка <баночка>

Запускает файл JAR.

20

distcp <srcurl> <desturl>

Копирует файл или каталоги рекурсивно.

21

distcp2 <srcurl> <desturl>

DistCp версия 2.

22

archive -archiveName NAME -p <родительский путь> <src> * <dest>

Создает архив hadoop.

23

Путь к классам

Печатает путь к классу, необходимый для получения jar Hadoop и необходимых библиотек.

24

daemonlog

Получить / установить уровень журнала для каждого демона

наменоде -формат

Форматирует файловую систему DFS.

secondarynamenode

Запускает вторичный наменод DFS.

NameNode

Запускает DFS наменоде.

DataNode

Запускает данные DFS.

dfsadmin

Запускает клиент администратора DFS.

mradmin

Запускает клиент администрирования Map-Reduce.

Fsck

Запускает утилиту проверки файловой системы DFS.

фс

Запускает универсальный пользовательский клиент файловой системы.

балансер

Запускает утилиту балансировки кластера.

OIV

Применяет офлайн-просмотрщик fsimage к fsimage.

fetchdt

Извлекает токен делегирования из NameNode.

JobTracker

Запускает узел отслеживания заданий MapReduce.

трубы

Запускает работу Pipes.

TaskTracker

Запускает узел отслеживания задач MapReduce.

historyserver

Запускает серверы истории заданий как автономный демон.

работа

Управляет заданиями MapReduce.

очередь

Получает информацию о JobQueues.

версия

Печатает версию.

баночка <баночка>

Запускает файл JAR.

distcp <srcurl> <desturl>

Копирует файл или каталоги рекурсивно.

distcp2 <srcurl> <desturl>

DistCp версия 2.

archive -archiveName NAME -p <родительский путь> <src> * <dest>

Создает архив hadoop.

Путь к классам

Печатает путь к классу, необходимый для получения jar Hadoop и необходимых библиотек.

daemonlog

Получить / установить уровень журнала для каждого демона

Как взаимодействовать с MapReduce Jobs

Использование — задание hadoop [GENERIC_OPTIONS]

Ниже приведены общие параметры, доступные в задании Hadoop.

-submit <файл-задания>

Представляет работу.

-status <идентификатор работы>

Печатает карту и уменьшает процент выполнения и все счетчики заданий.

-counter <идентификатор-задания> <имя-группы> <имя-счетчика>

Печатает значение счетчика.

-kill <идентификатор работы>

Убивает работу.

-events <идентификатор-задания> <fromevent — #> <# — of-events>

Печатает детали событий, полученные трекером для заданного диапазона.

-history [все] <jobOutputDir> — история <jobOutputDir>

Печатает сведения о работе, ошибки и убитые подсказки Более подробную информацию о задании, например об успешных заданиях и попытках выполнения заданий для каждой задачи, можно просмотреть, указав параметр [все].

-лист [все]

Отображает все вакансии. -list отображает только задания, которые еще не завершены.

-kill-task <идентификатор задачи>

Убивает задачу. Убитые задачи НЕ учитываются при неудачных попытках.

-fail-task <идентификатор задачи>

Сбой задачи Неудачные задачи учитываются как неудачные попытки.

-set-priority <идентификатор-задания> <приоритет>

Изменяет приоритет задания. Допустимые значения приоритета: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW