Статьи

Работа с интенсивной обработкой текста с помощью MapReduce


Прошло много времени с тех пор, как я последний раз писал, так как я был занят некоторыми из курсов, предлагаемых
Coursera . Есть несколько очень интересных предложений, которые стоит посмотреть. Некоторое время назад я приобрел
Data-Intensive Processing с MapReduce от Jimmy Lin и Chris Dyer. В книге представлено несколько ключевых алгоритмов MapReduce, но в формате псевдокода. Моя цель состоит в том, чтобы взять алгоритмы, представленные в главах 3-6, и реализовать их в Hadoop, используя
Hadoop: полное руководство Тома Уайта в качестве справочного материала. Я собираюсь предположить, что вы знакомы с Hadoop и MapReduce, а не охватывать вводный материал. Итак, давайте перейдем к главе 3 — Разработка алгоритма MapReduce, начиная с локальной агрегации.

Местная агрегация

На очень высоком уровне, когда Mappers генерируют данные, промежуточные результаты записываются на диск, а затем отправляются по сети в редукторы для окончательной обработки. Задержка записи на диск и последующей передачи данных по сети — дорогостоящая операция при обработке задания MapReduce. Поэтому вполне понятно, что, когда это возможно, уменьшение объема данных, отправляемых из картографических систем, увеличило бы скорость задания MapReduce. Локальная агрегация — это метод, используемый для уменьшения объема данных и повышения эффективности нашей работы MapReduce. Локальная агрегация не может заменить редукторы, так как нам нужен способ сбора результатов с одним и тем же ключом из разных картографов. Мы рассмотрим 3 способа достижения локальной агрегации:

  1. Использование функций Hadoop Combiner.
  2. Два подхода к комбинированию «in-mapper» представлены в книге «Обработка текста с MapReduce».

Конечно, любая оптимизация будет иметь компромиссы, и мы также обсудим их.
Чтобы продемонстрировать локальную агрегацию, мы запустим вездесущее задание подсчета слов в простой текстовой версии A Christmas Carol Чарльза Диккенса (загруженной из Project Gutenberg ) в псевдораспределенном кластере, установленном на моем MacBookPro, с использованием hadoop-0.20.2-cdh3u3 раздача от Cloudera . Я планирую в будущем посте провести такой же эксперимент на кластере EC2 с более реалистичными данными.

Сумматоры

Функция объединения — это объект, который расширяет класс Reducer. Фактически, для наших примеров здесь мы собираемся повторно использовать тот же редуктор, который использовался в задании подсчета слов. Функция объединения указывается при настройке задания MapReduce следующим образом:

job.setReducerClass(TokenCountReducer.class);

Вот код редуктора:

public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
              count+= value.get();
        }
        context.write(key,new IntWritable(count));
    }
}

Задача объединителя состоит в том, чтобы сделать именно то, что подразумевает название, объединенные данные с чистым результатом меньшего количества данных начинают перетасовываться по сети, что дает нам повышение эффективности. Как указывалось ранее, имейте в виду, что редукторы по-прежнему должны объединять результаты с использованием одних и тех же ключей от разных картографов. Поскольку функции объединителя являются оптимизацией, среда Hadoop не дает никаких гарантий относительно того, сколько раз будет вызываться объединитель, если вообще будет.

Вариант объединения картографов 1

Первая альтернатива использованию Combiners (рис. 3.2 на стр. 41) очень прямолинейна и вносит небольшую модификацию в наш оригинальный механизм подсчета слов:

public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Map<String,Integer> tokenMap = new HashMap<String, Integer>();
        StringTokenizer tokenizer = new StringTokenizer(value.toString());

        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }

        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
             text.set(s);
             writableCount.set(tokenMap.get(s));
             context.write(text,writableCount);
        }
    }
}

Как мы видим здесь, вместо испускания слова со счетом 1 для каждого встреченного слова мы используем карту для отслеживания каждого слова, уже обработанного. Затем, когда все токены обработаны, мы перебираем карту и выводим общее количество для каждого слова, встречающегося в этой строке.

В Mapper Combining Option 2

Второй вариант комбинирования картографов (рисунок 3.3 на стр. 41) очень похож на приведенный выше пример с двумя отличиями — когда создается хэш-карта и когда мы генерируем результаты, содержащиеся в карте. В приведенном выше примере создается карта, и ее содержимое сбрасывается по сети для каждого вызова метода карты. В этом примере мы собираемся сделать карту переменной экземпляра и перенести создание экземпляра карты на метод setUp в нашем преобразователе. Аналогично, содержимое карты не будет отправлено редукторам до тех пор, пока не будут завершены все вызовы mapper и не будет вызван метод cleanUp.

public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

    private  Map<String,Integer> tokenMap;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
           tokenMap = new HashMap<String, Integer>();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }
    }


    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
            text.set(s);
            writableCount.set(tokenMap.get(s));
            context.write(text,writableCount);
        }
    }
}

Как видно из приведенного выше примера кода, картограф отслеживает уникальное количество слов во всех вызовах метода map. Отслеживая уникальные токены и их количество, необходимо значительно сократить количество записей, отправляемых редукторам, что, в свою очередь, должно сократить время выполнения задания MapReduce. Это выполняет тот же эффект, что и использование функции функции объединения, предоставляемой платформой MapReduce, но в этом случае вы гарантированно вызовете код объединения. Но есть некоторые предостережения с этим подходом. Сохранение состояния при вызовах карты может оказаться проблематичным и определенно является нарушением функционального духа функции «карты». Кроме того, поддерживая состояние всех картографов, в зависимости от данных, используемых в задании, память может стать еще одной проблемой, с которой приходится сталкиваться. В конечном счете,нужно было бы взвесить все компромиссы, чтобы определить лучший подход.

Полученные результаты

Теперь давайте посмотрим на некоторые результаты различных картографов. Поскольку задание выполнялось в псевдораспределенном режиме, фактическое время выполнения не имеет значения, но мы все же можем сделать вывод о том, как использование локальной агрегации может повлиять на эффективность задания MapReduce в реальном кластере.

В Token Mapper:

12/09/13 21:25:32 INFO mapred.JobClient:     Reduce shuffle bytes=366010
12/09/13 21:25:32 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:25:32 INFO mapred.JobClient:     Spilled Records=63118
12/09/13 21:25:32 INFO mapred.JobClient:     Map output bytes=302886

В варианте сокращения Mapper 1:

12/09/13 21:28:15 INFO mapred.JobClient:     Reduce shuffle bytes=354112
12/09/13 21:28:15 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:28:15 INFO mapred.JobClient:     Spilled Records=60704
12/09/13 21:28:15 INFO mapred.JobClient:     Map output bytes=293402

В варианте сокращения Mapper 2:

12/09/13 21:30:49 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:30:49 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:30:49 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:30:49 INFO mapred.JobClient:     Map output bytes=90565

Опция Combiner:

12/09/13 21:22:18 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:22:18 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:22:18 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:22:18 INFO mapred.JobClient:     Map output bytes=302886
12/09/13 21:22:18 INFO mapred.JobClient:     Combine input records=31559
12/09/13 21:22:18 INFO mapred.JobClient:     Combine output records=7657

Как и ожидалось, Mapper, который не объединял, имел худшие результаты, за которым следовал первый вариант объединения в Mapper (хотя эти результаты можно было бы улучшить, если бы данные были очищены перед выполнением подсчета слов). Второй вариант объединения в картографе и функция объединителя дали практически идентичные результаты. Существенным фактом является то, что оба произвели на 2/3 меньшеуменьшить количество случайных байтов в качестве первых двух вариантов. Сокращение количества байтов, отправляемых по сети редукторам на эту сумму, несомненно, положительно скажется на эффективности задания MapReduce. Здесь нужно помнить об одном: объединение Combiners / In-Mapper может использоваться не только во всех заданиях MapReduce, в этом случае подсчет слов очень хорошо подходит для такого улучшения, но это не всегда может быть правдой. ,

Вывод

Как вы видите, преимущества использования объединения в картографическом режиме или функции объединителя Hadoop требуют серьезного рассмотрения при повышении производительности ваших заданий MapReduce. Что касается того, какой подход, то вам решать, какие компромиссы для каждого подхода.

Ресурсы