Статьи

WordCount со Штормом и Скалой

Apache Storm — это свободно распространяемая система вычислений в реальном времени с открытым исходным кодом, работающая на JVM. Для начала мы приведем очень простой пример. Ранее мы реализовали задание подсчета слов при помощи scala и загрузили его в hdinsight .
Мы сосредоточимся на той же концепции подсчета слов, но для случаев в реальном времени, и реализуем топологию подсчета слов с использованием Apache Storm. Наш исходный код будет основан на официальных примерах штормов.

Шторм работает с носиками и болтами.

Сначала мы реализуем носик, который будет генерировать поддельные события данных. В нашем случае предложения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.gkatzioura.scala.storm
 
 
import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils
 
import scala.util.Random
 
/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {
 
  var _collector:SpoutOutputCollector = _
  var _rand:Random = _
 
  override def nextTuple(): Unit = {
 
    Utils.sleep(100)
 
    val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away",
      "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }
 
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }
 
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
 
}

Следующим шагом является реализация болта, который разбивает предложения и выдает их.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.gkatzioura.scala.storm
 
import java.text.BreakIterator
 
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}
 
/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {
 
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
 
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
 
    boundary.setText(sentence)
    var start = boundary.first
    var end:Int = start
 
    while(end!=BreakIterator.DONE) {
 
      end = boundary.next
      val word = sentence.substring(start,end).replaceAll("\\s+","")
      start = end
      if(!word.equals("")) {
        collector.emit(new Values(word))
      }
    }
  }
 
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}

И последний шаг — болт подсчета слов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.gkatzioura.scala.storm
 
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}
 
/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt{
 
  val counts = scala.collection.mutable.Map[String,Int]()
 
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
 
    val word = input.getString(0)
 
    val optCount = counts.get(word)
    if(optCount.isEmpty) {
      counts.put(word,1)
    } else {
      counts.put(word,optCount.get+1)
    }
 
    collector.emit(new Values(word,counts))
  }
 
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
 
    declarer.declare(new Fields("word","count"));
  }
 
}

Последний шаг — создание нашей топологии, которая заботится о том, работаем ли мы локально или в кластерной среде.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.gkatzioura.scala.storm
 
import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields
 
/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology  {
 
  def main(args: Array[String]): Unit = {
    println("Hello, world!")
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))
 
    val conf = new Config()
    conf.setDebug(true)
 
    if (args != null && args.length > 0) {
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    }
    else {
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}

Теперь мы будем строить наше приложение. Для этого нам нужно включить плагин сборки в наш файл plugins.sbt.

1
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Наш файл SBT выглядит следующим образом

1
2
3
4
5
6
7
8
9
name := "ScalaStorm"
 
version := "1.0"
 
scalaVersion := "2.12.1"
 
scalacOptions += "-Yresolve-term-conflict:package"
 
libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"

И тогда мы выпускаем сборку

1
sbt clean compile assembly

Вы можете найти исходный код на github .

В следующем посте мы развернем наше приложение Storm для HDInsight.

Ссылка: WordCount с Storm и Scala от нашего партнера по JCG Эммануила Гкатзиураса в блоге gkatzioura .