Местная агрегация
На очень высоком уровне, когда Mappers генерируют данные, промежуточные результаты записываются на диск, а затем отправляются по сети в редукторы для окончательной обработки. Задержка записи на диск и последующей передачи данных по сети — дорогостоящая операция при обработке задания MapReduce. Поэтому вполне понятно, что, когда это возможно, уменьшение объема данных, отправляемых из картографических систем, увеличило бы скорость задания MapReduce. Локальная агрегация — это метод, используемый для уменьшения объема данных и повышения эффективности нашей работы MapReduce. Локальная агрегация не может заменить редукторы, так как нам нужен способ сбора результатов с одним и тем же ключом из разных картографов. Мы рассмотрим 3 способа достижения локальной агрегации:
- Использование функций Hadoop Combiner.
- Два подхода к комбинированию «in-mapper» представлены в книге «Обработка текста с MapReduce».
Конечно, любая оптимизация будет иметь компромиссы, и мы также обсудим их.
Чтобы продемонстрировать локальную агрегацию, мы запустим вездесущее задание подсчета слов в простой текстовой версии A Christmas Carol Чарльза Диккенса (загруженной из Project Gutenberg ) в псевдораспределенном кластере, установленном на моем MacBookPro, с использованием hadoop-0.20.2-cdh3u3 раздача от Cloudera . Я планирую в будущем посте провести такой же эксперимент на кластере EC2 с более реалистичными данными.
Сумматоры
Функция объединения — это объект, который расширяет класс Reducer. Фактически, для наших примеров здесь мы собираемся повторно использовать тот же редуктор, который использовался в задании подсчета слов. Функция объединения указывается при настройке задания MapReduce следующим образом:
1
|
job.setReducerClass(TokenCountReducer. class ); |
Вот код редуктора:
01
02
03
04
05
06
07
08
09
10
|
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0 ; for (IntWritable value : values) { count+= value.get(); } context.write(key, new IntWritable(count)); } } |
Задача объединителя состоит в том, чтобы сделать именно то, что подразумевает название, объединенные данные с чистым результатом меньшего количества данных начинают перетасовываться по сети, что дает нам повышение эффективности. Как указывалось ранее, имейте в виду, что редукторы по-прежнему должны объединять результаты с использованием одних и тех же ключей от разных картографов. Поскольку функции объединителя являются оптимизацией, среда Hadoop не дает никаких гарантий относительно того, сколько раз будет вызываться объединитель, если вообще будет.
Вариант объединения картографов 1
Первая альтернатива использованию Combiners (рис. 3.2 на стр. 41) очень прямолинейна и вносит небольшую модификацию в наш оригинальный механизм подсчета слов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Map<String,Integer> tokenMap = new HashMap<String, Integer>(); StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if (count == null ) count = new Integer( 0 ); count+= 1 ; tokenMap.put(token,count); } Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } } |
Как мы видим здесь, вместо испускания слова со счетом 1 для каждого встреченного слова мы используем карту для отслеживания каждого слова, уже обработанного. Затем, когда все токены обработаны, мы перебираем карту и выводим общее количество для каждого слова, встречающегося в этой строке.
В Mapper Combining Option 2
Второй вариант комбинирования картографов (рис. 3.3 на стр. 41) очень похож на приведенный выше пример с двумя отличиями — когда создается хэш-карта и когда мы генерируем результаты, содержащиеся в карте. В приведенном выше примере создается карта, и ее содержимое сбрасывается по сети для каждого вызова метода карты. В этом примере мы собираемся сделать карту переменной экземпляра и перенести создание экземпляра карты на метод setUp в нашем преобразователе. Аналогично, содержимое карты не будет отправлено редукторам до тех пор, пока не будут завершены все вызовы mapper и не будет вызван метод cleanUp.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Map<String,Integer> tokenMap; @Override protected void setup(Context context) throws IOException, InterruptedException { tokenMap = new HashMap<String, Integer>(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if (count == null ) count = new Integer( 0 ); count+= 1 ; tokenMap.put(token,count); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } } |
Как видно из приведенного выше примера кода, картограф отслеживает уникальное количество слов во всех вызовах метода map. Отслеживая уникальные токены и их количество, необходимо значительно сократить количество записей, отправляемых редукторам, что, в свою очередь, должно сократить время выполнения задания MapReduce. Это выполняет тот же эффект, что и использование функции функции объединения, предоставляемой платформой MapReduce, но в этом случае вы гарантированно вызовете код объединения. Но есть некоторые предостережения с этим подходом. Сохранение состояния при вызовах карты может оказаться проблематичным и определенно является нарушением функционального духа функции «карты». Кроме того, поддерживая состояние всех картографов, в зависимости от данных, используемых в задании, память может стать еще одной проблемой, с которой приходится сталкиваться. В конечном счете, нужно было бы взвесить все компромиссы, чтобы определить лучший подход.
Полученные результаты
Теперь давайте посмотрим на некоторые результаты различных картографов. Поскольку задание выполнялось в псевдораспределенном режиме, фактическое время выполнения не имеет значения, но мы все же можем сделать вывод о том, как использование локальной агрегации может повлиять на эффективность задания MapReduce в реальном кластере.
В Token Mapper:
1
2
3
4
|
12 / 09 / 13 21 : 25 : 32 INFO mapred.JobClient: Reduce shuffle bytes= 366010 12 / 09 / 13 21 : 25 : 32 INFO mapred.JobClient: Reduce output records= 7657 12 / 09 / 13 21 : 25 : 32 INFO mapred.JobClient: Spilled Records= 63118 12 / 09 / 13 21 : 25 : 32 INFO mapred.JobClient: Map output bytes= 302886 |
В варианте сокращения Mapper 1:
1
2
3
4
|
12 / 09 / 13 21 : 28 : 15 INFO mapred.JobClient: Reduce shuffle bytes= 354112 12 / 09 / 13 21 : 28 : 15 INFO mapred.JobClient: Reduce output records= 7657 12 / 09 / 13 21 : 28 : 15 INFO mapred.JobClient: Spilled Records= 60704 12 / 09 / 13 21 : 28 : 15 INFO mapred.JobClient: Map output bytes= 293402 |
В варианте сокращения Mapper 2:
1
2
3
4
|
12 / 09 / 13 21 : 30 : 49 INFO mapred.JobClient: Reduce shuffle bytes= 105885 12 / 09 / 13 21 : 30 : 49 INFO mapred.JobClient: Reduce output records= 7657 12 / 09 / 13 21 : 30 : 49 INFO mapred.JobClient: Spilled Records= 15314 12 / 09 / 13 21 : 30 : 49 INFO mapred.JobClient: Map output bytes= 90565 |
Опция Combiner:
1
2
3
4
5
6
|
12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Reduce shuffle bytes= 105885 12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Reduce output records= 7657 12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Spilled Records= 15314 12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Map output bytes= 302886 12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Combine input records= 31559 12 / 09 / 13 21 : 22 : 18 INFO mapred.JobClient: Combine output records= 7657 |
Как и ожидалось, Mapper, который не объединял, имел худшие результаты, за которым следовал первый вариант объединения в Mapper (хотя эти результаты можно было бы улучшить, если бы данные были очищены перед выполнением подсчета слов). Второй вариант объединения в картографе и функция объединителя дали практически идентичные результаты. Существенным фактом является то, что оба производят на 2/3 меньше сокращенных байтов в случайном порядке, как первые два варианта. Сокращение количества байтов, отправляемых по сети редукторам на эту сумму, несомненно, положительно скажется на эффективности задания MapReduce. Здесь нужно помнить об одном: объединение Combiners / In-Mapper может использоваться не только во всех заданиях MapReduce, в этом случае подсчет слов очень хорошо подходит для такого улучшения, но это не всегда может быть правдой. ,
Вывод
Как вы видите, преимущества использования объединения в картографическом режиме или функции объединителя Hadoop требуют серьезного рассмотрения при повышении производительности ваших заданий MapReduce. Что касается того, какой подход, то вам решать, какие компромиссы для каждого подхода.
Ссылки по теме
- Интенсивная обработка данных с MapReduce Джимми Лином и Крисом Дайером
- Hadoop: полное руководство Тома Уайта
- Исходный код из блога
- MRUnit для модульного тестирования Apache Hadoop map уменьшить количество рабочих мест
- Project Gutenberg — отличный источник книг в текстовом формате, отлично подходящий для локального тестирования заданий Hadoop.
Приятного кодирования и не забудьте поделиться!
Ссылка: Работа с интенсивной обработкой данных с помощью MapReduce от нашего партнера по JCG Билла Бекака в блоге Randomечни о кодировании .