Статьи

WordCount на Hadoop со Scala

Hadoop – отличная технология, созданная на основе Java.

Сегодня мы будем использовать Scala для реализации простой задачи по сокращению карты, а затем запустим ее с помощью HDInsight. Мы добавим сборочный плагин в нашу сборку.

01
02
03
04
05
06
07
08
09
10
11
12
13
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
 
 
Then we will add the Hadoop core dependency on our build.sbt file. Also will we apply some configuration in the merge strategy to avoid deduplicate errors.
 
 
 
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
 
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

Мы будем использовать WordCount в качестве примера. Исходный класс Java должен быть преобразован в класс Scala.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.gkatzioura.scala
 
import java.lang.Iterable
import java.util.StringTokenizer
 
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
import scala.collection.JavaConverters._
 
/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {
 
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
 
    val one = new IntWritable(1)
    val word = new Text()
 
    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }
 
  class IntSumReader extends Reducer[Text,IntWritable,Text,IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      var sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }
 
 
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration,"word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputKeyClass(classOf[Text]);
    job.setOutputValueClass(classOf[IntWritable]);
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if(job.waitForCompletion(true))  0 else 1)
  }
 
}

Тогда мы будем строить наш пример

1
sbt clean compile assembly

Наш новый jar будет находиться в target / scala-2.12 / ScalaHadoop-assembly-1.0.jar. В следующем посте мы запустим наш код с помощью Azure HDInsight .

Вы можете найти код на GitHub .

Ссылка: WordCount о Hadoop со Scala от нашего партнера по JCG Эммануила Гкациоураса в блоге gkatzioura .