Статьи

Алгоритмы MapReduce — инверсия порядка

Этот пост является еще одним сегментом в серии, представляющей алгоритмы MapReduce, которые можно найти в книге « Интенсивная обработка текста с помощью MapReduce» . Предыдущие выпуски : Локальная агрегация , Локальная агрегация, Часть II и Создание матрицы сопутствующих явлений . На этот раз мы обсудим модель инверсии порядка. В шаблоне инверсии порядка используется фаза сортировки MapReduce для передачи данных, необходимых для вычислений, в редуктор перед данными, которыми будут манипулировать. Прежде чем вы отклоните это как крайнее условие для MapReduce, я настоятельно призываю вас прочитать, как мы обсудим как использовать сортировку в наших интересах и использовать специальный разделитель, оба из которых являются полезными инструментами.

Хотя многие программы MapReduce написаны на более высоком уровне абстракции, например, Hive или Pig, все же полезно иметь представление о том, что происходит на более низком уровне. Шаблон инверсии порядка можно найти в главе 3 Обработки текста с интенсивным использованием данных в книге MapReduce. , Чтобы проиллюстрировать шаблон инверсии порядка, мы будем использовать подход «Пары» из шаблона матрицы совместного использования. При создании матрицы совместного вхождения мы отслеживаем общее количество слов, когда слова появляются вместе. На высоком уровне мы используем подход «Пары» и добавляем небольшой поворот, в дополнение к тому, что маппер испускает пару слов, таких как («foo», «bar»), мы испускаем дополнительную пару слов («foo», » * ») И будет делать это для каждой пары слов, чтобы мы могли легко рассчитать общий счетчик частоты появления самого левого слова и использовать его для расчета наших относительных частот. Этот подход породил две конкретные проблемы. Во-первых, нам нужно найти способ обеспечить, чтобы пары слов («foo», «*») сначала приходили к редуктору. Во-вторых, нам нужно убедиться, что все пары слов с одинаковым левым словом приходят к одному и тому же редуктору. Прежде чем мы решим эти проблемы, давайте взглянем на наш код Mapper.

Код Mapper

Сначала нам нужно изменить наш картограф из подхода Пары. В нижней части каждого цикла после того, как мы выпустили все пары слов для определенного слова, мы выпустим специальный токен WordPair («слово», «*») вместе со счетчиком раз, когда было найдено слово слева.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PairsRelativeOccurrenceMapper extends Mapper<LongWritable, Text, WordPair, IntWritable> {
    private WordPair wordPair = new WordPair();
    private IntWritable ONE = new IntWritable(1);
    private IntWritable totalCount = new IntWritable();
 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int neighbors = context.getConfiguration().getInt('neighbors', 2);
        String[] tokens = value.toString().split('\\s+');
        if (tokens.length > 1) {
            for (int i = 0; i < tokens.length; i++) {
                    tokens[i] = tokens[i].replaceAll('\\W+','');
 
                    if(tokens[i].equals('')){
                        continue;
                    }
 
                    wordPair.setWord(tokens[i]);
 
                    int start = (i - neighbors < 0) ? 0 : i - neighbors;
                    int end = (i + neighbors >= tokens.length) ? tokens.length - 1 : i + neighbors;
                    for (int j = start; j <= end; j++) {
                        if (j == i) continue;
                        wordPair.setNeighbor(tokens[j].replaceAll('\\W',''));
                        context.write(wordPair, ONE);
                    }
                    wordPair.setNeighbor('*');
                    totalCount.set(end - start);
                    context.write(wordPair, totalCount);
            }
        }
    }
}

Теперь, когда мы сгенерировали способ отследить общее количество раз, с которым встречалось определенное слово, нам нужно убедиться, что эти специальные символы сначала достигают редуктора, чтобы можно было подсчитать общее количество для вычисления относительных частот. У нас будет фаза сортировки процесса MapReduce, чтобы обработать это для нас, изменив метод CompareTo в объекте WordPair.

Модифицированная сортировка

Мы модифицируем метод CompareTo в классе WordPair, чтобы при появлении символа «*» справа этот конкретный объект помещался наверх.

01
02
03
04
05
06
07
08
09
10
11
12
13
@Override
public int compareTo(WordPair other) {
    int returnVal = this.word.compareTo(other.getWord());
    if(returnVal != 0){
        return returnVal;
    }
    if(this.neighbor.toString().equals('*')){
        return -1;
    }else if(other.getNeighbor().toString().equals('*')){
        return 1;
    }
    return this.neighbor.compareTo(other.getNeighbor());
}

Изменяя метод compareTo, мы теперь гарантируем, что любая WordPair со специальным символом будет отсортирована сверху и сначала прибудет в редуктор. Это приводит к нашей второй специализации, как мы можем гарантировать, что все объекты WordPair с данным левым словом будут отправлены одному и тому же редуктору? Ответ заключается в создании пользовательского разделителя.

Пользовательский Разделитель

Промежуточные ключи перетасовываются в редукторы путем вычисления хэш-кода ключа по модулю количества редукторов. Но наши объекты WordPair содержат два слова, поэтому получение хеш-кода всего объекта явно не сработает. Нам нужно создать собственный Partitioner, который учитывает только левое слово, когда речь заходит о том, в какой редуктор отправлять вывод.

1
2
3
4
5
6
7
public class WordPairPartitioner extends Partitioner<WordPair,IntWritable> {
 
    @Override
    public int getPartition(WordPair wordPair, IntWritable intWritable, int numPartitions) {
        return wordPair.getWord().hashCode() % numPartitions;
    }
}

Теперь мы гарантируем, что все объекты WordPair с одинаковым левым словом отправляются одному и тому же редуктору. Осталось только создать редуктор, чтобы использовать преимущества формата отправляемых данных.

редуктор

Построение редуктора для инвертированной последовательности инверсии порядка просто. Это будет включать в себя сохранение переменной счетчика и переменной текущего слова. Редуктор проверит клавишу ввода WordPair на наличие специального символа «*» справа. Если слово слева не совпадает с «текущим» словом, мы переустанавливаем счетчик и суммируем все значения, чтобы получить общее количество раз, когда данное текущее слово наблюдалось. Теперь мы будем обрабатывать следующие объекты WordPair, суммировать счетчики и делить их на нашу переменную-счетчик, чтобы получить относительную частоту. Этот процесс будет продолжаться, пока не встретится другой специальный символ, и процесс не начнется заново.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PairsRelativeOccurrenceReducer extends Reducer<WordPair, IntWritable, WordPair, DoubleWritable> {
    private DoubleWritable totalCount = new DoubleWritable();
    private DoubleWritable relativeCount = new DoubleWritable();
    private Text currentWord = new Text('NOT_SET');
    private Text flag = new Text('*');
 
    @Override
    protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        if (key.getNeighbor().equals(flag)) {
            if (key.getWord().equals(currentWord)) {
                totalCount.set(totalCount.get() + getTotalCount(values));
            } else {
                currentWord.set(key.getWord());
                totalCount.set(0);
                totalCount.set(getTotalCount(values));
            }
        } else {
            int count = getTotalCount(values);
            relativeCount.set((double) count / totalCount.get());
            context.write(key, relativeCount);
        }
    }
  private int getTotalCount(Iterable<IntWritable> values) {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        return count;
    }
}

Управляя порядком сортировки и создавая пользовательский разделитель, мы смогли отправить данные в редуктор, необходимый для вычисления, до того, как поступят данные, необходимые для этого вычисления. Хотя здесь это не показано, для запуска задания MapReduce использовался объединитель. Этот подход также является хорошим кандидатом на комбинацию «in-mapper».

Пример и результаты

Учитывая, что наступают праздники, я почувствовал, что настало время привести пример схемы инверсии порядка в романе Чарльза Диккенса «Рождественская песнь». Я знаю, что это банально, но это служит цели.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
new-host-2:sbin bbejeck$ hdfs dfs -cat relative/part* | grep Humbug
{word=[Humbug] neighbor=[Scrooge]}  0.2222222222222222
{word=[Humbug] neighbor=[creation]} 0.1111111111111111
{word=[Humbug] neighbor=[own]}  0.1111111111111111
{word=[Humbug] neighbor=[said]} 0.2222222222222222
{word=[Humbug] neighbor=[say]}  0.1111111111111111
{word=[Humbug] neighbor=[to]}   0.1111111111111111
{word=[Humbug] neighbor=[with]} 0.1111111111111111
{word=[Scrooge] neighbor=[Humbug]}  0.0020833333333333333
{word=[creation] neighbor=[Humbug]} 0.1
{word=[own] neighbor=[Humbug]}  0.006097560975609756
{word=[said] neighbor=[Humbug]} 0.0026246719160104987
{word=[say] neighbor=[Humbug]}  0.010526315789473684
{word=[to] neighbor=[Humbug]}   3.97456279809221E-4
{word=[with] neighbor=[Humbug]} 9.372071227741331E-4

Вывод

Хотя вычисление относительных частот встречаемости слов, вероятно, не является обычной задачей, мы смогли продемонстрировать полезные примеры сортировки и использования пользовательского разделителя, которые являются хорошими инструментами, которые можно использовать при создании программ MapReduce. Как указывалось ранее, даже если большая часть вашего MapReduce написана на более высоком уровне абстракции, такой как Hive или Pig, все же поучительно иметь представление о том, что происходит под капотом. Спасибо за ваше время.

Ссылка: MapReduce Algorithms — Инверсия заказа от нашего партнера по JCG Билла Бекака в блоге « Случайные мысли о кодировании»