Статьи

Упрощение вторичной сортировки в MapReduce с помощью htuple

Недавно я погрузился в написание ряда заданий MapReduce, которые требуют вторичной сортировки. Пока я ухаживал за своими спазмыми руками после написания того, что было похоже на сотую запись (и поддерживает разделитель / компараторы), мне пришла в голову мысль: «Конечно, есть лучший способ?» Когда я начал думать об этом еще немного, я понял, что мне нужен механизм общего назначения, который позволил бы мне:

  1. Работа с составными элементами
  2. Предоставить готовые разделители и компараторы, которые бы знали, как работать с этими составными элементами
  3. Смоделируйте все это так, чтобы было легко читать и понимать

Это вдохновение для htuple , маленького проекта, который я только что открыл.

htuple

Позвольте мне привести пример того, как вы можете использовать htupleдля выполнения вторичной сортировки. Представьте, что у вас есть набор данных, который содержит фамилии и имена:

Smith	John
Smith	Anne
Smith	Ken

Одним из примеров агрегации, которую вы можете выполнить для этих данных, является подсчет количества различных имен для каждой фамилии. Разумным подходом для реализации этого в MapReduce было бы использование фамилии в качестве выходного ключа преобразователя, имени в качестве выходного значения преобразователя, а в редукторе вы должны собрать все первые имена в наборе и затем подсчитать их. Это будет хорошо работать при работе с именами, но что если в вашем наборе данных есть несколько ключей с большим количеством различных значений — достаточно больших, чтобы вы могли столкнуться с проблемами кэширования всех данных в памяти редуктора?

Одним из решений здесь было бы использовать вторичную сортировку — и в примере с нашими именами отсортировать имена так, чтобы редуктору не нужно было хранить их в наборе (вместо этого он может просто увеличивать счетчик при чтении первых имен ). В этом случае вы, вероятно, в конечном итоге напишете пользовательский, Writableкоторый будет содержать как фамилию, так и имя, а также пользовательский разделитель и компаратор сортировки и группировки. Фу, это много работы только для того, чтобы заставить работать вторичную сортировку.

Давайте рассмотрим, как вы будете использовать htupleэту работу. Прежде всего, я бы рекомендовал определить перечисление для создания логических имен для элементов, которые вы будете хранить в кортеже. В нашем случае нам нужны два элемента для имен, так что здесь идет:

/**
 * User-friendly names that we can use to refer to fields in the tuple.
 */
enum TupleFields {
    LAST_NAME,
    FIRST_NAME
}

Первая концепция, которую мы представим, htuple— это Tupleкласс. Этот класс является просто контейнером для чтения и записи нескольких элементов и будет тем классом, который вы будете использовать для передачи ключей из вашего преобразователя. Есть три способа записать данные в этот кортеж — здесь мы рассмотрим то, что я считаю наиболее полезным методом, который использует только что созданное перечисление. Давайте посмотрим, как это будет работать в нашем картографе.

public static class Map extends Mapper<LongWritable, Text, Tuple, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // tokenize the line
        String nameParts[] = value.toString().split("\t");

        // create the tuple, setting the first and last names
        Tuple outputKey = new Tuple();
        outputKey.set(TupleFields.LAST_NAME, nameParts[0]);
        outputKey.set(TupleFields.FIRST_NAME, nameParts[1]);

        // emit the tuple and the original contents of the line
        context.write(outputKey, value);
    }
}

Первое, что вы делаете в маппере, это разделяете строку ввода, где первый токен — это фамилия, а второй токен — это имя. Далее вы создаете новый Tupleобъект и устанавливаете фамилию и имя. Мы используем перечисление для логической ссылки на поля. За кулисами происходит то, что Tupleкласс использует порядковый номер перечисления для определения позиции в ArrayList, которую нужно установить. Таким образом, это значение LAST_NAME, которое имеет порядковый номер 0, будет иметь свое значение, установленное в индексе 0в Tupleбазовых классах ArrayList.

Теперь, когда вы выпустили свой кортеж в маппере, вам нужно настроить свою работу для вторичной сортировки. Это будет подвергать вас ко второму классу в htuple, ShuffleUtils. ShuffleUtilsпозволяет указать, какие элементы в вашем кортеже используются для разделения, сортировки и группировки на этапе перемешивания. И вот как вы это делаете:

ShuffleUtils.configBuilder()
    .useNewApi()
    .setPartitionerIndices(TupleFields.LAST_NAME)
    .setSortIndices(TupleFields.values())
    .setGroupIndices(TupleFields.LAST_NAME)
    .configure(conf);

Если вы помните, как работает вторичная сортировка ( подробное объяснение см. В моей книге « Hadoop на практике »), вам нужно выполнить три шага в вашем драйвере MapReduce:

  1. Укажите, как ваш составной ключ будет разделен. В нашем примере мы хотим, чтобы разделитель использовал только фамилию, чтобы все записи с одинаковыми фамилиями направлялись в один и тот же редуктор.
  2. Укажите, как ваш составной ключ будет отсортирован. Здесь мы хотим отсортировать как фамилию, так и фамилию, чтобы имена были представлены вашему редуктору в отсортированном порядке.
  3. Укажите, как ваш составной ключ будет сгруппирован. Поскольку мы хотим, чтобы все первые имена передавались в один вызов редуктора для заданной фамилии, мы хотим группировать только по фамилии.

Несколько вещей, которые стоит отметить в приведенном выше примере кода:

  1. Мы используем новый API MapReduce (то есть, используя пакет org.apache.hadoop.mapreduce), и поэтому вам нужно вызвать useNewApiметод.
  2. valuesМетод на перечислении возвращает массив всех полей перечислений в порядке определения, в нашем примере это последнее имя , сопровождаемое первым именем — именно порядок , в котором мы хотим , чтобы сортировка произойти.

Вы сделали! Вы можете посмотреть полный источник в SecondarySort.java . На странице htuple github есть инструкции по загрузке, сборке и запуску этого же примера за пару простых шагов. Если вы изучите выходные данные задания MapReduce в HDFS, то увидите, что все записи отсортированы по фамилии и имени.

$ hadoop fs -cat output/part*
Smith	Anne
Smith	John
Smith	Ken