Apache Storm — это свободно распространяемая система вычислений в реальном времени с открытым исходным кодом, работающая на JVM. Для начала мы приведем очень простой пример. Ранее мы реализовали задание подсчета слов при помощи scala и загрузили его в hdinsight .
Мы сосредоточимся на той же концепции подсчета слов, но для случаев в реальном времени, и реализуем топологию подсчета слов с использованием Apache Storm. Наш исходный код будет основан на официальных примерах штормов.
Шторм работает с носиками и болтами.
Сначала мы реализуем носик, который будет генерировать поддельные события данных. В нашем случае предложения.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.gkatzioura.scala.storm import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.TopologyContext import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.topology.base.BaseRichSpout import org.apache.storm.tuple.{Fields, Values} import org.apache.storm.utils.Utils import scala.util.Random /** * Created by gkatzioura on 2/17/17. */ class RandomSentenceSpout extends BaseRichSpout { var _ collector : SpoutOutputCollector = _ var _ rand : Random = _ override def nextTuple() : Unit = { Utils.sleep( 100 ) val sentences = Array( "the cow jumped over the moon" , "an apple a day keeps the doctor away" , "four score and seven years ago" , "snow white and the seven dwarfs" , "i am at two with nature" ) val sentence = sentences( _ rand.nextInt(sentences.length)) _ collector.emit( new Values(sentence)) } override def open(conf : java.util.Map[ _ , _ ], context : TopologyContext, collector : SpoutOutputCollector) : Unit = { _ collector = collector _ rand = Random } override def declareOutputFields(declarer : OutputFieldsDeclarer) : Unit = { declarer.declare( new Fields( "word" )) } } |
Следующим шагом является реализация болта, который разбивает предложения и выдает их.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.gkatzioura.scala.storm import java.text.BreakIterator import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} import org.apache.storm.topology.base.BaseBasicBolt import org.apache.storm.tuple.{Fields, Tuple, Values} /** * Created by gkatzioura on 2/18/17. */ class SplitSentenceBolt extends BaseBasicBolt { override def execute(input : Tuple, collector : BasicOutputCollector) : Unit = { val sentence = input.getString( 0 ) val boundary = BreakIterator.getWordInstance boundary.setText(sentence) var start = boundary.first var end : Int = start while (end! = BreakIterator.DONE) { end = boundary.next val word = sentence.substring(start,end).replaceAll( "\\s+" , "" ) start = end if (!word.equals( "" )) { collector.emit( new Values(word)) } } } override def declareOutputFields(declarer : OutputFieldsDeclarer) : Unit = { declarer.declare( new Fields( "word" )) } } |
И последний шаг — болт подсчета слов.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.gkatzioura.scala.storm import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer} import org.apache.storm.topology.base.BaseBasicBolt import org.apache.storm.tuple.{Fields, Tuple, Values} /** * Created by gkatzioura on 2/18/17. */ class WordCountBolt extends BaseBasicBolt{ val counts = scala.collection.mutable.Map[String,Int]() override def execute(input : Tuple, collector : BasicOutputCollector) : Unit = { val word = input.getString( 0 ) val optCount = counts.get(word) if (optCount.isEmpty) { counts.put(word, 1 ) } else { counts.put(word,optCount.get+ 1 ) } collector.emit( new Values(word,counts)) } override def declareOutputFields(declarer : OutputFieldsDeclarer) : Unit = { declarer.declare( new Fields( "word" , "count" )); } } |
Последний шаг — создание нашей топологии, которая заботится о том, работаем ли мы локально или в кластерной среде.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package com.gkatzioura.scala.storm import org.apache.storm.{Config, LocalCluster, StormSubmitter} import org.apache.storm.topology.TopologyBuilder import org.apache.storm.tuple.Fields /** * Created by gkatzioura on 2/18/17. */ object WordCountTopology { def main(args : Array[String]) : Unit = { println( "Hello, world!" ) val builder = new TopologyBuilder builder.setSpout( "spout" , new RandomSentenceSpout, 5 ) builder.setBolt( "split" , new SplitSentenceBolt, 8 ).shuffleGrouping( "spout" ) builder.setBolt( "count" , new WordCountBolt, 12 ).fieldsGrouping( "split" , new Fields( "word" )) val conf = new Config() conf.setDebug( true ) if (args ! = null && args.length > 0 ) { conf.setNumWorkers( 3 ) StormSubmitter.submitTopology(args( 0 ), conf, builder.createTopology()) } else { conf.setMaxTaskParallelism( 3 ) val cluster = new LocalCluster cluster.submitTopology( "word-count" , conf, builder.createTopology()) Thread.sleep( 10000 ) cluster.shutdown() } } } |
Теперь мы будем строить наше приложение. Для этого нам нужно включить плагин сборки в наш файл plugins.sbt.
1
|
addSbtPlugin( "com.eed3si9n" % "sbt-assembly" % "0.14.3" ) |
Наш файл SBT выглядит следующим образом
1
2
3
4
5
6
7
8
9
|
name := "ScalaStorm" version := "1.0" scalaVersion := "2.12.1" scalacOptions + = "-Yresolve-term-conflict:package" libraryDependencies + = "org.apache.storm" % "storm-core" % "1.0.2" % "provided" |
И тогда мы выпускаем сборку
1
|
sbt clean compile assembly |
Вы можете найти исходный код на github .
В следующем посте мы развернем наше приложение Storm для HDInsight.
Ссылка: | WordCount с Storm и Scala от нашего партнера по JCG Эммануила Гкатзиураса в блоге gkatzioura . |