Статьи

Apache Storm: Как настроить KafkaBolt с Flux

Flux в мини-среде, которая может помочь нам определить и развернуть топологию Storm .

Flux имеет различные оболочки, которые помогают вам определить требуемый поток (ы) и инициализировать ваши болты и носики (используя конструктор с аргументами или без них и автоматически вызывать пользовательские методы конфигурации через отражение).

Вам нужно только использовать Flux, чтобы добавить его в качестве зависимости в ваш «pom.xml», сконфигурировать его через один файл YAML (посмотрите примеры потоков ), а затем использовать его в качестве основного класса для развертывания вашей топологии в кластере Storm (или как локальный тест).

Для инициализации KafkaBolt необходимы следующие шаги:

  1. Определите «topicSelector» с помощью метода « withTopicSelector »
  2. Определите «kafkaMapper» с помощью метода « withTupleToKafkaMapper »
  3. Определите «kafkaProducerProps» с помощью метода « withProducerProperties »
  4. Инициализируйте « org.apache.storm.kafka.bolt.KafkaBolt » с вышеуказанной конфигурацией
  5. Включить выше KafkaBolt как часть потока

Пример конфигурации минимального потока для KafkaBolt :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
components:
  - id: "stringScheme"
    className: "org.apache.storm.kafka.StringScheme"
 
  - id: "stringMultiScheme"
    className: "org.apache.storm.spout.SchemeAsMultiScheme"
    constructorArgs:
      - ref: "stringScheme"
 
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"
 
  - id: "topicSelector"
    className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector"
    constructorArgs:
      - "myTopicName"
 
  - id: "kafkaMapper"
    className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
 
  - id: "kafkaProducerProps"
    className: "java.util.Properties"
    configMethods:
      - name: "put"
        args:
          - "bootstrap.servers"
          - "localhost:9092"
      - name: "put"
        args:
          - "acks"
          - "1"
      - name: "put"
        args:
          - "key.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
      - name: "put"
        args:
          - "value.serializer"
          - "org.apache.kafka.common.serialization.StringSerializer"
 
bolts:   
  - id: "bolt-kafka"
    className: "org.apache.storm.kafka.bolt.KafkaBolt"
    parallelism: 1
    configMethods:
      - name: "withProducerProperties"
        args: [ref: "kafkaProducerProps"]
      - name: "withTopicSelector"
        args: [ref: "topicSelector"]
      - name: "withTupleToKafkaMapper"
        args: [ref: "kafkaMapper"]
 
streams:
  - name: "spout --> kafkaBolt"
    from: "spout-1"
    to: "bolt-kafka"
    grouping:
      type: LOCAL_OR_SHUFFLE

Для полного рабочего примера конфигурации проверьте это , которое может использоваться следующим образом .

Пример команды для развертывания вашей топологии в Storm:

1
storm jar target/sentiment-analysis-storm-0.0.1-SNAPSHOT.jar org.apache.storm.flux.Flux --remote --c nimbus.host=192.168.1.200 src/test/resources/flux/topology_kafka.yaml

Конфигурация потока для KafkaSpout уже описана как официальный пример потока. Flux — действительно полезная среда, которая устраняет необходимость в специальном коде для определения и инициализации топологии.