Статьи

Алгоритмы MapReduce — Понимание объединения данных, часть 1

В этой статье мы продолжим серию реализации алгоритмов, описанных в книге « Интенсивная обработка текста с использованием MapReduce» , на этот раз обсуждаем объединения данных. Хотя мы собираемся обсудить методы объединения данных в Hadoop и предоставить пример кода, в большинстве случаев вы, вероятно, не будете писать код для выполнения соединений самостоятельно. Вместо этого объединение данных лучше выполнить с помощью инструментов, работающих на более высоком уровне абстракции, таких как Hive или Pig. Зачем тратить время на то, чтобы научиться объединять данные, если есть инструменты, которые позаботятся об этом за вас? Объединение данных, возможно, является одним из самых больших применений Hadoop. Получение полного понимания того, как Hadoop выполняет объединения, крайне важно для принятия решения о том, какое объединение использовать, и для отладки при возникновении проблем. Кроме того, как только вы полностью поймете, как выполняются различные объединения в Hadoop, вы можете лучше использовать такие инструменты, как Hive и Pig. Наконец, может быть единственный случай, когда инструмент просто не даст вам того, что вам нужно, и вам придется засучить рукава и написать код самостоятельно.

Потребность в соединениях

При обработке больших наборов данных необходимость объединения данных с помощью общего ключа может быть очень полезной, если не существенной. Объединяя данные, вы можете получить дополнительную информацию, такую ​​как объединение с временными метками, чтобы соотнести события со временем в день. Необходимость объединения данных многочисленна и разнообразна. Мы расскажем о 3 типах объединений, объединениях на стороне уменьшения, объединениях на стороне карты и объединении с поддержкой памяти в трех отдельных публикациях. В этой части мы рассмотрим работу с соединениями Reduce-Side.

Уменьшить боковые соединения

Из шаблонов соединений, которые мы обсудим, соединения на стороне сокращения проще всего реализовать. Прямое соединение со стороны редуктора делает то, что Hadoop отправляет идентичные ключи одному и тому же редуктору, поэтому по умолчанию данные организованы для нас. Чтобы выполнить объединение, нам просто необходимо кэшировать ключ и сравнить его с входящими ключами. Пока ключи совпадают, мы можем объединить значения из соответствующих ключей. Компромисс с объединениями на стороне сокращения — это производительность, поскольку все данные перетасовываются по сети. В объединениях на стороне сокращения мы рассмотрим два разных сценария: один-к-одному и один-ко-многим. Мы также рассмотрим варианты, в которых нам не нужно отслеживать входящие ключи; все значения для данного ключа будут сгруппированы в редуктор.

Индивидуальные объединения

Соединение «один к одному» — это случай, когда значение из набора данных «X» совместно использует общий ключ со значением из набора данных «Y». Поскольку Hadoop гарантирует, что равные ключи отправляются одному и тому же редуктору, сопоставление двух наборов данных позаботится о соединении для нас. Поскольку сортировка происходит только по ключам, порядок значений неизвестен. Мы можем легко исправить ситуацию, используя вторичную сортировку . Наша реализация вторичной сортировки будет заключаться в том, чтобы помечать ключи «1» или «2» для определения порядка значений. Нам нужно сделать пару дополнительных шагов для реализации нашей стратегии тегирования.

Реализация WritableComparable

Сначала нам нужно написать класс, который реализует интерфейс WritableComparable, который будет использоваться для переноса нашего ключа.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public class TaggedKey implements Writable, WritableComparable<TaggedKey> {
 
    private Text joinKey = new Text();
    private IntWritable tag = new IntWritable();
 
    @Override
    public int compareTo(TaggedKey taggedKey) {
        int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());
        if(compareValue == 0 ){
            compareValue = this.tag.compareTo(taggedKey.getTag());
        }
       return compareValue;
    }
   //Details left out for clarity
 }

Когда наш класс TaggedKey отсортирован, ключи с одинаковым значением joinKey будут иметь вторичную сортировку по значению поля tag , гарантируя joinKey порядок.

Написание собственного раздела

Затем нам нужно написать пользовательский разделитель, который будет учитывать ключ объединения только при определении, в какой редуктор отправляются составной ключ и данные:

1
2
3
4
5
6
7
public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {
 
    @Override
    public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
        return taggedKey.getJoinKey().hashCode() % numPartitions;
    }
}

На данный момент у нас есть то, что нам нужно, чтобы объединить данные и обеспечить порядок значений. Но мы не хотим отслеживать ключи, когда они входят в метод reduce() . Мы хотим, чтобы все ценности были сгруппированы для нас. Для этого мы будем использовать Comparator который будет учитывать только ключ соединения при принятии решения о том, как группировать значения.

Написание группового компаратора

Наш компаратор, используемый для группировки, будет выглядеть так:

01
02
03
04
05
06
07
08
09
10
11
12
13
public class TaggedJoiningGroupingComparator extends WritableComparator {
 
    public TaggedJoiningGroupingComparator() {
        super(TaggedKey.class,true);
    }
 
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TaggedKey taggedKey1 = (TaggedKey)a;
        TaggedKey taggedKey2 = (TaggedKey)b;
        return taggedKey1.getJoinKey().compareTo(taggedKey2.getJoinKey());
    }
}

Структура данных

Теперь нам нужно определить, что мы будем использовать для нашего ключа для объединения данных. Для наших образцов данных мы будем использовать файл CSV, созданный из Fakenames Generator . Первый столбец — это GUID, который будет служить нашим ключом соединения. Наши образцы данных содержат такую ​​информацию, как имя, адрес, адрес электронной почты, информация о работе, данные о кредитных картах и ​​автомобилях. Для демонстрации мы возьмем поля GUID, name и address и поместим их в один файл, который будет структурирован следующим образом:

1
2
3
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY

Затем мы возьмем поля GUID, адрес электронной почты, имя пользователя, пароль и кредитную карту и поместим их в другой файл, который будет выглядеть следующим образом:

1
2
3
4
5
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,517-706-9565,[email protected],Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,508-307-3433,[email protected],Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,212-780-4015,[email protected],Subjecall,AiKoiweihi6,MasterCard,524

Теперь нам нужен Mapper, который будет знать, как работать с нашими данными, чтобы извлечь правильный ключ для присоединения, а также установить правильный тег.

Создание картографа

Вот наш код Mapper:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class JoiningMapper extends Mapper<LongWritable, Text, TaggedKey, Text> {
 
    private int keyIndex;
    private Splitter splitter;
    private Joiner joiner;
    private TaggedKey taggedKey = new TaggedKey();
    private Text data = new Text();
    private int joinOrder;
 
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));
        String separator = context.getConfiguration().get("separator");
        splitter = Splitter.on(separator).trimResults();
        joiner = Joiner.on(separator);
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        joinOrder = Integer.parseInt(context.getConfiguration().get(fileSplit.getPath().getName()));
    }
 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        List<String> values = Lists.newArrayList(splitter.split(value.toString()));
        String joinKey = values.remove(keyIndex);
        String valuesWithOutKey = joiner.join(values);
        taggedKey.set(joinKey, joinOrder);
        data.set(valuesWithOutKey);
        context.write(taggedKey, data);
    }
 
}

Давайте рассмотрим, что происходит в методе setup() .

  1. Сначала мы получаем индекс нашего ключа соединения и разделитель, используемый в тексте, из значений, заданных в конфигурации при запуске задания.
  2. Затем мы создаем разделитель гуавы, используемый для разделения данных на разделителе, который мы извлекли из вызова context.getConfiguration().get("separator") . Мы также создаем Guava Joiner, который используется для соединения данных после извлечения ключа.
  3. Затем мы получаем имя файла, который будет обрабатывать этот маппер. Мы используем имя файла, чтобы получить порядок соединения для этого файла, который был сохранен в конфигурации.

Мы также должны обсудить, что происходит в методе map() :

  1. Выплевывая наши данные и создавая список значений
  2. Удалить ключ объединения из списка
  3. Повторно объедините данные обратно в одну строку
  4. Установите ключ соединения, порядок соединения и оставшиеся данные
  5. Запишите данные

Итак, мы прочитали наши данные, извлекли ключ, установили порядок соединения и записали наши данные обратно. Давайте посмотрим, как мы будем объединять данные.

Соединение данных

Теперь давайте посмотрим, как данные объединяются в редукторе:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
public class JoiningReducer extends Reduce<TaggedKey, Text, NullWritable, Text> {
 
    private Text joinedText = new Text();
    private StringBuilder builder = new StringBuilder();
    private NullWritable nullKey = NullWritable.get();
 
    @Override
    protected void reduce(TaggedKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        builder.append(key.getJoinKey()).append(",");
        for (Text value : values) {
            builder.append(value.toString()).append(",");
        }
        builder.setLength(builder.length()-1);
        joinedText.set(builder.toString());
        context.write(nullKey, joinedText);
        builder.setLength(0);
    }
}

Поскольку ключ с тэгом «1» первым достиг редуктора, мы знаем, что имя и адрес — это первое значение, а адрес электронной почты, имя пользователя, пароль и данные кредитной карты — второе. Поэтому нам не нужно отслеживать какие-либо ключи. Мы просто перебираем значения и объединяем их вместе.

Индивидуальные результаты Присоединения

Вот результаты выполнения нашей работы One-To-One MapReduce:

1
2
3
4
5
6
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,517-706-9565,[email protected],Waskepter38,noL2ieghie,MasterCard,
5305687295670850
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,508-307-3433,[email protected],Conerse,Gif4Edeiba,MasterCard,
5265896533330445
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,212-780-4015,[email protected],Subjecall,AiKoiweihi6,MasterCard,
5243379373546690

Как мы видим, две записи из нашего примера данных выше были объединены в одну запись. Мы успешно объединили поля GUID, имя, адрес, адрес электронной почты, имя пользователя, пароль и кредитную карту в один файл.

Указание порядка соединения

В этот момент мы можем спросить, как мы указываем порядок соединения для нескольких файлов? Ответ лежит в нашем классе ReduceSideJoinDriver который служит драйвером для нашей программы MapReduce.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ReduceSideJoinDriver {
 
    public static void main(String[] args) throws Exception {
        Splitter splitter = Splitter.on('/');
        StringBuilder filePaths = new StringBuilder();
 
        Configuration config = new Configuration();
        config.set("keyIndex", "0");
        config.set("separator", ",");
 
        for(int i = 0; i< args.length - 1; i++) {
            String fileName = Iterables.getLast(splitter.split(args[i]));
            config.set(fileName, Integer.toString(i+1));
            filePaths.append(args[i]).append(",");
        }
 
        filePaths.setLength(filePaths.length() - 1);
        Job job = Job.getInstance(config, "ReduceSideJoin");
        job.setJarByClass(ReduceSideJoinDriver.class);
 
        FileInputFormat.addInputPaths(job, filePaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));
 
        job.setMapperClass(JoiningMapper.class);
        job.setReducerClass(JoiningReducer.class);
        job.setPartitionerClass(TaggedJoiningPartitioner.class);
        job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);
        job.setOutputKeyClass(TaggedKey.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
 
    }
}
    1. Сначала мы создадим разделитель гуавы в строке 5, который разделит строки на «/».
    2. Затем в строках 8-10 мы устанавливаем индекс нашего ключа соединения и разделитель, используемый в файлах.
    3. В строках 12-17 мы устанавливаем теги для входных файлов, которые будут объединены. Порядок имен файлов в командной строке определяет их положение в объединении. Поскольку мы перебираем имена файлов из командной строки, мы разделяем все имя файла и получаем последнее значение (базовое имя файла) с помощью метода Guava Iterables.getLast() . Затем мы вызываем config.set() с именем файла в качестве ключа и используем i + 1 в качестве значения, которое устанавливает тег или порядок соединения. Последнее значение в массиве args пропускается в цикле, так как оно используется для выходного пути нашего задания MapReduce в строке 23. В последней строке цикла мы добавляем каждый путь к файлу в StringBuilder, который используется позже (строка 22) установить пути ввода для задания.
    4. Нам нужно использовать только один сопоставитель для всех файлов, JoiningMapper, который установлен в строке 25.
    5. Строки 27 и 28 устанавливают наш пользовательский разделитель и групповой компаратор (соответственно), которые обеспечивают порядок поступления ключей и значений в редуктор и правильно группируют значения с правильным ключом.

Используя секционер и компаратор группировки, мы знаем, что первое значение принадлежит первому ключу и может использоваться для соединения со всеми другими значениями, содержащимися в Iterable отправляемом методу Iterable reduce() для данного ключа. Теперь пришло время подумать о присоединении «один ко многим».

Один-ко-многим присоединиться

Хорошей новостью является то, что со всей работой, которую мы проделали до этого момента, мы на самом деле можем использовать код в его нынешнем виде для выполнения соединения один ко многим. Существует два подхода к объединению «один ко многим», которые мы можем рассмотреть: 1) небольшой файл с одиночными записями и второй файл с множеством записей для одного и того же ключа и 2) снова файл меньшего размера с отдельными записями, но N количество файлов, каждый из которых содержит запись, которая соответствует первому файлу. Основное отличие состоит в том, что при первом подходе порядок значений за пределами соединения первых двух ключей будет неизвестен. Однако при втором подходе мы будем «помечать» каждый файл соединения, чтобы мы могли контролировать порядок всех соединяемых значений. Для нашего примера первый файл останется нашим файлом GUID-name-address, и у нас будет 3 дополнительных файла, которые будут содержать записи об автомобиле, работодателе и должностной инструкции. Вероятно, это не самый реалистичный сценарий, но он будет служить для демонстрации. Вот пример того, как будут выглядеть данные, прежде чем мы сделаем соединение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
//The single person records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY
//Automobile records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,2003 Holden Cruze
81a43486-07e1-4b92-b92b-03d0caa87b5f,2012 Volkswagen T5
aef52cf1-f565-4124-bf18-47acdac47a0e,2009 Renault Trafic
//Employer records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Creative Wealth
81a43486-07e1-4b92-b92b-03d0caa87b5f,Susie's Casuals
aef52cf1-f565-4124-bf18-47acdac47a0e,Super Saver Foods
//Job Description records
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Gas and water service dispatcher

Результаты «Один ко многим»

Теперь давайте рассмотрим пример результатов наших объединений «один ко многим» (для сравнения используются те же значения сверху):

1
2
3
cdd8dde3-0349-4f0d-b97a-7ae84b687f9c,Esther,Garner,4071 Haven Lane,Okemos,MI,2003 Holden Cruze,Creative Wealth,Data entry clerk
81a43486-07e1-4b92-b92b-03d0caa87b5f,Timothy,Duncan,753 Stadium Drive,Taunton,MA,2012 Volkswagen T5,Susie's Casuals,Precision instrument and equipment repairer
aef52cf1-f565-4124-bf18-47acdac47a0e,Brett,Ramsey,4985 Shinn Street,New York,NY,2009 Renault Trafic,Super Saver Foods,Gas and water service dispatcher

Как показывают результаты, мы смогли успешно объединить несколько значений в указанном порядке.

Вывод

Мы успешно продемонстрировали, как мы можем выполнять соединения на стороне уменьшения в MapReduce. Несмотря на то, что подход не слишком сложен, мы видим, что выполнение соединений в Hadoop может включать в себя написание достаточного количества кода. Хотя изучение объединения является полезным упражнением, в большинстве случаев нам лучше использовать такие инструменты, как Hive или Pig для объединения данных. Спасибо за ваше время.

Ресурсы