Статьи

Алгоритмы MapReduce: понимание объединения данных, часть II

Прошло некоторое время с тех пор, как я последний раз писал, и, как и в прошлый раз, когда я взял большой перерыв, я брал несколько уроков на Coursera. На этот раз это были Принципы функционального программирования в Scala и Принципы реактивного программирования . Я считаю, что они оба — отличные курсы, и я бы порекомендовал выбрать любой из них, если у вас есть время В этом посте мы возобновляем нашу серию о реализации алгоритмов обработки текстов с интенсивным использованием данных с помощью MapReduce , на этот раз о соединениях на стороне карты. Как можно догадаться из названия, объединения на стороне карты соединяют данные исключительно на этапе отображения и полностью пропускают этап сокращения. В последнем посте о соединениях данных мы рассмотрели уменьшение количества боковых соединений, Объединения на стороне сокращения просты в реализации, но имеют недостаток, заключающийся в том, что все данные передаются по сети в редукторы. Объединения на стороне карты обеспечивают существенное повышение производительности, поскольку мы избегаем затрат на передачу данных по сети. Однако, в отличие от объединений на стороне уменьшения, объединения на стороне карты требуют соблюдения очень определенных критериев. Сегодня мы обсудим требования к объединениям на стороне карты и способы их реализации.

Условия соединения на стороне карты

Чтобы воспользоваться объединениями на стороне карты, наши данные должны соответствовать одному из следующих критериев:

  1. Наборы данных для объединения уже отсортированы по одному и тому же ключу и имеют одинаковое количество разделов

  2. Из двух наборов данных, которые нужно объединить, один достаточно мал, чтобы поместиться в память

Мы рассмотрим первый сценарий, когда у нас есть два (или более) набора данных, которые необходимо объединить, но они слишком велики, чтобы уместиться в память. Мы предположим, что в худшем случае файлы не сортируются и не разбиваются на разделы.

Формат данных

Прежде чем мы начнем, давайте посмотрим на данные, с которыми мы работаем. У нас будет два набора данных:

  1. Первый набор данных состоит из GUID, имени, фамилии, адреса, города и штата

  2. Второй набор данных состоит из GUID и информации о работодателе.

Оба набора данных разделены запятыми, а ключ соединения (GUID) находится на первой позиции. После объединения мы хотим, чтобы информация работодателя из второго набора данных была добавлена ​​в конец первого набора данных. Кроме того, мы хотим сохранить GUID в первой позиции набора данных один, но удалить GUID из набора данных два. Набор данных 1:

aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA
  08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC
  de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN
  6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI

Набор данных 2:

 de68186a-1004-4211-a866-736f414eac61,Jacobs
  6df1882d-4c81-4155-9d8b-0c35b2d34284,Chief Auto Parts
  aef9422c-d08c-4457-9760-f2d564d673bc,Earthworks Yard Maintenance
  08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms 

Регистрация результатов:

08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms
6df1882d-4c81-4155-9d8b-0c35b2d34284,John,Schofield,65 Summit Park Avenue,Detroit,MI,Chief Auto Parts
aef9422c-d08c-4457-9760-f2d564d673bc,Linda,Narvaez,3253 Davis Street,Atlanta,GA,Earthworks Yard Maintenance
de68186a-1004-4211-a866-736f414eac61,Charles,Arnold,1764 Public Works Drive,Johnson City,TN,Jacobs

Теперь мы переходим к тому, как мы собираемся объединить наши два набора данных.

Объединения на стороне карты с большими наборами данных

Чтобы иметь возможность выполнять соединения на стороне карты, нам необходимо отсортировать данные по одному и тому же ключу и иметь одинаковое количество разделов, что означает, что все ключи для любой записи находятся в одном разделе. Хотя это кажется жестким требованием, его легко исправить. Hadoop сортирует все ключи и гарантирует, что ключи с одинаковым значением отправляются одному и тому же редуктору. Таким образом, просто запустив задание MapReduce, которое не делает ничего, кроме вывода данных по ключу, к которому вы хотите присоединиться, и указав одинаковое количество редукторов для всех наборов данных, мы получим наши данные в правильной форме. Принимая во внимание повышение эффективности от возможности объединения на стороне карты, это может стоить затрат на запуск дополнительных заданий MapReduce. На этом этапе следует повторить, что крайне важно, чтобы все наборы данных указывали точноетакое же количество редукторов на этапе «подготовки», когда данные будут отсортированы и разделены. В этом посте мы возьмем два набора данных и запустим начальное задание MapReduce для обоих, чтобы выполнить сортировку и разбиение, а затем запустим последнее задание для выполнения соединения на стороне карты. Сначала давайте рассмотрим задачу MapReduce, чтобы отсортировать и разделить наши данные таким же образом.

Шаг первый: сортировка и разбиение

Сначала нам нужно создать Mapperключ, который просто выберет ключ для сортировки по заданному индексу:

 public class SortByKeyMapper extends Mapper<LongWritable, Text, Text, Text> {

    private int keyIndex;
    private Splitter splitter;
    private Joiner joiner;
    private Text joinKey = new Text();


    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        String separator =  context.getConfiguration().get("separator");
        keyIndex = Integer.parseInt(context.getConfiguration().get("keyIndex"));
        splitter = Splitter.on(separator);
        joiner = Joiner.on(separator);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Iterable<String> values = splitter.split(value.toString());
        joinKey.set(Iterables.get(values,keyIndex));
        if(keyIndex != 0){
            value.set(reorderValue(values,keyIndex));
        }
        context.write(joinKey,value);
    }


    private String reorderValue(Iterable<String> value, int index){
        List<String> temp = Lists.newArrayList(value);
        String originalFirst = temp.get(0);
        String newFirst = temp.get(index);
        temp.set(0,newFirst);
        temp.set(index,originalFirst);
        return joiner.join(temp);
    }
}

SortByKeyMapperПросто устанавливает значение свойства joinKeyпутем извлечения значения из заданной строки текста найденного в позиции , заданной параметром конфигурации keyIndex. Кроме того, если значение keyIndexне равно нулю, мы меняем порядок значений, найденных в первой позиции и keyIndexпозиции. Хотя это сомнительная функция, мы обсудим, почему мы делаем это позже. Далее нам нужно Reducer:

public class SortByKeyReducer extends Reducer<Text,Text,NullWritable,Text> {

    private static final NullWritable nullKey = NullWritable.get();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
             context.write(nullKey,value);
        }
    }
}

SortByKeyReducerВыписывает все значения для данного ключа, но выбрасывает ключ и пишет NullWritableвместо этого. В следующем разделе мы объясним, почему мы не используем ключ.

Шаг второй: соединение на стороне карты

При выполнении соединения на стороне карты записи объединяются до того, как они достигают картографа. Для этого мы используем CompositeInputFormat . Нам также нужно будет установить некоторые свойства конфигурации. Давайте посмотрим, как мы настроим наше соединение на стороне карты:

private static Configuration getMapJoinConfiguration(String separator, String... paths) {
        Configuration config = new Configuration();
        config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", separator);
        String joinExpression = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, paths);
        config.set("mapred.join.expr", joinExpression);
        config.set("separator", separator);
        return config;
    }

Сначала мы указываем символ, который разделяет ключ и значения, устанавливая mapreduce.input.keyvaluelinerecordreader.key.value.separatorсвойство. Затем мы используем CompositeInputFormat.composeметод для создания «выражения соединения», определяющего внутреннее соединение, используя слово «внутреннее», затем определяя используемый формат ввода, класс KeyValueTextInput и, наконец, переменные String, представляющие пути к файлам для присоединения (что являются выходными путями заданий отображения карты, выполняемых для сортировки и разделения данных). KeyValueTextInputFormatКласс будет использовать разделитель , чтобы установить первое значение в качестве ключа , а остальные будут использоваться для значения.

Картограф для соединения

Как только значения из исходных файлов были объединены, Mapper.mapвызывается метод, он получает Textобъект для ключа (тот же ключ для всех объединенных записей) и объект, TupleWritableкоторый состоит из значений, соединенных из наших входных файлов для данного ключа. Помните, что мы хотим, чтобы в нашем конечном выводе ключ соединения находился на первой позиции, а за ним следовали все объединенные значения в одном разделителе String. Для этого у нас есть специальный картограф, который помещает наши данные в правильный формат:

public class CombineValuesMapper extends Mapper<Text, TupleWritable, NullWritable, Text> {

    private static final NullWritable nullKey = NullWritable.get();
    private Text outValue = new Text();
    private StringBuilder valueBuilder = new StringBuilder();
    private String separator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        separator = context.getConfiguration().get("separator");
    }

    @Override
    protected void map(Text key, TupleWritable value, Context context) throws IOException, InterruptedException {
        valueBuilder.append(key).append(separator);
        for (Writable writable : value) {
            valueBuilder.append(writable.toString()).append(separator);
        }
        valueBuilder.setLength(valueBuilder.length() - 1);
        outValue.set(valueBuilder.toString());
        context.write(nullKey, outValue);
        valueBuilder.setLength(0);
    }
}

В CombineValuesMapperмы добавляем ключ и все объединенные значения в один разделитель String. Здесь мы наконец можем увидеть причину, по которой мы выбросили ключ соединения в предыдущих заданиях MapReduce. Поскольку ключ является первой позицией в значениях для всех наборов данных, которые нужно объединить, наш картограф естественным образом удаляет дубликаты ключей из объединенных наборов данных. Все, что нам нужно сделать, это вставить данный ключ в, а StringBuilderзатем добавить значения, содержащиеся в TupleWritable.

Собираем все вместе

Теперь у нас есть весь код для выполнения соединения на стороне карты для больших наборов данных. Давайте посмотрим, как мы будем выполнять все задания вместе. Как было сказано ранее, мы предполагаем, что наши данные не сортируются и не разделяются, поэтому нам нужно запустить N (2 в данном случае) заданий MapReduce, чтобы получить данные в правильном формате. После запуска начальных заданий сортировки / разбиения будет выполняться последнее задание, выполняющее фактическое соединение.

public class MapSideJoinDriver {

    public static void main(String[] args) throws Exception {
        String separator = ",";
        String keyIndex = "0";
        int numReducers = 10;
        String jobOneInputPath = args[0];
        String jobTwoInputPath = args[1];
        String joinJobOutPath = args[2];

        String jobOneSortedPath = jobOneInputPath + "_sorted";
        String jobTwoSortedPath = jobTwoInputPath + "_sorted";

        Job firstSort = Job.getInstance(getConfiguration(keyIndex, separator));
        configureJob(firstSort, "firstSort", numReducers, jobOneInputPath, jobOneSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);

        Job secondSort = Job.getInstance(getConfiguration(keyIndex, separator));
        configureJob(secondSort, "secondSort", numReducers, jobTwoInputPath, jobTwoSortedPath, SortByKeyMapper.class, SortByKeyReducer.class);

        Job mapJoin = Job.getInstance(getMapJoinConfiguration(separator, jobOneSortedPath, jobTwoSortedPath));
        configureJob(mapJoin, "mapJoin", 0, jobOneSortedPath + "," + jobTwoSortedPath, joinJobOutPath, CombineValuesMapper.class, Reducer.class);
        mapJoin.setInputFormatClass(CompositeInputFormat.class);

        List<Job> jobs = Lists.newArrayList(firstSort, secondSort, mapJoin);
        int exitStatus = 0;
        for (Job job : jobs) {
            boolean jobSuccessful = job.waitForCompletion(true);
            if (!jobSuccessful) {
                System.out.println("Error with job " + job.getJobName() + "  " + job.getStatus().getFailureInfo());
                exitStatus = 1;
                break;
            }
        }
        System.exit(exitStatus);
    }

MapSideJoinDriverДелает базовую конфигурацию для выполнения заданий MapReduce. Одним интересным моментом является то, что задания сортировки / разбиения задают по 10 редукторов каждый, в то время как последнее задание явно устанавливает количество редукторов равным 0, поскольку мы объединяемся на стороне карты и не нуждаемся в фазе редукции. Поскольку у нас нет каких-либо сложных зависимостей, мы помещаем задания в ArrayList и запускаем задания в линейном порядке (строки 24-33).

Полученные результаты

Изначально у нас было 2 файла; информация об имени и адресе в первом файле и информация о занятости во втором. Оба файла имели уникальный идентификатор в первом столбце. Файл один:

....
08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC
...

Файл два:

....
08db7c55-22ae-4199-8826-c67a5689f838,Ellman's Catalog Showrooms
....

Полученные результаты:

08db7c55-22ae-4199-8826-c67a5689f838,John,Gregory,258 Khale Street,Florence,SC,Ellman's Catalog Showrooms

Как мы видим здесь, мы успешно объединили записи и сохранили формат файлов без дублирующих ключей в результатах.

Вывод

В этом посте мы продемонстрировали, как выполнить соединение на стороне карты, когда оба набора данных велики и не могут поместиться в памяти. Если вы чувствуете, что для этого требуется много работы, вы правы. Хотя в большинстве случаев мы хотели бы использовать инструменты более высокого уровня, такие как Pig или Hive, полезно знать механизм выполнения объединений на стороне карты с большими наборами данных. Это особенно актуально в тех случаях, когда вам нужно написать решение с нуля. Спасибо за ваше время.