Статьи

MapReduce: работа с интенсивной обработкой данных — локальная агрегация, часть II

Этот пост продолжается серией статей по реализации алгоритмов, которые можно найти в книге « Интенсивная обработка данных с помощью MapReduce» . Часть первая можно найти здесь . В предыдущем посте мы обсуждали использование техники локальной агрегации как средства уменьшения объема данных, перетасовываемых и передаваемых по сети. Сокращение объема передаваемых данных является одним из главных способов повысить эффективность задания MapReduce. Задание MapReduce с подсчетом слов использовалось для демонстрации локальной агрегации. Поскольку для результатов требуется только общее количество, мы могли бы повторно использовать тот же редуктор для нашего объединителя, поскольку изменение порядка или группировок добавлений не повлияет на сумму.

Но что, если вы хотите среднего ? Тогда тот же подход не сработает, потому что вычисление среднего значения не равно среднему из исходного набора чисел. Хотя с небольшим пониманием, мы все еще можем использовать локальную агрегацию. Для этих примеров мы будем использовать образец набора данных о погоде NCDC, который используется в книге Hadoop «Полное руководство» . Мы рассчитаем среднюю температуру для каждого месяца в 1901 году. Алгоритм усреднения для объединителя и опции объединения в картографе можно найти в главе 3.1.3 «Интенсивная обработка данных с помощью MapReduce».

Один размер не подходит для всех

В прошлый раз мы описали два подхода к сокращению данных в задании MapReduce: Hadoop Combiners и подход комбинирования in-mapper. Комбинаторы считаются оптимизацией средой Hadoop, и нет никаких гарантий относительно того, сколько раз они будут вызваны, если вообще будут вызваны. В результате преобразователи должны выдавать данные в ожидаемой преобразователями форме, поэтому, если объединители не задействованы, конечный результат не изменяется. Чтобы приспособиться к вычислению средних значений, нам нужно вернуться к мапперу и изменить его вывод.

Изменения в Mapper

В примере с подсчетом слов неоптимизированный преобразователь просто выдал слово и счетчик 1. Сопоставитель и преобразователь объединения в преобразователе оптимизировали этот вывод, сохраняя каждое слово в качестве ключа в хэш-карте с общим счетом в качестве значение. Каждый раз, когда слово было замечено, счетчик увеличивался на 1. При такой настройке, если объединитель не был вызван, редуктор получит слово в качестве ключа и длинный список из 1? S для сложения, что приведет к тому же результату (конечно, использование объединяющего средства отображения в mapper избежало этой проблемы, потому что оно гарантированно объединяет результаты как часть кода mapper). Чтобы вычислить среднее значение, у нас будет наш базовый преобразователь, который будет выдавать строковый ключ (объединенный вместе год и месяц наблюдения за погодой) и настраиваемый записываемый объект с именем TemperatureAveragingPair. Объект TemperatureAveragingPair будет содержать два числа (IntWritables), измеренную температуру и счетчик единиц. Мы возьмем MaximumTemperaMapper от Hadoop: полное руководство и используем его как источник вдохновения для создания AverageTemperaMapper:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
 //sample line of weather data
 //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
 
 
    private Text outText = new Text();
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();
    private static final int MISSING = 9999;
 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);
 
        int tempStartPosition = 87;
 
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
 
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
 
        if (temp != MISSING) {
            outText.set(yearMonth);
            pair.set(temp, 1);
            context.write(outText, pair);
        }
    }
}

Благодаря тому, что маппер выводит ключ и объект TemperatureAveragingPair, наша программа MapReduce гарантированно будет иметь правильные результаты независимо от того, вызван ли объединитель.

Сумматор

Нам нужно уменьшить количество отправляемых данных, чтобы мы суммировали температуры, суммировали значения и хранили их отдельно. Тем самым мы уменьшаем количество отправляемых данных, но сохраняем формат, необходимый для вычисления правильных средних значений. Если / когда вызывается объединитель, он возьмет все переданные в него объекты TemperatureAveragingPair и выдаст один объект TemperatureAveragingPair для того же ключа, содержащий суммированные температуры и значения. Вот код для объединителя:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
  public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> {
    private TemperatureAveragingPair pair = new TemperatureAveragingPair();
 
    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        for (TemperatureAveragingPair value : values) {
             temp += value.getTemp().get();
             count += value.getCount().get();
        }
        pair.set(temp,count);
        context.write(key,pair);
    }
}

Но мы действительно заинтересованы в том, чтобы гарантировать, что мы сократили объем данных, отправляемых редукторам, поэтому мы посмотрим, как этого добиться в дальнейшем.

В Mapper, объединяющем средние

Как и в примере с подсчетом слов, для вычисления средних значений маппер, объединяющий in mapper, будет использовать хеш-карту с объединенным годом + месяцом в качестве ключа и TemperatureAveragingPair в качестве значения. Каждый раз, когда мы получаем одну и ту же комбинацию год / месяц, мы убираем парный объект с карты, добавляем температуру и увеличиваем количество на единицу. Как только метод очистки будет вызван, мы выпустим все пары с соответствующим ключом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> {
 //sample line of weather data
 //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999
 
 
    private static final int MISSING = 9999;
    private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>();
 
 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String yearMonth = line.substring(15, 21);
 
        int tempStartPosition = 87;
 
        if (line.charAt(tempStartPosition) == '+') {
            tempStartPosition += 1;
        }
 
        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));
 
        if (temp != MISSING) {
            TemperatureAveragingPair pair = pairMap.get(yearMonth);
            if(pair == null){
                pair = new TemperatureAveragingPair();
                pairMap.put(yearMonth,pair);
            }
            int temps = pair.getTemp().get() + temp;
            int count = pair.getCount().get() + 1;
            pair.set(temps,count);
        }
    }
 
 
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Set<String> keys = pairMap.keySet();
        Text keyText = new Text();
        for (String key : keys) {
             keyText.set(key);
             context.write(keyText,pairMap.get(key));
        }
    }
}

Следуя той же схеме отслеживания данных между вызовами карты, мы можем добиться надежного сокращения данных путем реализации стратегии объединения в картографе. Те же предостережения применяются для сохранения состояния во всех вызовах преобразователя, но рассмотрение преимуществ, которые могут быть достигнуты в эффективности обработки, при использовании этого подхода заслуживает некоторого рассмотрения.

редуктор

На этом этапе написать наш редуктор очень просто, составьте список пар для каждой клавиши, суммируйте все температуры и рассчитайте их, а затем разделите сумму температур на сумму подсчетов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> {
    private IntWritable average = new IntWritable();
 
    @Override
    protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException {
        int temp = 0;
        int count = 0;
        for (TemperatureAveragingPair pair : values) {
            temp += pair.getTemp().get();
            count += pair.getCount().get();
        }
        average.set(temp / count);
        context.write(key, average);
    }
}


Полученные результаты

Результаты предсказуемы с помощью опций сумматора и комбинирования в картографическом модуле, показывающих существенно уменьшенный вывод данных.
Опция неоптимизированного картографа:

01
02
03
04
05
06
07
08
09
10
11
12
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:05:28 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce shuffle bytes=111594
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:05:28 INFO mapred.JobClient:     Spilled Records=13128
12/10/10 23:05:28 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:05:28 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:05:28 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:05:28 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:05:28 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:05:28 INFO mapred.JobClient:     Reduce input records=6564

Опция Combiner:

01
02
03
04
05
06
07
08
09
10
11
12
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:07:19 INFO mapred.JobClient:     Combine output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:07:19 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:07:19 INFO mapred.JobClient:     Map output bytes=98460
12/10/10 23:07:19 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:07:19 INFO mapred.JobClient:     Combine input records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     Map output records=6564
12/10/10 23:07:19 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:07:19 INFO mapred.JobClient:     Reduce input records=12

Возможность комбинирования в картографе:

01
02
03
04
05
06
07
08
09
10
11
12
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input groups=12
12/10/10 23:09:09 INFO mapred.JobClient:     Combine output records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map input records=6565
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce shuffle bytes=210
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     Spilled Records=24
12/10/10 23:09:09 INFO mapred.JobClient:     Map output bytes=180
12/10/10 23:09:09 INFO mapred.JobClient:     Total committed heap usage (bytes)=269619200
12/10/10 23:09:09 INFO mapred.JobClient:     Combine input records=0
12/10/10 23:09:09 INFO mapred.JobClient:     Map output records=12
12/10/10 23:09:09 INFO mapred.JobClient:     SPLIT_RAW_BYTES=108
12/10/10 23:09:09 INFO mapred.JobClient:     Reduce input records=12

Расчетные результаты:
(ПРИМЕЧАНИЕ: температуры в файле образца указаны в градусах Цельсия * 10)

Неоптимизированные Сумматор In-Mapper-Combiner Mapper
190101-25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111-16
190112 -77
190101-25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111-16
190112 -77
190101-25
190102 -91
190103 -49
190104 22
190105 76
190106 146
190107 192
190108 170
190109 114
190110 86
190111-16
190112 -77


Вывод

Мы рассмотрели локальную агрегацию как для простого случая, когда можно повторно использовать редуктор в качестве объединителя, так и для более сложного случая, когда некоторое представление о том, как структурировать данные, все еще получают преимущества от локальной агрегации данных для повышения эффективности обработки.

Дальнейшее чтение

Ссылка: Работа с интенсивной обработкой данных с помощью MapReduce — Локальная агрегация, часть II, от нашего партнера по JCG Билла Бекака в блоге « Случайные мысли о кодировании» .