Мы продолжаем нашу серию по реализации алгоритмов MapReduce, которые можно найти в книге «Интенсивная обработка текста с помощью MapReduce». Другие посты в этой серии:
- Работа с интенсивной обработкой текста с помощью MapReduce
- Работа с интенсивной обработкой текста с помощью MapReduce — локальная агрегация, часть II
- Вычисление матрицы сопутствующих явлений с помощью Hadoop
- Алгоритмы MapReduce — инверсия порядка
Эта статья посвящена шаблону вторичной сортировки, описанному в главе 3 « Интенсивная обработка текста с помощью MapReduce» . В то время как Hadoop автоматически сортирует данные, испускаемые картографами, перед отправкой в редукторы, что вы можете сделать, если вы также хотите отсортировать по значению? Вы используете вторичную сортировку, конечно. С небольшой манипуляцией с форматом ключевого объекта, вторичная сортировка дает нам возможность учитывать значение на этапе сортировки. Здесь есть два возможных подхода.
Первый подход предполагает наличие в буфере-восстановителе всех значений для данного ключа и выполнение сортировки по значениям в редукторе. Поскольку редуктор будет получать все значения для данного ключа, такой подход может привести к нехватке памяти редуктора.
Второй подход заключается в создании составного ключа путем добавления части или всего значения к естественному ключу для достижения целей сортировки. Компромисс между этими двумя подходами заключается в том, чтобы выполнить явную сортировку значений в редукторе, скорее всего, будет быстрее (с риском нехватки памяти), но реализация подхода преобразования «значение в ключ» снимает нагрузку с сортировки инфраструктуры MapReduce. , которая лежит в основе того, для чего предназначен Hadoop / MapReduce. Для целей данного поста мы рассмотрим подход «значение для ключа». Нам нужно написать собственный разделитель, чтобы все данные с одним и тем же ключом (естественный ключ, не включая составной ключ со значением) отправлялись в один и тот же редуктор и настраиваемый компаратор, чтобы данные группировались по естественному ключу после его прибывает на редуктор.
Преобразование значения в ключ
Создать составной ключ очень просто. Нам нужно проанализировать, какую часть (и) значения мы хотим учесть во время сортировки, и добавить соответствующие части в естественный ключ. Затем нам нужно поработать с методом CompareTo либо в классе ключей, либо в классе компаратора, чтобы убедиться, что составной ключ учтен. Мы повторно посетим набор данных о погоде и включим температуру как часть естественного ключа (естественным ключом является год и месяц, соединенные вместе). Результатом будет список самого холодного дня для данного месяца и года. Этот пример был вдохновлен примером вторичной сортировки, найденным в Hadoop, The Definitive Guidebook . Хотя, возможно, есть лучшие способы для достижения этой цели, но она будет достаточно хороша, чтобы продемонстрировать, как работает вторичная сортировка.
Код Mapper
Наш код сопоставления уже объединяет год и месяц вместе, но мы также включим температуру как часть ключа. Поскольку мы включили значение в сам ключ, преобразователь выдаст значение NullWritable, где в других случаях мы будем указывать температуру.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class SecondarySortingTemperatureMapper extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> { private TemperaturePair temperaturePair = new TemperaturePair(); private NullWritable nullValue = NullWritable.get(); private static final int MISSING = 9999;@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String yearMonth = line.substring(15, 21); int tempStartPosition = 87; if (line.charAt(tempStartPosition) == '+') { tempStartPosition += 1; } int temp = Integer.parseInt(line.substring(tempStartPosition, 92)); if (temp != MISSING) { temperaturePair.setYearMonth(yearMonth); temperaturePair.setTemperature(temp); context.write(temperaturePair, nullValue); } }} |
Теперь мы добавили температуру к ключу, мы установили этап для включения вторичной сортировки. Осталось написать код, учитывающий температуру, когда это необходимо. Здесь у нас есть два варианта: написать Comparator или настроить метод CompareTo для класса TemperaturePair (TemperaturePair реализует WritableComparable). В большинстве случаев я бы порекомендовал написать отдельный Comparator, но класс TemperaturePair был написан специально для демонстрации вторичной сортировки, поэтому мы изменим метод compareTo класса TemperaturePair.
|
1
2
3
4
5
6
7
8
|
@Override public int compareTo(TemperaturePair temperaturePair) { int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth()); if (compareValue == 0) { compareValue = temperature.compareTo(temperaturePair.getTemperature()); } return compareValue; } |
Если бы мы хотели отсортировать в порядке убывания, мы могли бы просто умножить результат сравнения температуры на -1.
Теперь, когда мы завершили часть, необходимую для сортировки, нам нужно написать собственный разделитель.
Кодекс участника
Чтобы при определении того, в какой редуктор отправлять данные, учитывался только естественный ключ, нам нужно написать собственный разделитель. Код прост и учитывает значение yearMonth класса TemperaturePair только при расчете редуктора, в который будут отправляться данные.
|
1
2
3
4
5
6
|
public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable>{ @Override public int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) { return temperaturePair.getYearMonth().hashCode() % numPartitions; }} |
В то время как пользовательский разделитель гарантирует, что все данные за год и месяц поступают в один и тот же редуктор, нам все равно необходимо учитывать тот факт, что редуктор будет группировать записи по ключу.
Группировка Компаратор
Как только данные достигают редуктора, все данные группируются по ключу. Поскольку у нас есть составной ключ, нам нужно убедиться, что записи сгруппированы исключительно по естественному ключу. Это достигается написанием пользовательского GroupPartitioner. У нас есть объект Comparator, который рассматривает только поле yearMonth класса TemperaturePair для группировки записей.
|
01
02
03
04
05
06
07
08
09
10
11
|
public class YearMonthGroupingComparator extends WritableComparator { public YearMonthGroupingComparator() { super(TemperaturePair.class, true); } @Override public int compare(WritableComparable tp1, WritableComparable tp2) { TemperaturePair temperaturePair = (TemperaturePair) tp1; TemperaturePair temperaturePair2 = (TemperaturePair) tp2; return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth()); }} |
Полученные результаты
Вот результаты выполнения нашей вторичной сортировки:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000190101 -206190102 -333190103 -272190104 -61190105 -33190106 44190107 72190108 44190109 17190110 -33190111 -217190112 -300 |
Вывод
Хотя сортировка данных по значению может не являться общей потребностью, это хороший инструмент, который нужно иметь в своем заднем кармане, когда это необходимо. Кроме того, мы смогли глубже взглянуть на внутреннюю работу Hadoop, работая с пользовательскими разделителями и групповыми разделителями. Спасибо за уделенное время.
Ресурсы
- Интенсивная обработка данных с MapReduce Джимми Лином и Крисом Дайером
- Hadoop: полное руководство Тома Уайта
- Исходный код и тесты из блога
- Hadoop API
- MRUnit для модульного тестирования Apache Hadoop map уменьшить количество рабочих мест
Ссылка: Алгоритмы MapReduce — вторичная сортировка от нашего партнера по JCG Билла Бекака в блоге « Случайные мысли о кодировании» .