Прошло много времени с тех пор, как я последний раз писал, так как я был занят некоторыми из курсов, предлагаемых
Coursera . Есть несколько очень интересных предложений, которые стоит посмотреть. Некоторое время назад я приобрел
Data-Intensive Processing с MapReduce от Jimmy Lin и Chris Dyer. В книге представлено несколько ключевых алгоритмов MapReduce, но в формате псевдокода. Моя цель состоит в том, чтобы взять алгоритмы, представленные в главах 3-6, и реализовать их в Hadoop, используя
Hadoop: полное руководство Тома Уайта в качестве справочного материала. Я собираюсь предположить, что вы знакомы с Hadoop и MapReduce, а не охватывать вводный материал. Итак, давайте перейдем к главе 3 — Разработка алгоритма MapReduce, начиная с локальной агрегации.
Местная агрегация
На очень высоком уровне, когда Mappers генерируют данные, промежуточные результаты записываются на диск, а затем отправляются по сети в редукторы для окончательной обработки. Задержка записи на диск и последующей передачи данных по сети — дорогостоящая операция при обработке задания MapReduce. Поэтому вполне понятно, что, когда это возможно, уменьшение объема данных, отправляемых из картографических систем, увеличило бы скорость задания MapReduce. Локальная агрегация — это метод, используемый для уменьшения объема данных и повышения эффективности нашей работы MapReduce. Локальная агрегация не может заменить редукторы, так как нам нужен способ сбора результатов с одним и тем же ключом из разных картографов. Мы рассмотрим 3 способа достижения локальной агрегации:
- Использование функций Hadoop Combiner.
- Два подхода к комбинированию «in-mapper» представлены в книге «Обработка текста с MapReduce».
Конечно, любая оптимизация будет иметь компромиссы, и мы также обсудим их.
Чтобы продемонстрировать локальную агрегацию, мы запустим вездесущее задание подсчета слов в простой текстовой версии A Christmas Carol Чарльза Диккенса (загруженной из Project Gutenberg ) в псевдораспределенном кластере, установленном на моем MacBookPro, с использованием hadoop-0.20.2-cdh3u3 раздача от Cloudera . Я планирую в будущем посте провести такой же эксперимент на кластере EC2 с более реалистичными данными.
Сумматоры
Функция объединения — это объект, который расширяет класс Reducer. Фактически, для наших примеров здесь мы собираемся повторно использовать тот же редуктор, который использовался в задании подсчета слов. Функция объединения указывается при настройке задания MapReduce следующим образом:
job.setReducerClass(TokenCountReducer.class);
Вот код редуктора:
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count+= value.get(); } context.write(key,new IntWritable(count)); } }
Задача объединителя состоит в том, чтобы сделать именно то, что подразумевает название, объединенные данные с чистым результатом меньшего количества данных начинают перетасовываться по сети, что дает нам повышение эффективности. Как указывалось ранее, имейте в виду, что редукторы по-прежнему должны объединять результаты с использованием одних и тех же ключей от разных картографов. Поскольку функции объединителя являются оптимизацией, среда Hadoop не дает никаких гарантий относительно того, сколько раз будет вызываться объединитель, если вообще будет.
Вариант объединения картографов 1
Первая альтернатива использованию Combiners (рис. 3.2 на стр. 41) очень прямолинейна и вносит небольшую модификацию в наш оригинальный механизм подсчета слов:
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Map<String,Integer> tokenMap = new HashMap<String, Integer>(); StringTokenizer tokenizer = new StringTokenizer(value.toString()); while(tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if(count == null) count = new Integer(0); count+=1; tokenMap.put(token,count); } Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } }
Как мы видим здесь, вместо испускания слова со счетом 1 для каждого встреченного слова мы используем карту для отслеживания каждого слова, уже обработанного. Затем, когда все токены обработаны, мы перебираем карту и выводим общее количество для каждого слова, встречающегося в этой строке.
В Mapper Combining Option 2
Второй вариант комбинирования картографов (рисунок 3.3 на стр. 41) очень похож на приведенный выше пример с двумя отличиями — когда создается хэш-карта и когда мы генерируем результаты, содержащиеся в карте. В приведенном выше примере создается карта, и ее содержимое сбрасывается по сети для каждого вызова метода карты. В этом примере мы собираемся сделать карту переменной экземпляра и перенести создание экземпляра карты на метод setUp в нашем преобразователе. Аналогично, содержимое карты не будет отправлено редукторам до тех пор, пока не будут завершены все вызовы mapper и не будет вызван метод cleanUp.
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Map<String,Integer> tokenMap; @Override protected void setup(Context context) throws IOException, InterruptedException { tokenMap = new HashMap<String, Integer>(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while(tokenizer.hasMoreElements()){ String token = tokenizer.nextToken(); Integer count = tokenMap.get(token); if(count == null) count = new Integer(0); count+=1; tokenMap.put(token,count); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { IntWritable writableCount = new IntWritable(); Text text = new Text(); Set<String> keys = tokenMap.keySet(); for (String s : keys) { text.set(s); writableCount.set(tokenMap.get(s)); context.write(text,writableCount); } } }
Как видно из приведенного выше примера кода, картограф отслеживает уникальное количество слов во всех вызовах метода map. Отслеживая уникальные токены и их количество, необходимо значительно сократить количество записей, отправляемых редукторам, что, в свою очередь, должно сократить время выполнения задания MapReduce. Это выполняет тот же эффект, что и использование функции функции объединения, предоставляемой платформой MapReduce, но в этом случае вы гарантированно вызовете код объединения. Но есть некоторые предостережения с этим подходом. Сохранение состояния при вызовах карты может оказаться проблематичным и определенно является нарушением функционального духа функции «карты». Кроме того, поддерживая состояние всех картографов, в зависимости от данных, используемых в задании, память может стать еще одной проблемой, с которой приходится сталкиваться. В конечном счете,нужно было бы взвесить все компромиссы, чтобы определить лучший подход.
Полученные результаты
Теперь давайте посмотрим на некоторые результаты различных картографов. Поскольку задание выполнялось в псевдораспределенном режиме, фактическое время выполнения не имеет значения, но мы все же можем сделать вывод о том, как использование локальной агрегации может повлиять на эффективность задания MapReduce в реальном кластере.
В Token Mapper:
12/09/13 21:25:32 INFO mapred.JobClient: Reduce shuffle bytes=366010 12/09/13 21:25:32 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:25:32 INFO mapred.JobClient: Spilled Records=63118 12/09/13 21:25:32 INFO mapred.JobClient: Map output bytes=302886
В варианте сокращения Mapper 1:
12/09/13 21:28:15 INFO mapred.JobClient: Reduce shuffle bytes=354112 12/09/13 21:28:15 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:28:15 INFO mapred.JobClient: Spilled Records=60704 12/09/13 21:28:15 INFO mapred.JobClient: Map output bytes=293402
В варианте сокращения Mapper 2:
12/09/13 21:30:49 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:30:49 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:30:49 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:30:49 INFO mapred.JobClient: Map output bytes=90565
Опция Combiner:
12/09/13 21:22:18 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:22:18 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:22:18 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:22:18 INFO mapred.JobClient: Map output bytes=302886 12/09/13 21:22:18 INFO mapred.JobClient: Combine input records=31559 12/09/13 21:22:18 INFO mapred.JobClient: Combine output records=7657
Как и ожидалось, Mapper, который не объединял, имел худшие результаты, за которым следовал первый вариант объединения в Mapper (хотя эти результаты можно было бы улучшить, если бы данные были очищены перед выполнением подсчета слов). Второй вариант объединения в картографе и функция объединителя дали практически идентичные результаты. Существенным фактом является то, что оба произвели на 2/3 меньшеуменьшить количество случайных байтов в качестве первых двух вариантов. Сокращение количества байтов, отправляемых по сети редукторам на эту сумму, несомненно, положительно скажется на эффективности задания MapReduce. Здесь нужно помнить об одном: объединение Combiners / In-Mapper может использоваться не только во всех заданиях MapReduce, в этом случае подсчет слов очень хорошо подходит для такого улучшения, но это не всегда может быть правдой. ,
Вывод
Как вы видите, преимущества использования объединения в картографическом режиме или функции объединителя Hadoop требуют серьезного рассмотрения при повышении производительности ваших заданий MapReduce. Что касается того, какой подход, то вам решать, какие компромиссы для каждого подхода.
Ресурсы
- Интенсивная обработка данных с MapReduce Джимми Лином и Крисом Дайером
- Hadoop: полное руководство Тома Уайта
- Исходный код из блога
- MRUnit для модульного тестирования Apache Hadoop map уменьшить количество рабочих мест
- Project Gutenberg — отличный источник книг в текстовом формате, отлично подходящий для локального тестирования заданий Hadoop.