Статьи

Алгоритмы MapReduce — вторичная сортировка

Мы продолжаем нашу серию по реализации алгоритмов MapReduce, которые можно найти в книге «Интенсивная обработка текста с помощью MapReduce». Другие посты в этой серии:

  1. Работа с интенсивной обработкой текста с помощью MapReduce
  2. Работа с интенсивной обработкой текста с помощью MapReduce — локальная агрегация, часть II
  3. Вычисление матрицы сопутствующих явлений с помощью Hadoop
  4. Алгоритмы MapReduce — инверсия порядка

Эта статья посвящена шаблону вторичной сортировки, описанному в главе 3 « Интенсивная обработка текста с помощью MapReduce» . В то время как Hadoop автоматически сортирует данные, испускаемые картографами, перед отправкой в ​​редукторы, что вы можете сделать, если вы также хотите отсортировать по значению? Вы используете вторичную сортировку, конечно. С небольшой манипуляцией с форматом ключевого объекта, вторичная сортировка дает нам возможность учитывать значение на этапе сортировки. Здесь есть два возможных подхода.

Первый подход предполагает наличие в буфере-восстановителе всех значений для данного ключа и выполнение сортировки по значениям в редукторе. Поскольку редуктор будет получать все значения для данного ключа, такой подход может привести к нехватке памяти редуктора.

Второй подход заключается в создании составного ключа путем добавления части или всего значения к естественному ключу для достижения целей сортировки. Компромисс между этими двумя подходами заключается в том, чтобы выполнить явную сортировку значений в редукторе, скорее всего, будет быстрее (с риском нехватки памяти), но реализация подхода преобразования «значение в ключ» снимает нагрузку с сортировки инфраструктуры MapReduce. , которая лежит в основе того, для чего предназначен Hadoop / MapReduce. Для целей данного поста мы рассмотрим подход «значение для ключа». Нам нужно написать собственный разделитель, чтобы все данные с одним и тем же ключом (естественный ключ, не включая составной ключ со значением) отправлялись в один и тот же редуктор и настраиваемый компаратор, чтобы данные группировались по естественному ключу после его прибывает на редуктор.

Преобразование значения в ключ

Создать составной ключ очень просто. Нам нужно проанализировать, какую часть (и) значения мы хотим учесть во время сортировки, и добавить соответствующие части в естественный ключ. Затем нам нужно поработать с методом CompareTo либо в классе ключей, либо в классе компаратора, чтобы убедиться, что составной ключ учтен. Мы повторно посетим набор данных о погоде и включим температуру как часть естественного ключа (естественным ключом является год и месяц, соединенные вместе). Результатом будет список самого холодного дня для данного месяца и года. Этот пример был вдохновлен примером вторичной сортировки, найденным в Hadoop, The Definitive Guidebook . Хотя, возможно, есть лучшие способы для достижения этой цели, но она будет достаточно хороша, чтобы продемонстрировать, как работает вторичная сортировка.

Код Mapper

Наш код сопоставления уже объединяет год и месяц вместе, но мы также включим температуру как часть ключа. Поскольку мы включили значение в сам ключ, преобразователь выдаст значение NullWritable, где в других случаях мы будем указывать температуру.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SecondarySortingTemperatureMapper extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> {
 
    private TemperaturePair temperaturePair = new TemperaturePair();
    private NullWritable nullValue = NullWritable.get();
    private static final int MISSING = 9999;
@Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);
 
        int tempStartPosition = 87;
 
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
 
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
 
        if (temp != MISSING) {
            temperaturePair.setYearMonth(yearMonth);
            temperaturePair.setTemperature(temp);
            context.write(temperaturePair, nullValue);
        }
    }
}

Теперь мы добавили температуру к ключу, мы установили этап для включения вторичной сортировки. Осталось написать код, учитывающий температуру, когда это необходимо. Здесь у нас есть два варианта: написать Comparator или настроить метод CompareTo для класса TemperaturePair (TemperaturePair реализует WritableComparable). В большинстве случаев я бы порекомендовал написать отдельный Comparator, но класс TemperaturePair был написан специально для демонстрации вторичной сортировки, поэтому мы изменим метод compareTo класса TemperaturePair.

1
2
3
4
5
6
7
8
@Override
   public int compareTo(TemperaturePair temperaturePair) {
       int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth());
       if (compareValue == 0) {
           compareValue = temperature.compareTo(temperaturePair.getTemperature());
       }
       return compareValue;
   }

Если бы мы хотели отсортировать в порядке убывания, мы могли бы просто умножить результат сравнения температуры на -1.
Теперь, когда мы завершили часть, необходимую для сортировки, нам нужно написать собственный разделитель.

Кодекс участника

Чтобы при определении того, в какой редуктор отправлять данные, учитывался только естественный ключ, нам нужно написать собственный разделитель. Код прост и учитывает значение yearMonth класса TemperaturePair только при расчете редуктора, в который будут отправляться данные.

1
2
3
4
5
6
public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable>{
    @Override
    public int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) {
        return temperaturePair.getYearMonth().hashCode() % numPartitions;
    }
}

В то время как пользовательский разделитель гарантирует, что все данные за год и месяц поступают в один и тот же редуктор, нам все равно необходимо учитывать тот факт, что редуктор будет группировать записи по ключу.

Группировка Компаратор

Как только данные достигают редуктора, все данные группируются по ключу. Поскольку у нас есть составной ключ, нам нужно убедиться, что записи сгруппированы исключительно по естественному ключу. Это достигается написанием пользовательского GroupPartitioner. У нас есть объект Comparator, который рассматривает только поле yearMonth класса TemperaturePair для группировки записей.

01
02
03
04
05
06
07
08
09
10
11
public class YearMonthGroupingComparator extends WritableComparator {
    public YearMonthGroupingComparator() {
        super(TemperaturePair.class, true);
    }
    @Override
    public int compare(WritableComparable tp1, WritableComparable tp2) {
        TemperaturePair temperaturePair = (TemperaturePair) tp1;
        TemperaturePair temperaturePair2 = (TemperaturePair) tp2;
        return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth());
    }
}

Полученные результаты

Вот результаты выполнения нашей вторичной сортировки:

01
02
03
04
05
06
07
08
09
10
11
12
13
new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000
190101  -206
190102  -333
190103  -272
190104  -61
190105  -33
190106  44
190107  72
190108  44
190109  17
190110  -33
190111  -217
190112  -300

Вывод

Хотя сортировка данных по значению может не являться общей потребностью, это хороший инструмент, который нужно иметь в своем заднем кармане, когда это необходимо. Кроме того, мы смогли глубже взглянуть на внутреннюю работу Hadoop, работая с пользовательскими разделителями и групповыми разделителями. Спасибо за уделенное время.

Ресурсы

Ссылка: Алгоритмы MapReduce — вторичная сортировка от нашего партнера по JCG Билла Бекака в блоге « Случайные мысли о кодировании» .