Hadoop — отличная технология, созданная на основе Java.
Сегодня мы будем использовать Scala для реализации простой задачи по сокращению карты, а затем запустим ее с помощью HDInsight. Мы добавим сборочный плагин в нашу сборку.
01
02
03
04
05
06
07
08
09
10
11
12
13
|
addSbtPlugin( "com.eed3si9n" % "sbt-assembly" % "0.14.3" ) Then we will add the Hadoop core dependency on our build.sbt file. Also will we apply some configuration in the merge strategy to avoid deduplicate errors. assemblyMergeStrategy in assembly := { case PathList( "META-INF" , xs @ _ *) = > MergeStrategy.discard case x = > MergeStrategy.first } libraryDependencies + = "org.apache.hadoop" % "hadoop-core" % "1.2.1" |
Мы будем использовать WordCount в качестве примера. Исходный класс Java должен быть преобразован в класс Scala.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package com.gkatzioura.scala import java.lang.Iterable import java.util.StringTokenizer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer} import scala.collection.JavaConverters. _ /** * Created by gkatzioura on 2/14/17. */ package object WordCount { class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] { val one = new IntWritable( 1 ) val word = new Text() override def map(key : Object, value : Text, context : Mapper[Object, Text, Text, IntWritable] # Context) : Unit = { val itr = new StringTokenizer(value.toString) while (itr.hasMoreTokens()) { word.set(itr.nextToken()) context.write(word, one) } } } class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] { override def reduce(key : Text, values : Iterable[IntWritable], context : Reducer[Text, IntWritable, Text, IntWritable] # Context) : Unit = { var sum = values.asScala.foldLeft( 0 )( _ + _ .get) context.write(key, new IntWritable(sum)) } } def main(args : Array[String]) : Unit = { val configuration = new Configuration val job = Job.getInstance(configuration, "word count" ) job.setJarByClass( this .getClass) job.setMapperClass(classOf[TokenizerMapper]) job.setCombinerClass(classOf[IntSumReader]) job.setReducerClass(classOf[IntSumReader]) job.setOutputKeyClass(classOf[Text]) job.setOutputKeyClass(classOf[Text]); job.setOutputValueClass(classOf[IntWritable]); FileInputFormat.addInputPath(job, new Path(args( 0 ))) FileOutputFormat.setOutputPath(job, new Path(args( 1 ))) System.exit( if (job.waitForCompletion( true )) 0 else 1 ) } } |
Тогда мы будем строить наш пример
1
|
sbt clean compile assembly |
Наш новый jar будет находиться в target / scala-2.12 / ScalaHadoop-assembly-1.0.jar. В следующем посте мы запустим наш код с помощью Azure HDInsight .
Вы можете найти код на GitHub .
Ссылка: | WordCount о Hadoop со Scala от нашего партнера по JCG Эммануила Гкациоураса в блоге gkatzioura . |