Этот пост продолжается серией статей по реализации алгоритмов, которые можно найти в книге « Интенсивная обработка данных с помощью MapReduce» . В предыдущем посте мы обсуждали использование техники локальной агрегации как средства уменьшения объема данных, перетасовываемых и передаваемых по сети. Сокращение объема передаваемых данных является одним из главных способов повысить эффективность задания MapReduce. Задание MapReduce с подсчетом слов использовалось для демонстрации локальной агрегации. Поскольку для результатов требуется только общее количество, мы могли бы повторно использовать тот же редуктор для нашего объединителя, поскольку изменение порядка или группировок добавлений не повлияет на сумму. Но что, если вы хотели в среднем? Тогда тот же подход не сработает, потому что вычисление среднего значения не равно среднему из исходного набора чисел. Хотя с небольшим пониманием, мы все еще можем использовать локальную агрегацию. Для этих примеров мы будем использовать образец набора данных о погоде NCDC, который используется в книге Hadoop «Полное руководство» . Мы рассчитаем среднюю температуру для каждого месяца в 1901 году. Алгоритм усреднения для объединителя и опции объединения в картографе можно найти в главе 3.1.3 «Интенсивная обработка данных с помощью MapReduce».
Один размер не подходит для всех
В прошлый раз мы описали два подхода к сокращению данных в задании MapReduce: Hadoop Combiners и подход комбинирования in-mapper. Комбинаторы считаются оптимизацией средой Hadoop, и нет никаких гарантий относительно того, сколько раз они будут вызваны, если вообще будут вызваны. В результате, преобразователи должны выдавать данные в ожидаемой преобразователями форме, поэтому, если объединители не задействованы, конечный результат не изменяется. Чтобы приспособиться к вычислению средних значений, нам нужно вернуться к мапперу и изменить его вывод.
Изменения в Mapper
В примере с подсчетом слов неоптимизированный преобразователь просто выдал слово и счетчик 1. Сопоставитель и преобразователь объединения в преобразователе оптимизировали этот вывод, сохраняя каждое слово в качестве ключа в хэш-карте с общим счетом в качестве стоимость. Каждый раз, когда слово было замечено, счетчик увеличивался на 1. При такой настройке, если объединитель не был вызван, редуктор получит слово в качестве ключа и длинный список из 1 для сложения, что приведет к тому же результату (конечно, использование объединяющего средства отображения в mapper избежало этой проблемы, потому что оно гарантированно объединяет результаты как часть кода mapper). Чтобы вычислить среднее значение, у нас будет наш базовый преобразователь, который будет выдавать строковый ключ (объединенный год и месяц наблюдения за погодой) и настраиваемый записываемый объект, который называется TemperatureAveragingPair.Объект TemperatureAveragingPair будет содержать два числа (IntWritables), измеренную температуру и счетчик один. Мы возьмем MaximumTemperaMapper от Hadoop: полное руководство и используем его как источник вдохновения для создания AverageTemperaMapper:
public class AverageTemperatureMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> { //sample line of weather data //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999 private Text outText = new Text(); private TemperatureAveragingPair pair = new TemperatureAveragingPair(); private static final int MISSING = 9999; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String yearMonth = line.substring(15, 21); int tempStartPosition = 87; if (line.charAt(tempStartPosition) == '+') { tempStartPosition += 1; } int temp = Integer.parseInt(line.substring(tempStartPosition, 92)); if (temp != MISSING) { outText.set(yearMonth); pair.set(temp, 1); context.write(outText, pair); } } }
Благодаря тому, что маппер выводит ключ и объект TemperatureAveragingPair, наша программа MapReduce гарантированно будет иметь правильные результаты независимо от того, вызван ли комбинатор.
Сумматор
Нам нужно уменьшить количество отправляемых данных, чтобы мы суммировали температуры, суммировали значения и хранили их отдельно. Тем самым мы уменьшаем количество отправляемых данных, но сохраняем формат, необходимый для вычисления правильных средних значений. Если / когда вызывается объединитель, он возьмет все переданные внутрь объекты TemperatureAveragingPair и издаст один объект TemperatureAveragingPair для того же ключа, содержащий суммированные температуры и значения. Вот код для объединителя:
public class AverageTemperatureCombiner extends Reducer<Text,TemperatureAveragingPair,Text,TemperatureAveragingPair> { private TemperatureAveragingPair pair = new TemperatureAveragingPair(); @Override protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException { int temp = 0; int count = 0; for (TemperatureAveragingPair value : values) { temp += value.getTemp().get(); count += value.getCount().get(); } pair.set(temp,count); context.write(key,pair); } }
Но мы действительно заинтересованы в том, чтобы гарантировать, что мы сократили объем данных, отправляемых редукторам, поэтому мы посмотрим, как это сделать дальше.
В Mapper, объединяющем средние
Как и в примере с подсчетом слов, для вычисления средних значений маппер, объединяющий in mapper, будет использовать хеш-карту с объединенным годом + месяцом в качестве ключа и TemperatureAveragingPair в качестве значения. Каждый раз, когда мы получаем одну и ту же комбинацию год / месяц, мы убираем парный объект с карты, добавляем температуру и увеличиваем количество на единицу. Как только метод очистки будет вызван, мы выпустим все пары с соответствующим ключом:
public class AverageTemperatureCombiningMapper extends Mapper<LongWritable, Text, Text, TemperatureAveragingPair> { //sample line of weather data //0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF10899199999999999 private static final int MISSING = 9999; private Map<String,TemperatureAveragingPair> pairMap = new HashMap<String,TemperatureAveragingPair>(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String yearMonth = line.substring(15, 21); int tempStartPosition = 87; if (line.charAt(tempStartPosition) == '+') { tempStartPosition += 1; } int temp = Integer.parseInt(line.substring(tempStartPosition, 92)); if (temp != MISSING) { TemperatureAveragingPair pair = pairMap.get(yearMonth); if(pair == null){ pair = new TemperatureAveragingPair(); pairMap.put(yearMonth,pair); } int temps = pair.getTemp().get() + temp; int count = pair.getCount().get() + 1; pair.set(temps,count); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { Set<String> keys = pairMap.keySet(); Text keyText = new Text(); for (String key : keys) { keyText.set(key); context.write(keyText,pairMap.get(key)); } } }
Следуя той же схеме отслеживания данных между вызовами карты, мы можем добиться надежного сокращения данных путем реализации стратегии объединения в картографе. Те же предостережения применяются для сохранения состояния во всех вызовах преобразователя, но рассмотрение преимуществ, которые могут быть достигнуты в эффективности обработки, при использовании этого подхода заслуживает некоторого рассмотрения.
редуктор
На этом этапе написать наш редуктор очень просто, составьте список пар для каждой клавиши, суммируйте все температуры и рассчитайте их, а затем разделите сумму температур на сумму подсчетов.
public class AverageTemperatureReducer extends Reducer<Text, TemperatureAveragingPair, Text, IntWritable> { private IntWritable average = new IntWritable(); @Override protected void reduce(Text key, Iterable<TemperatureAveragingPair> values, Context context) throws IOException, InterruptedException { int temp = 0; int count = 0; for (TemperatureAveragingPair pair : values) { temp += pair.getTemp().get(); count += pair.getCount().get(); } average.set(temp / count); context.write(key, average); } }
Полученные результаты
Результаты предсказуемы с помощью опций сумматора и комбинирования в картографическом модуле, показывающих существенно уменьшенный вывод данных.
Опция неоптимизированного картографа:
12/10/10 23:05:28 INFO mapred.JobClient: Reduce input groups=12 12/10/10 23:05:28 INFO mapred.JobClient: Combine output records=0 12/10/10 23:05:28 INFO mapred.JobClient: Map input records=6565 12/10/10 23:05:28 INFO mapred.JobClient: Reduce shuffle bytes=111594 12/10/10 23:05:28 INFO mapred.JobClient: Reduce output records=12 12/10/10 23:05:28 INFO mapred.JobClient: Spilled Records=13128 12/10/10 23:05:28 INFO mapred.JobClient: Map output bytes=98460 12/10/10 23:05:28 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 12/10/10 23:05:28 INFO mapred.JobClient: Combine input records=0 12/10/10 23:05:28 INFO mapred.JobClient: Map output records=6564 12/10/10 23:05:28 INFO mapred.JobClient: SPLIT_RAW_BYTES=108 12/10/10 23:05:28 INFO mapred.JobClient: Reduce input records=6564
Опция Combiner:
12/10/10 23:07:19 INFO mapred.JobClient: Reduce input groups=12 12/10/10 23:07:19 INFO mapred.JobClient: Combine output records=12 12/10/10 23:07:19 INFO mapred.JobClient: Map input records=6565 12/10/10 23:07:19 INFO mapred.JobClient: Reduce shuffle bytes=210 12/10/10 23:07:19 INFO mapred.JobClient: Reduce output records=12 12/10/10 23:07:19 INFO mapred.JobClient: Spilled Records=24 12/10/10 23:07:19 INFO mapred.JobClient: Map output bytes=98460 12/10/10 23:07:19 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 12/10/10 23:07:19 INFO mapred.JobClient: Combine input records=6564 12/10/10 23:07:19 INFO mapred.JobClient: Map output records=6564 12/10/10 23:07:19 INFO mapred.JobClient: SPLIT_RAW_BYTES=108 12/10/10 23:07:19 INFO mapred.JobClient: Reduce input records=12
Возможность комбинирования в картографе:
12/10/10 23:09:09 INFO mapred.JobClient: Reduce input groups=12 12/10/10 23:09:09 INFO mapred.JobClient: Combine output records=0 12/10/10 23:09:09 INFO mapred.JobClient: Map input records=6565 12/10/10 23:09:09 INFO mapred.JobClient: Reduce shuffle bytes=210 12/10/10 23:09:09 INFO mapred.JobClient: Reduce output records=12 12/10/10 23:09:09 INFO mapred.JobClient: Spilled Records=24 12/10/10 23:09:09 INFO mapred.JobClient: Map output bytes=180 12/10/10 23:09:09 INFO mapred.JobClient: Total committed heap usage (bytes)=269619200 12/10/10 23:09:09 INFO mapred.JobClient: Combine input records=0 12/10/10 23:09:09 INFO mapred.JobClient: Map output records=12 12/10/10 23:09:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=108 12/10/10 23:09:09 INFO mapred.JobClient: Reduce input records=12
Расчетные результаты:
(ПРИМЕЧАНИЕ: температуры в файле образца приведены в градусах Цельсия * 10)
Неоптимизированные | Сумматор | In-Mapper-Combiner Mapper |
190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 |
190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 |
190101 -25 190102 -91 190103 -49 190104 22 190105 76 190106 146 190107 192 190108 170 190109 114 190110 86 190111 -16 190112 -77 |
Вывод
Мы рассмотрели локальную агрегацию как для простого случая, когда можно повторно использовать редуктор в качестве объединителя, так и для более сложного случая, когда некоторое представление о том, как структурировать данные, все еще получают преимущества от локальной агрегации данных для повышения эффективности обработки.
Ресурсы
- Интенсивная обработка данных с MapReduce Джимми Лином и Крисом Дайером
- Hadoop: полное руководство Тома Уайта
- Исходный код из блога
- Hadoop API
- MRUnit для модульного тестирования Apache Hadoop map уменьшить количество рабочих мест
- Project Gutenberg — отличный источник книг в текстовом формате, отлично подходящий для локального тестирования заданий Hadoop.