В этом уроке вы научитесь использовать Hadoop и MapReduce с примером. Используемые входные данные — SalesJan2009.csv . Он содержит информацию о продажах, такую как название продукта, цена, способ оплаты, город, страна клиента и т. Д. Цель состоит в том, чтобы узнать количество продуктов, проданных в каждой стране.
В этом уроке вы узнаете
- Первая программа Hadoop MapReduce
- Объяснение класса SalesMapper
- Объяснение класса SalesCountryReducer
- Объяснение класса SalesCountryDriver
Первая программа Hadoop MapReduce
Убедитесь, что у вас установлен Hadoop. Прежде чем начать с самого процесса, измените пользователя на «hduser» (идентификатор, используемый при настройке Hadoop, вы можете переключиться на идентификатор пользователя, использованный при настройке Hadoop).
su - hduser_
Шаг 1)
Создайте новый каталог с именем MapReduceTutorial
sudo mkdir MapReduceTutorial
Дать разрешения
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {
String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");
output.collect(new Text(SingleCountryData[7]), one);
}
}
SalesCountryReducer.java
package SalesCountry;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
public class SalesCountryReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
Text key = t_key;
int frequencyForCountry = 0;
while (values.hasNext()) {
// replace type of value with the actual type of our value
IntWritable value = (IntWritable) values.next();
frequencyForCountry += value.get();
}
output.collect(key, new IntWritable(frequencyForCountry));
}
}
SalesCountryDriver.java
package SalesCountry;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SalesCountryDriver {
public static void main(String[] args) {
JobClient my_client = new JobClient();
// Create a configuration object for the job
JobConf job_conf = new JobConf(SalesCountryDriver.class);
// Set a name of the Job
job_conf.setJobName("SalePerCountry");
// Specify data type of output key and value
job_conf.setOutputKeyClass(Text.class);
job_conf.setOutputValueClass(IntWritable.class);
// Specify names of Mapper and Reducer Class
job_conf.setMapperClass(SalesCountry.SalesMapper.class);
job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);
// Specify formats of the data type of Input and output
job_conf.setInputFormat(TextInputFormat.class);
job_conf.setOutputFormat(TextOutputFormat.class);
// Set input and output directories using command line arguments,
//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.
FileInputFormat.setInputPaths(job_conf, new Path(args[0]));
FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));
my_client.setConf(job_conf);
try {
// Run the job
JobClient.runJob(job_conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Проверьте права доступа ко всем этим файлам
и если разрешения на «чтение» отсутствуют, предоставьте
Шаг 2)
Экспорт classpath
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Шаг 3)
Компилировать файлы Java (эти файлы присутствуют в каталоге Final-MapReduceHandsOn ). Его файлы классов будут помещены в каталог пакета
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Это предупреждение можно смело игнорировать.
Эта компиляция создаст каталог в текущем каталоге с именем пакета, указанным в исходном файле java (т.е. в нашем случае SalesCountry ) и поместит в него все скомпилированные файлы классов.
Шаг 4)
Создать новый файл Manifest.txt
sudo gedit Manifest.txt
добавить следующие строки к нему,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver — это имя основного класса. Обратите внимание, что вы должны нажать клавишу ввода в конце этой строки.
Шаг 5)
Создать файл Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Убедитесь, что файл JAR создан
Шаг 6)
Запустите Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Шаг 7)
Скопируйте файл SalesJan2009.csv в ~ / inputMapReduce
Теперь используйте команду ниже, чтобы скопировать ~ / inputMapReduce в HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Мы можем смело игнорировать это предупреждение.
Проверьте, действительно ли файл скопирован или нет.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Шаг 8)
Запустить задание MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Это создаст выходной каталог с именем mapreduce_output_sales в HDFS. Содержимое этого каталога представляет собой файл, содержащий информацию о продажах товаров в каждой стране.
Шаг 9)
Результат можно увидеть через командный интерфейс как,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Результаты также можно увидеть через веб-интерфейс, как
Откройте r в веб-браузере.
Теперь выберите «Обзор файловой системы» и перейдите к / mapreduce_output_sales
Открытая часть-r-00000
Объяснение класса SalesMapper
В этом разделе мы разберем реализацию класса SalesMapper .
1. Начнем с указания имени пакета для нашего класса. SalesCountry — это название нашего пакета. Обратите внимание, что выходные данные компиляции SalesMapper.class будут помещены в каталог с именем этого пакета: SalesCountry .
Затем мы импортируем библиотечные пакеты.
Ниже моментальных показана реализация SalesMapper class-
Пример кода Объяснение:
1. Определение класса SalesMapper
Открытый класс SalesMapper расширяет MapReduceBase реализует Mapper <LongWritable, Text, Text, IntWritable> {
Каждый класс mapper должен быть расширен из класса MapReduceBase, и он должен реализовывать интерфейс Mapper .
2. Определение функции «карта»
public void map(LongWritable key,
Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException
Основная часть класса Mapper — это метод ‘map ()’, который принимает четыре аргумента.
При каждом вызове метода map () передается пара ключ-значение ( «ключ» и «значение» в этом коде).
Метод map () начинается с разделения входного текста, полученного в качестве аргумента. Он использует токенизатор, чтобы разбить эти строки на слова.
String valueString = value.toString();
String[] SingleCountryData = valueString.split(",");
Здесь «,» используется в качестве разделителя.
После этого пара формируется с использованием записи с 7-м индексом массива «SingleCountryData» и значением «1» .
output.collect (новый текст (SingleCountryData [7]), один);
Мы выбираем запись с 7-м индексом, потому что нам нужны данные о стране, и она расположена с 7-м индексом в массиве SingleCountryData .
Обратите внимание, что наши входные данные представлены в следующем формате (где Страна находится на 7- м индексе, с 0 в качестве начального индекса) —
Дата транзакции, продукт, цена, тип платежа, имя, город, штат, страна , созданный аккаунт, последний логин, широта, долгота
Выходными данными mapper снова является пара ключ-значение, которая выводится с использованием метода collect () объекта OutputCollector .
Объяснение класса SalesCountryReducer
В этом разделе мы поймем реализацию класса SalesCountryReducer .
1. Начнем с указания имени пакета для нашего класса. SalesCountry — это название нашего пакета. Обратите внимание, что выходные данные компиляции SalesCountryReducer.class будут помещены в каталог с именем этого пакета: SalesCountry .
Затем мы импортируем библиотечные пакеты.
Ниже моментальных показана реализация SalesCountryReducer class-
Объяснение кода:
1. Определение класса SalesCountryReducer
открытый класс SalesCountryReducer расширяет MapReduceBase реализует Reducer <Text, IntWritable, Text, IntWritable> {
Здесь первые два типа данных, «Текст» и «IntWritable», являются типом данных ввода значения ключа для редуктора.
Выходные данные маппера представлены в виде <CountryName1, 1>, <CountryName2, 1>. Этот выход преобразователя становится входом для редуктора. Таким образом, чтобы выровнять его тип данных, Text и IntWritable используются здесь как тип данных.
Последние два типа данных, «Текст» и «IntWritable», являются типом данных, сгенерированных редуктором в виде пары ключ-значение.
Каждый класс редуктора должен быть расширен из класса MapReduceBase, и он должен реализовывать интерфейс редуктора .
2. Определение функции уменьшения
public void reduce( Text t_key,
Iterator<IntWritable> values,
OutputCollector<Text,IntWritable> output,
Reporter reporter) throws IOException {
Входные данные для метода redu () — это ключ со списком нескольких значений.
Например, в нашем случае это будет
<Объединенные Арабские Эмираты, 1>, <Объединенные Арабские Эмираты, 1>, <Объединенные Арабские Эмираты, 1>, <Объединенные Арабские Эмираты, 1>, <Объединенные Арабские Эмираты, 1>, <Объединенные Арабские Эмираты, 1>.
Это дается редуктору как <Объединенные Арабские Эмираты, {1,1,1,1,1,1}>
Таким образом, чтобы принять аргументы этой формы, используются первые два типа данных, а именно: Text и Iterator <IntWritable> . Текст — это тип данных ключа, а Iterator <IntWritable> — это тип данных для списка значений для этого ключа.
Следующий аргумент имеет тип OutputCollector <Text, IntWritable>, который собирает выходные данные фазы редуктора.
Метод Redu () начинается с копирования значения ключа и инициализации счетчика частоты до 0.
Текстовый ключ = t_key;
int частотаForCountry = 0;
Затем, используя « а» цикл, мы перебираем список значений , связанных с ключом и рассчитать конечную частоту путем суммирования всех значений.
while (values.hasNext()) {
// replace type of value with the actual type of our value
IntWritable value = (IntWritable) values.next();
frequencyForCountry += value.get();
}
Теперь мы передаем результат на выходной коллектор в виде ключа и полученного подсчета частоты .
Ниже код делает это
output.collect(key, new IntWritable(frequencyForCountry));
Объяснение класса SalesCountryDriver
В этом разделе мы поймем реализацию класса SalesCountryDriver
1. Начнем с указания имени пакета для нашего класса. SalesCountry — это название нашего пакета. Обратите внимание, что выходные данные компиляции SalesCountryDriver.class будут помещены в каталог с именем этого пакета: SalesCountry .
Вот строка, указывающая имя пакета, за которым следует код для импорта библиотечных пакетов.
2. Определите класс драйвера, который создаст новое клиентское задание, объект конфигурации и объявит классы Mapper и Reducer.
Класс драйвера отвечает за настройку задания MapReduce для запуска в Hadoop. В этом классе мы указываем имя задания, тип данных ввода / вывода и имена классов мапперов и редукторов .
3. В приведенном ниже фрагменте кода мы устанавливаем входные и выходные каталоги, которые используются для использования входного набора данных и вывода результатов соответственно.
arg [0] и arg [1] являются аргументами командной строки, передаваемыми командой, заданной в MapReduce, т. е.
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Запустите нашу работу
Ниже кода запускается выполнение задания MapReduce
try {
// Run the job
JobClient.runJob(job_conf);
} catch (Exception e) {
e.printStackTrace();
}























