Hadoop — отличная технология, созданная на основе Java.
Сегодня мы будем использовать Scala для реализации простой задачи по сокращению карты, а затем запустим ее с помощью HDInsight. Мы добавим сборочный плагин в нашу сборку.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")Then we will add the Hadoop core dependency on our build.sbt file. Also will we apply some configuration in the merge strategy to avoid deduplicate errors.assemblyMergeStrategy in assembly := { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case x => MergeStrategy.first}libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" |
Мы будем использовать WordCount в качестве примера. Исходный класс Java должен быть преобразован в класс Scala.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package com.gkatzioura.scalaimport java.lang.Iterableimport java.util.StringTokenizerimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.Pathimport org.apache.hadoop.io.{IntWritable, Text}import org.apache.hadoop.mapreduce.lib.input.FileInputFormatimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormatimport org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}import scala.collection.JavaConverters._/** * Created by gkatzioura on 2/14/17. */package object WordCount { class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] { val one = new IntWritable(1) val word = new Text() override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = { val itr = new StringTokenizer(value.toString) while (itr.hasMoreTokens()) { word.set(itr.nextToken()) context.write(word, one) } } } class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] { override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = { var sum = values.asScala.foldLeft(0)(_ + _.get) context.write(key, new IntWritable(sum)) } } def main(args: Array[String]): Unit = { val configuration = new Configuration val job = Job.getInstance(configuration,"word count") job.setJarByClass(this.getClass) job.setMapperClass(classOf[TokenizerMapper]) job.setCombinerClass(classOf[IntSumReader]) job.setReducerClass(classOf[IntSumReader]) job.setOutputKeyClass(classOf[Text]) job.setOutputKeyClass(classOf[Text]); job.setOutputValueClass(classOf[IntWritable]); FileInputFormat.addInputPath(job, new Path(args(0))) FileOutputFormat.setOutputPath(job, new Path(args(1))) System.exit(if(job.waitForCompletion(true)) 0 else 1) }} |
Тогда мы будем строить наш пример
|
1
|
sbt clean compile assembly |
Наш новый jar будет находиться в target / scala-2.12 / ScalaHadoop-assembly-1.0.jar. В следующем посте мы запустим наш код с помощью Azure HDInsight .
Вы можете найти код на GitHub .
| Ссылка: | WordCount о Hadoop со Scala от нашего партнера по JCG Эммануила Гкациоураса в блоге gkatzioura . |