Apache Storm — это свободно распространяемая система вычислений в реальном времени с открытым исходным кодом, работающая на JVM. Для начала мы приведем очень простой пример. Ранее мы реализовали задание подсчета слов при помощи scala и загрузили его в hdinsight .
Мы сосредоточимся на той же концепции подсчета слов, но для случаев в реальном времени, и реализуем топологию подсчета слов с использованием Apache Storm. Наш исходный код будет основан на официальных примерах штормов.
Шторм работает с носиками и болтами.
Сначала мы реализуем носик, который будет генерировать поддельные события данных. В нашем случае предложения.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.gkatzioura.scala.stormimport org.apache.storm.spout.SpoutOutputCollectorimport org.apache.storm.task.TopologyContextimport org.apache.storm.topology.OutputFieldsDeclarerimport org.apache.storm.topology.base.BaseRichSpoutimport org.apache.storm.tuple.{Fields, Values}import org.apache.storm.utils.Utilsimport scala.util.Random/** * Created by gkatzioura on 2/17/17. */class RandomSentenceSpout extends BaseRichSpout { var _collector:SpoutOutputCollector = _ var _rand:Random = _ override def nextTuple(): Unit = { Utils.sleep(100) val sentences = Array("the cow jumped over the moon","an apple a day keeps the doctor away", "four score and seven years ago","snow white and the seven dwarfs","i am at two with nature") val sentence = sentences(_rand.nextInt(sentences.length)) _collector.emit(new Values(sentence)) } override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = { _collector = collector _rand = Random } override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { declarer.declare(new Fields("word")) }} |
Следующим шагом является реализация болта, который разбивает предложения и выдает их.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.gkatzioura.scala.stormimport java.text.BreakIteratorimport org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}import org.apache.storm.topology.base.BaseBasicBoltimport org.apache.storm.tuple.{Fields, Tuple, Values}/** * Created by gkatzioura on 2/18/17. */class SplitSentenceBolt extends BaseBasicBolt { override def execute(input: Tuple, collector: BasicOutputCollector): Unit = { val sentence = input.getString(0) val boundary = BreakIterator.getWordInstance boundary.setText(sentence) var start = boundary.first var end:Int = start while(end!=BreakIterator.DONE) { end = boundary.next val word = sentence.substring(start,end).replaceAll("\\s+","") start = end if(!word.equals("")) { collector.emit(new Values(word)) } } } override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { declarer.declare(new Fields("word")) }} |
И последний шаг — болт подсчета слов.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.gkatzioura.scala.stormimport org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}import org.apache.storm.topology.base.BaseBasicBoltimport org.apache.storm.tuple.{Fields, Tuple, Values}/** * Created by gkatzioura on 2/18/17. */class WordCountBolt extends BaseBasicBolt{ val counts = scala.collection.mutable.Map[String,Int]() override def execute(input: Tuple, collector: BasicOutputCollector): Unit = { val word = input.getString(0) val optCount = counts.get(word) if(optCount.isEmpty) { counts.put(word,1) } else { counts.put(word,optCount.get+1) } collector.emit(new Values(word,counts)) } override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { declarer.declare(new Fields("word","count")); }} |
Последний шаг — создание нашей топологии, которая заботится о том, работаем ли мы локально или в кластерной среде.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package com.gkatzioura.scala.stormimport org.apache.storm.{Config, LocalCluster, StormSubmitter}import org.apache.storm.topology.TopologyBuilderimport org.apache.storm.tuple.Fields/** * Created by gkatzioura on 2/18/17. */object WordCountTopology { def main(args: Array[String]): Unit = { println("Hello, world!") val builder = new TopologyBuilder builder.setSpout("spout", new RandomSentenceSpout, 5) builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout") builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word")) val conf = new Config() conf.setDebug(true) if (args != null && args.length > 0) { conf.setNumWorkers(3) StormSubmitter.submitTopology(args(0), conf, builder.createTopology()) } else { conf.setMaxTaskParallelism(3) val cluster = new LocalCluster cluster.submitTopology("word-count", conf, builder.createTopology()) Thread.sleep(10000) cluster.shutdown() } }} |
Теперь мы будем строить наше приложение. Для этого нам нужно включить плагин сборки в наш файл plugins.sbt.
|
1
|
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") |
Наш файл SBT выглядит следующим образом
|
1
2
3
4
5
6
7
8
9
|
name := "ScalaStorm"version := "1.0"scalaVersion := "2.12.1"scalacOptions += "-Yresolve-term-conflict:package"libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided" |
И тогда мы выпускаем сборку
|
1
|
sbt clean compile assembly |
Вы можете найти исходный код на github .
В следующем посте мы развернем наше приложение Storm для HDInsight.
| Ссылка: | WordCount с Storm и Scala от нашего партнера по JCG Эммануила Гкатзиураса в блоге gkatzioura . |