Flux в мини-среде, которая может помочь нам определить и развернуть топологию Storm .
Flux имеет различные оболочки, которые помогают вам определить требуемый поток (ы) и инициализировать ваши болты и носики (используя конструктор с аргументами или без них и автоматически вызывать пользовательские методы конфигурации через отражение).
Вам нужно только использовать Flux, чтобы добавить его в качестве зависимости в ваш «pom.xml», сконфигурировать его через один файл YAML (посмотрите примеры потоков ), а затем использовать его в качестве основного класса для развертывания вашей топологии в кластере Storm (или как локальный тест).
Для инициализации KafkaBolt необходимы следующие шаги:
- Определите «topicSelector» с помощью метода « withTopicSelector »
- Определите «kafkaMapper» с помощью метода « withTupleToKafkaMapper »
- Определите «kafkaProducerProps» с помощью метода « withProducerProperties »
- Инициализируйте « org.apache.storm.kafka.bolt.KafkaBolt » с вышеуказанной конфигурацией
- Включить выше KafkaBolt как часть потока
Пример конфигурации минимального потока для KafkaBolt :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
components: - id: "stringScheme" className: "org.apache.storm.kafka.StringScheme" - id: "stringMultiScheme" className: "org.apache.storm.spout.SchemeAsMultiScheme" constructorArgs: - ref: "stringScheme" - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "localhost:2181" - id: "topicSelector" className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" constructorArgs: - "myTopicName" - id: "kafkaMapper" className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper" - id: "kafkaProducerProps" className: "java.util.Properties" configMethods: - name: "put" args: - "bootstrap.servers" - "localhost:9092" - name: "put" args: - "acks" - "1" - name: "put" args: - "key.serializer" - "org.apache.kafka.common.serialization.StringSerializer" - name: "put" args: - "value.serializer" - "org.apache.kafka.common.serialization.StringSerializer" bolts: - id: "bolt-kafka" className: "org.apache.storm.kafka.bolt.KafkaBolt" parallelism: 1 configMethods: - name: "withProducerProperties" args: [ref: "kafkaProducerProps" ] - name: "withTopicSelector" args: [ref: "topicSelector" ] - name: "withTupleToKafkaMapper" args: [ref: "kafkaMapper" ] streams: - name: "spout --> kafkaBolt" from: "spout-1" to: "bolt-kafka" grouping: type: LOCAL_OR_SHUFFLE |
Для полного рабочего примера конфигурации проверьте это , которое может использоваться следующим образом .
Пример команды для развертывания вашей топологии в Storm:
1
|
storm jar target/sentiment-analysis-storm- 0.0 . 1 -SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host= 192.168 . 1.200 src/test/resources/flux/topology_kafka.yaml |
Конфигурация потока для KafkaSpout уже описана как официальный пример потока. Flux — действительно полезная среда, которая устраняет необходимость в специальном коде для определения и инициализации топологии.
Ссылка: | Apache Storm: Как настроить KafkaBolt с Flux от нашего партнера по JCG Адрианоса Дадиса в Java, Интеграция и достоинства исходного блога. |