Но что, если вы хотите среднего ? Тогда тот же подход не сработает, потому что вычисление среднего значения не равно среднему из исходного набора чисел. Хотя с небольшим пониманием, мы все еще можем использовать локальную агрегацию. Для этих примеров мы будем использовать образец набора данных о погоде NCDC, который используется в книге Hadoop «Полное руководство» . Мы рассчитаем среднюю температуру для каждого месяца в 1901 году. Алгоритм усреднения для объединителя и опции объединения в картографе можно найти в главе 3.1.3 «Интенсивная обработка данных с помощью MapReduce».
Один размер не подходит для всех
В прошлый раз мы описали два подхода к сокращению данных в задании MapReduce: Hadoop Combiners и подход комбинирования in-mapper. Комбинаторы считаются оптимизацией средой Hadoop, и нет никаких гарантий относительно того, сколько раз они будут вызваны, если вообще будут вызваны. В результате преобразователи должны выдавать данные в ожидаемой преобразователями форме, поэтому, если объединители не задействованы, конечный результат не изменяется. Чтобы приспособиться к вычислению средних значений, нам нужно вернуться к мапперу и изменить его вывод.
Изменения в Mapper
В примере с подсчетом слов неоптимизированный преобразователь просто выдал слово и счетчик 1. Сопоставитель и преобразователь объединения в преобразователе оптимизировали этот вывод, сохраняя каждое слово в качестве ключа в хэш-карте с общим счетом в качестве значение. Каждый раз, когда слово было замечено, счетчик увеличивался на 1. При такой настройке, если объединитель не был вызван, редуктор получит слово в качестве ключа и длинный список из 1? S для сложения, что приведет к тому же результату (конечно, использование объединяющего средства отображения в mapper избежало этой проблемы, потому что оно гарантированно объединяет результаты как часть кода mapper). Чтобы вычислить среднее значение, у нас будет наш базовый преобразователь, который будет выдавать строковый ключ (объединенный вместе год и месяц наблюдения за погодой) и настраиваемый записываемый объект с именем TemperatureAveragingPair. Объект TemperatureAveragingPair будет содержать два числа (IntWritables), измеренную температуру и счетчик единиц. Мы возьмем MaximumTemperaMapper от Hadoop: полное руководство и используем его как источник вдохновения для создания AverageTemperaMapper:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> { //sample line of weather data //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999 private Text outText = new Text(); private TemperatureAveragingPair pair = new TemperatureAveragingPair(); private static final int MISSING = 9999 ; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String yearMonth = line.substring( 15 , 21 ); int tempStartPosition = 87 ; if (line.charAt(tempStartPosition) == '+' ) { tempStartPosition += 1 ; } int temp = Integer.parseInt(line.substring(tempStartPosition, 92 )); if (temp != MISSING) { outText.set(yearMonth); pair.set(temp, 1 ); context.write(outText, pair); } } } |
Благодаря тому, что маппер выводит ключ и объект TemperatureAveragingPair, наша программа MapReduce гарантированно будет иметь правильные результаты независимо от того, вызван ли объединитель.
Сумматор
Нам нужно уменьшить количество отправляемых данных, чтобы мы суммировали температуры, суммировали значения и хранили их отдельно. Тем самым мы уменьшаем количество отправляемых данных, но сохраняем формат, необходимый для вычисления правильных средних значений. Если / когда вызывается объединитель, он возьмет все переданные в него объекты TemperatureAveragingPair и выдаст один объект TemperatureAveragingPair для того же ключа, содержащий суммированные температуры и значения. Вот код для объединителя:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> { private TemperatureAveragingPair pair = new TemperatureAveragingPair(); @Override protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException { int temp = 0 ; int count = 0 ; for (TemperatureAveragingPair value : values) { temp += value.getTemp().get(); count += value.getCount().get(); } pair.set(temp,count); context.write(key,pair); } } |
Но мы действительно заинтересованы в том, чтобы гарантировать, что мы сократили объем данных, отправляемых редукторам, поэтому мы посмотрим, как этого добиться в дальнейшем.
В Mapper, объединяющем средние
Как и в примере с подсчетом слов, для вычисления средних значений маппер, объединяющий in mapper, будет использовать хеш-карту с объединенным годом + месяцом в качестве ключа и TemperatureAveragingPair в качестве значения. Каждый раз, когда мы получаем одну и ту же комбинацию год / месяц, мы убираем парный объект с карты, добавляем температуру и увеличиваем количество на единицу. Как только метод очистки будет вызван, мы выпустим все пары с соответствующим ключом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> { //sample line of weather data //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999 private static final int MISSING = 9999 ; private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String yearMonth = line.substring( 15 , 21 ); int tempStartPosition = 87 ; if (line.charAt(tempStartPosition) == '+' ) { tempStartPosition += 1 ; } int temp = Integer.parseInt(line.substring(tempStartPosition, 92 )); if (temp != MISSING) { TemperatureAveragingPair pair = pairMap.get(yearMonth); if (pair == null ){ pair = new TemperatureAveragingPair(); pairMap.put(yearMonth,pair); } int temps = pair.getTemp().get() + temp; int count = pair.getCount().get() + 1 ; pair.set(temps,count); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { Set<String> keys = pairMap.keySet(); Text keyText = new Text(); for (String key : keys) { keyText.set(key); context.write(keyText,pairMap.get(key)); } } } |
Следуя той же схеме отслеживания данных между вызовами карты, мы можем добиться надежного сокращения данных путем реализации стратегии объединения в картографе. Те же предостережения применяются для сохранения состояния во всех вызовах преобразователя, но рассмотрение преимуществ, которые могут быть достигнуты в эффективности обработки, при использовании этого подхода заслуживает некоторого рассмотрения.
редуктор
На этом этапе написать наш редуктор очень просто, составьте список пар для каждой клавиши, суммируйте все температуры и рассчитайте их, а затем разделите сумму температур на сумму подсчетов.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> { private IntWritable average = new IntWritable(); @Override protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException { int temp = 0 ; int count = 0 ; for (TemperatureAveragingPair pair : values) { temp += pair.getTemp().get(); count += pair.getCount().get(); } average.set(temp / count); context.write(key, average); } } |
Полученные результаты
Результаты предсказуемы с помощью опций сумматора и комбинирования в картографическом модуле, показывающих существенно уменьшенный вывод данных.
Опция неоптимизированного картографа:
01
02
03
04
05
06
07
08
09
10
11
12
|
12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Reduce input groups= 12 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Combine output records= 0 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Map input records= 6565 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Reduce shuffle bytes= 111594 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Reduce output records= 12 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Spilled Records= 13128 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Map output bytes= 98460 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Total committed heap usage (bytes)= 269619200 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Combine input records= 0 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Map output records= 6564 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: SPLIT_RAW_BYTES= 108 12 / 10 / 10 23 : 05 : 28 INFO mapred.JobClient: Reduce input records= 6564 |
Опция Combiner:
01
02
03
04
05
06
07
08
09
10
11
12
|
12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Reduce input groups= 12 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Combine output records= 12 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Map input records= 6565 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Reduce shuffle bytes= 210 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Reduce output records= 12 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Spilled Records= 24 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Map output bytes= 98460 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Total committed heap usage (bytes)= 269619200 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Combine input records= 6564 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Map output records= 6564 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: SPLIT_RAW_BYTES= 108 12 / 10 / 10 23 : 07 : 19 INFO mapred.JobClient: Reduce input records= 12 |
Возможность комбинирования в картографе:
01
02
03
04
05
06
07
08
09
10
11
12
|
12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Reduce input groups= 12 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Combine output records= 0 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Map input records= 6565 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Reduce shuffle bytes= 210 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Reduce output records= 12 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Spilled Records= 24 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Map output bytes= 180 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Total committed heap usage (bytes)= 269619200 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Combine input records= 0 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Map output records= 12 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: SPLIT_RAW_BYTES= 108 12 / 10 / 10 23 : 09 : 09 INFO mapred.JobClient: Reduce input records= 12 |
Расчетные результаты:
(ПРИМЕЧАНИЕ: температуры в файле образца указаны в градусах Цельсия * 10)
Неоптимизированные | Сумматор | In-Mapper-Combiner Mapper |
190101-25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111-16 190112 -77 |
190101-25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111-16 190112 -77 |
190101-25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111-16 190112 -77 |
Вывод
Мы рассмотрели локальную агрегацию как для простого случая, когда можно повторно использовать редуктор в качестве объединителя, так и для более сложного случая, когда некоторое представление о том, как структурировать данные, все еще получают преимущества от локальной агрегации данных для повышения эффективности обработки.
Дальнейшее чтение
- Интенсивная обработка данных с MapReduce Джимми Лином и Крисом Дайером
- Hadoop: полное руководство Тома Уайта
- Исходный код из блога
- Hadoop API
- MRUnit для модульного тестирования Apache Hadoop map уменьшить количество рабочих мест
- Project Gutenberg — отличный источник книг в текстовом формате, отлично подходящий для локального тестирования заданий Hadoop.
Ссылка: Работа с интенсивной обработкой данных с помощью MapReduce — Локальная агрегация, часть II, от нашего партнера по JCG Билла Бекака в блоге « Случайные мысли о кодировании» .