Статьи

Начало работы с Hadoop MapReduce


Платформа Hadoop MapReduce позволяет параллельно обрабатывать большие данные на больших кластерах аппаратного обеспечения.

[Обработка большого файла последовательно сверху вниз может быть очень трудоемкой задачей, вместо этого, вкратце, MapReduce разбивает этот большой файл на куски и обрабатывает параллельно.]

Небольшая заметка о HDFS

HDFS, распределенная файловая система Hadoop, является отказоустойчивым распределенным хранилищем, которое отвечает за хранение больших данных на больших кластерах стандартного оборудования.

HDFS разбивает данные на блоки по умолчанию размером 128 МБ (вы можете настроить это значение с помощью свойства dfs.blocksize в файле hdfs-default.xml) по размеру и хранить на нескольких узлах (в их локальной файловой системе) в кластере. Каждый блок по умолчанию реплицируется 3 раза (это значение можно настроить с помощью свойства dfs.replication в файле hdfs-default.xml) на разные узлы, чтобы обеспечить доступность.

NameNode

NameNode хранит метаданные файлов HDFS, но не хранит сами данные. Демон NameNode запускается на главном узле, и для каждого кластера Hadoop существует только один NameNode. NameNode работает в отдельном процессе JVM, в типичном производственном кластере есть отдельный узел, который выполняет процесс NameNode.

DataNode

DataNodes демоны, которые работают на подчиненных узлах, хранят блоки HDFS. На каждом подчиненном узле выполняется только один процесс DataNode. Узлы данных также работают на отдельных процессах JVM. Узлы данных периодически отправляют тактовые импульсы на узел имени, чтобы указать, что они живы. Узлы данных также могут общаться друг с другом, например, когда они общаются друг с другом во время репликации данных.

Когда клиент хочет выполнить операцию с файлом, он сначала связывается с NameNode, чтобы найти этот файл. Затем NameNode отправляет расположения узлов, на которых хранится файл, в виде блоков HDFS. Затем клиент может напрямую общаться с узлами данных и выполнять операции с файлом.

Демоны MapReduce

Есть два демонических процесса, которые мы должны рассмотреть; Демон JobTracker и демон TaskTracker.

JobTracker

Демон JobTracker, который работает на главном узле, отслеживает задания MapReduce. В каждом кластере Hadoop есть только один демон JobTracker. JobTracker запускается в отдельном процессе JVM, в типичном производственном кластере есть отдельный узел, который запускает процесс JobTracker.

TaskTracker

Демоны TaskTracker, которые работают на подчиненных узлах, обрабатывают задачи (отображают, уменьшают), полученные из JobTracker. Существует только один TaskTracker на каждый подчиненный узел. Каждый TaskTracker настроен с набором слотов, который определяет количество задач, которые он может обработать (TaskTracker может быть настроен для обработки нескольких карт и сокращения задач). TaskTracker запускает отдельные процессы JVM для каждой задачи, чтобы изолировать ее от проблем, вызванных задачами. Узлы данных периодически отправляют тактовые импульсы в JobTracker, чтобы указать, что они живы, и сообщить количество доступных слотов.

Запустив задание MapReduce, клиентское приложение отправляет задания в JobTracker. Затем JobTracker обращается к NameNode, чтобы найти необходимые блоки данных. Затем JobTracker выбирает TaskTrackers со свободными слотами, который работает на тех же узлах, которые содержат данные, или в той же стойке, что и данные. Затем TaskTrackers запускают отдельные процессы JVM для каждой задачи и отслеживают их, в то время как JobTracker контролирует TaskTrackers на предмет сбоев. Когда задача выполнена, TaskTracker информирует JobTracker.

И HDFS, и инфраструктура MapReduce работают на одном и том же наборе узлов, другими словами, узлы хранения (узлы данных в HDFS) и вычислительные узлы (узлы, на которых работают TaskTrackers) одинаковы. В Hadoop вычисления переносятся на данные, а не наоборот.

Входы и выходы

Каркас MapReduce работает с серией преобразований ключ / значение, где входные данные для задания MapReduce представляют собой набор пар {ключ, значение}, а выходные данные также представляют собой набор пар {ключ, значение}. Обратите внимание, что типы входных пар {ключ, значение} могут отличаться от типов выходных пар {ключ, значение}.


(вход) {k1, v1} -> карта -> {k2, список (v2)} -> уменьшить -> {k3, v3} (выход)

Каждый тип данных, который будет использоваться в качестве ключей, должен реализовывать интерфейсы Writable и Comparable, а каждый тип данных, который будет использоваться в качестве значений, должен реализовывать интерфейс Writable. Интерфейс с возможностью записи обеспечивает способ сериализации и десериализации данных в сети. Поскольку выходные данные сортируются по ключам средой, чтобы упростить процесс сортировки, только тип данных, который будет использоваться в качестве ключей, должен реализовывать сопоставимый интерфейс.

WordCount — пример программы MapReduce

Следующий пример программы MapReduce — это та же самая программа, которую вы найдете в
учебнике Hadoop MapReduce . Этот код работает со всеми тремя режимами; Автономный режим, псевдораспределенный режим и полностью распределенный режим.

посмотреть обычную
печать
?

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

 public static class TokenizerMapper
 extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
   }
  }
 }
 
 public static class IntSumReducer
 extends Reducer<Text,IntWritable,Text,IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
    Context context
    ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   context.write(key, result);
  }
 }

 public static void main(String[] args) throws Exception {
// Creating a configuration object
  Configuration conf = new Configuration();
// Creating an instance of Job class
  Job job = Job.getInstance(conf, "word count");
// Setting the name of the main class within the jar file
  job.setJarByClass(WordCount.class);
// Setting the mapper class
  job.setMapperClass(TokenizerMapper.class);
 // Setting the combiner class
  job.setCombinerClass(IntSumReducer.class);
// Setting the reducer class
  job.setReducerClass(IntSumReducer.class);
// Setting the data type of the output(final) key
  job.setOutputKeyClass(Text.class);
// Setting the data type of the output(final) value
  job.setOutputValueClass(IntWritable.class);
// Setting input file path, the 1st argument passed in to the main method is used.
  FileInputFormat.addInputPath(job, new Path(args[0]));
// Setting output file path,the 2nd argument passed in to the main method is used.
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Running the job and wait for it to get completed
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
 
}

Для выполнения задания MapReduce нам нужна реализация картографа и реализация редуктора. Как вы можете видеть в приведенном выше коде, классы Mapper и Reducer определены как внутренние классы. Также конфигурация задания MapReduce происходит в его основном методе, в основном с использованием экземпляра класса Job. Вы можете просмотреть код WordCount.java и просмотреть комментарии, которые я там добавил, чтобы понять эти конфигурации.

картопостроитель

В примере WordCount вы можете видеть, что реализация mapper названа как «TokenizerMapper», что расширяет базовый класс Mapper, предоставляемый Hadoop, и переопределяет его метод map. Как видите, метод карты имеет три параметра:

  1. Клавиша ввода
  2. Входное значение
  3. Экземпляр класса Context, который используется для выдачи результатов.

Mapper выполняется один раз для каждой строки текста, и каждый раз, когда эта строка текста разбивается на слова, он генерирует серию новых пар ключ / значение в форме {word, 1}, используя объект context.

Разметка, перемешивание и сортировка

Секционирование

Hadoop гарантирует, что все промежуточные записи с одним и тем же ключом попадут в один и тот же редуктор. Разделителем по умолчанию, используемым платформой MapReduce, является HashPartitioner.

Shuffle и Sort

MapReduce обеспечивает сортировку входных данных редуктора по ключу. Фазы перемешивания и сортировки происходят одновременно. Это процесс выполнения сортировки и передачи промежуточных выходов картографа в редукторы в качестве входных данных. Когда выходные данные выбираются редуктором, они объединяются.

Сумматор

Hadoop позволяет использовать дополнительный класс Combiner для работы с выходами Mapper. Указывая класс Combiner, каждый вывод mapper будет проходить через локальный Combiner и будет выполнять сортировку по ключам, локальную агрегацию по ним. Выход Combiner создает вход для редуктора. Класс Combiner — это оптимизация, поэтому нет гарантии, сколько раз он будет работать на выходе маппера, он может быть равен нулю, одному или нескольким разам. Поэтому при указании Combiner мы должны быть абсолютно уверены, что задание будет выдавать один и тот же вывод из редуктора независимо от того, сколько раз Combiner работает на выходе преобразователя. В примере WordCount указан комбинатор, аналогичный редуктору.

редуктор

В примере WordCount вы можете видеть, что реализация редуктора называется IntSumReducer, что расширяет базовый класс Reducer, предоставляемый Hadoop, и переопределяет его метод Reduce. Как видите, метод Reduce имеет три параметра:

  1. Клавиша ввода
  2. Ввод списка значений в виде объекта Iterable
  3. Экземпляр класса Context, который используется для выдачи результатов.

Обратите внимание, что каждый преобразователь генерирует серию пар ключ / значение, а в промежуточной фазе тасования и сортировки эти отдельные пары ключ / значение объединяются в серию пар ключ / список (значение), которые вводятся в редукторы. Редуктор выполняется один раз для каждого ключа (слова). В примере WordCount редуктор вычисляет сумму значений в объекте Iterable и выдает результаты для каждого слова в виде {word, sum}.

Давайте рассмотрим пример. Предположим, у нас есть два входных файла, один из которых содержит слово «Hello World Bye World», а другой содержит слово «Bye World Bye». В этом конкретном случае;

W / O Combiner

1st Mapper испускает;

    {Hello, 1} 
    {World, 1} 
    {Bye, 1} 
    {World, 1} 

2-й Mapper испускает;

    {Bye, 1} 
    {World, 1} 
    {Bye, 1} 

После фазы перемешивания и сортировки, ввод в редуктор;

    {Bye, (1,1,1)} 
    {Hello,1} 
    {World, (1,1,1)} 

Редуктор испускает;

    {Bye, 3} 
    {Hello,1} 
    {World, 3} 

С Combiner

1-й Mapper испускает;

    {Hello, 1} 
    {World, 1} 
    {Bye, 1} 
    {World, 1} 

2-й Mapper испускает;

    {Bye, 1} 
    {World, 1} 
    {Bye, 1} 

Combiner выполняет локальную агрегацию, а выходные данные сопоставления сортируются по ключам;

Для 1-го картографа;

    {Bye, 1} 
    {Hello, 1} 
    {World, 2} 

Для 2-го картографа;

    {Bye, 2} 
    {World, 1} 

Редуктор испускает;

    {Bye, 3} 
    {Hello, 1} 
    {World, 3} 

Выполнение задания MapReduce

  1. Добавьте classpath hadoop к вашему classpath с помощью следующей команды.
        $export CLASSPATH=`hadoop classpath`:$CLASSPATH 
  2. Теперь скомпилируйте WordCount.java, используя следующую команду.
        $javac WordCount.java 
  3. Создайте файл фляги работы.

  4. Если вы используете автономный режим для запуска задания, вы можете просто использовать следующую команду.

    Как вы можете видеть, есть четыре аргумента этой команды,

    1. Имя файла фляги
    2. Имя основного класса в файле jar.
    3. Расположение входного файла на вашем локальном компьютере.

      Просмотр входов

          $ls input 
          ## file01 file02 
          $cat input/file01 
          ## Hello World Bye World
          $cat input/file02 
          ## Bye World Bye
    4. Расположение выходного файла.

    Чтобы просмотреть вывод, используйте следующую команду,

        $cat output/part-r-00000 

    В случае успешного выполнения задания, следующим должен быть вывод.

        Bye 3 
        Hello 1 
        World 3 
  5. Если вы используете псевдораспределенный режим для запуска задания, сначала поместите банку задания в желаемое место (я использовал домашнюю HDFS) на HDFS, используя следующую команду.
            $hdfs dfs -put wc.jar /user/pavithra 

    Вы можете использовать следующую команду для запуска задания.

    Как вы можете видеть, есть четыре аргумента этой команды,

    1. Имя файла фляги
    2. Имя основного класса в файле jar.
    3. Расположение входного файла в HDFS. Это относительно вашего домашнего каталога в HDFS. В моем случае полный путь к домашней директории будет / user / pavithra / input.

      Просмотр входов

          $hdfs dfs -ls input 
          ## file01 file02 
          $hdfs dfs -cat input/file01 
          ## Hello World Bye World
          $hdfs dfs -cat input/file02 
          ## Bye World Bye
    4. Расположение выходного файла. Это также относительно вашего домашнего каталога в HDFS. Полный путь будет в моем случае, user / pavithra / output

    Чтобы просмотреть вывод, используйте следующую команду,

        hdfs dfs -cat output/part-r-00000 

    При успешном выполнении задания следует следующий вывод.