Статьи

MapReduce: работа с интенсивной обработкой текста

Прошло много времени с тех пор, как я последний раз писал, так как я был занят некоторыми из курсов, предлагаемых Coursera . Есть несколько очень интересных предложений, которые стоит посмотреть. Некоторое время назад я приобрел Data-Intensive Processing с MapReduce от Jimmy Lin и Chris Dyer. В книге представлено несколько ключевых алгоритмов MapReduce, но в формате псевдокода. Моя цель состоит в том, чтобы взять алгоритмы, представленные в главах 3-6, и реализовать их в Hadoop, используя Hadoop: полное руководство Тома Уайта в качестве справочного материала. Я собираюсь предположить, что вы знакомы с Hadoop и MapReduce, а не охватывать вводный материал. Итак, давайте перейдем к главе 3 — Разработка алгоритма MapReduce, начиная с локальной агрегации.

Местная агрегация

На очень высоком уровне, когда Mappers генерируют данные, промежуточные результаты записываются на диск, а затем отправляются по сети в редукторы для окончательной обработки. Задержка записи на диск и последующей передачи данных по сети — дорогостоящая операция при обработке задания MapReduce. Поэтому вполне понятно, что, когда это возможно, уменьшение объема данных, отправляемых из картографических систем, увеличило бы скорость задания MapReduce. Локальная агрегация — это метод, используемый для уменьшения объема данных и повышения эффективности нашей работы MapReduce. Локальная агрегация не может заменить редукторы, так как нам нужен способ сбора результатов с одним и тем же ключом из разных картографов. Мы рассмотрим 3 способа достижения локальной агрегации:

  1. Использование функций Hadoop Combiner.
  2. Два подхода к комбинированию «in-mapper» представлены в книге «Обработка текста с MapReduce».

Конечно, любая оптимизация будет иметь компромиссы, и мы также обсудим их.
Чтобы продемонстрировать локальную агрегацию, мы запустим вездесущее задание подсчета слов в простой текстовой версии A Christmas Carol Чарльза Диккенса (загруженной из Project Gutenberg ) в псевдораспределенном кластере, установленном на моем MacBookPro, с использованием hadoop-0.20.2-cdh3u3 раздача от Cloudera . Я планирую в будущем посте провести такой же эксперимент на кластере EC2 с более реалистичными данными.

Сумматоры

Функция объединения — это объект, который расширяет класс Reducer. Фактически, для наших примеров здесь мы собираемся повторно использовать тот же редуктор, который использовался в задании подсчета слов. Функция объединения указывается при настройке задания MapReduce следующим образом:

1
job.setReducerClass(TokenCountReducer.class);

Вот код редуктора:

01
02
03
04
05
06
07
08
09
10
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
              count+= value.get();
        }
        context.write(key,new IntWritable(count));
    }
}

Задача объединителя состоит в том, чтобы сделать именно то, что подразумевает название, объединенные данные с чистым результатом меньшего количества данных начинают перетасовываться по сети, что дает нам повышение эффективности. Как указывалось ранее, имейте в виду, что редукторы по-прежнему должны объединять результаты с использованием одних и тех же ключей от разных картографов. Поскольку функции объединителя являются оптимизацией, среда Hadoop не дает никаких гарантий относительно того, сколько раз будет вызываться объединитель, если вообще будет.

Вариант объединения картографов 1

Первая альтернатива использованию Combiners (рис. 3.2 на стр. 41) очень прямолинейна и вносит небольшую модификацию в наш оригинальный механизм подсчета слов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Map<String,Integer> tokenMap = new HashMap<String, Integer>();
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
 
        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }
 
        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
             text.set(s);
             writableCount.set(tokenMap.get(s));
             context.write(text,writableCount);
        }
    }
}

Как мы видим здесь, вместо испускания слова со счетом 1 для каждого встреченного слова мы используем карту для отслеживания каждого слова, уже обработанного. Затем, когда все токены обработаны, мы перебираем карту и выводим общее количество для каждого слова, встречающегося в этой строке.

В Mapper Combining Option 2

Второй вариант комбинирования картографов (рис. 3.3 на стр. 41) очень похож на приведенный выше пример с двумя отличиями — когда создается хэш-карта и когда мы генерируем результаты, содержащиеся в карте. В приведенном выше примере создается карта, и ее содержимое сбрасывается по сети для каждого вызова метода карты. В этом примере мы собираемся сделать карту переменной экземпляра и перенести создание экземпляра карты на метод setUp в нашем преобразователе. Аналогично, содержимое карты не будет отправлено редукторам до тех пор, пока не будут завершены все вызовы mapper и не будет вызван метод cleanUp.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
 
    private  Map<String,Integer> tokenMap;
 
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
           tokenMap = new HashMap<String, Integer>();
    }
 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while(tokenizer.hasMoreElements()){
            String token = tokenizer.nextToken();
            Integer count = tokenMap.get(token);
            if(count == null) count = new Integer(0);
            count+=1;
            tokenMap.put(token,count);
        }
    }
 
 
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        IntWritable writableCount = new IntWritable();
        Text text = new Text();
        Set<String> keys = tokenMap.keySet();
        for (String s : keys) {
            text.set(s);
            writableCount.set(tokenMap.get(s));
            context.write(text,writableCount);
        }
    }
}

Как видно из приведенного выше примера кода, картограф отслеживает уникальное количество слов во всех вызовах метода map. Отслеживая уникальные токены и их количество, необходимо значительно сократить количество записей, отправляемых редукторам, что, в свою очередь, должно сократить время выполнения задания MapReduce. Это выполняет тот же эффект, что и использование функции функции объединения, предоставляемой платформой MapReduce, но в этом случае вы гарантированно вызовете код объединения. Но есть некоторые предостережения с этим подходом. Сохранение состояния при вызовах карты может оказаться проблематичным и определенно является нарушением функционального духа функции «карты». Кроме того, поддерживая состояние всех картографов, в зависимости от данных, используемых в задании, память может стать еще одной проблемой, с которой приходится сталкиваться. В конечном счете, нужно было бы взвесить все компромиссы, чтобы определить лучший подход.

Полученные результаты

Теперь давайте посмотрим на некоторые результаты различных картографов. Поскольку задание выполнялось в псевдораспределенном режиме, фактическое время выполнения не имеет значения, но мы все же можем сделать вывод о том, как использование локальной агрегации может повлиять на эффективность задания MapReduce в реальном кластере.

В Token Mapper:

1
2
3
4
12/09/13 21:25:32 INFO mapred.JobClient:     Reduce shuffle bytes=366010
12/09/13 21:25:32 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:25:32 INFO mapred.JobClient:     Spilled Records=63118
12/09/13 21:25:32 INFO mapred.JobClient:     Map output bytes=302886

В варианте сокращения Mapper 1:

1
2
3
4
12/09/13 21:28:15 INFO mapred.JobClient:     Reduce shuffle bytes=354112
12/09/13 21:28:15 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:28:15 INFO mapred.JobClient:     Spilled Records=60704
12/09/13 21:28:15 INFO mapred.JobClient:     Map output bytes=293402

В варианте сокращения Mapper 2:

1
2
3
4
12/09/13 21:30:49 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:30:49 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:30:49 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:30:49 INFO mapred.JobClient:     Map output bytes=90565

Опция Combiner:

1
2
3
4
5
6
12/09/13 21:22:18 INFO mapred.JobClient:     Reduce shuffle bytes=105885
12/09/13 21:22:18 INFO mapred.JobClient:     Reduce output records=7657
12/09/13 21:22:18 INFO mapred.JobClient:     Spilled Records=15314
12/09/13 21:22:18 INFO mapred.JobClient:     Map output bytes=302886
12/09/13 21:22:18 INFO mapred.JobClient:     Combine input records=31559
12/09/13 21:22:18 INFO mapred.JobClient:     Combine output records=7657

Как и ожидалось, Mapper, который не объединял, имел худшие результаты, за которым следовал первый вариант объединения в Mapper (хотя эти результаты можно было бы улучшить, если бы данные были очищены перед выполнением подсчета слов). Второй вариант объединения в картографе и функция объединителя дали практически идентичные результаты. Существенным фактом является то, что оба производят на 2/3 меньше сокращенных байтов в случайном порядке, как первые два варианта. Сокращение количества байтов, отправляемых по сети редукторам на эту сумму, несомненно, положительно скажется на эффективности задания MapReduce. Здесь нужно помнить об одном: объединение Combiners / In-Mapper может использоваться не только во всех заданиях MapReduce, в этом случае подсчет слов очень хорошо подходит для такого улучшения, но это не всегда может быть правдой. ,

Вывод

Как вы видите, преимущества использования объединения в картографическом режиме или функции объединителя Hadoop требуют серьезного рассмотрения при повышении производительности ваших заданий MapReduce. Что касается того, какой подход, то вам решать, какие компромиссы для каждого подхода.

Ссылки по теме

Приятного кодирования и не забудьте поделиться!

Ссылка: Работа с интенсивной обработкой данных с помощью MapReduce от нашего партнера по JCG Билла Бекака в блоге Randomечни о кодировании .