В этой главе мы узнаем, как интегрировать Kafka с Apache Storm.
О Шторме
Первоначально Storm был создан Натаном Марцем и командой BackType. За короткое время Apache Storm стал стандартом для распределенной системы обработки в реальном времени, которая позволяет обрабатывать огромный объем данных. Storm работает очень быстро, и тест показал, что он обрабатывает более миллиона кортежей в секунду на узел. Apache Storm работает непрерывно, потребляя данные из настроенных источников (Spouts) и передает данные по конвейеру обработки (Bolts). Комбинированные, носики и болты составляют топологию.
Интеграция со Storm
Kafka и Storm естественным образом дополняют друг друга, а их мощное сотрудничество позволяет осуществлять потоковую аналитику в реальном времени для быстро перемещающихся больших данных. Интеграция Kafka и Storm облегчает разработчикам прием и публикацию потоков данных из топологий Storm.
Концептуальный поток
Носик является источником потоков. Например, носик может читать кортежи из темы Кафки и выдавать их в виде потока. Болт потребляет входные потоки, обрабатывает и, возможно, испускает новые потоки. Болты могут делать что угодно, от запуска функций, фильтрации кортежей, выполнения потоковых агрегаций, потоковых объединений, общения с базами данных и многого другого. Каждый узел в топологии Storm выполняется параллельно. Топология работает бесконечно, пока вы не прекратите ее. Storm автоматически переназначит все неудачные задачи. Кроме того, Storm гарантирует, что не произойдет потери данных, даже если машины выйдут из строя и сообщения будут отброшены.
Давайте подробно рассмотрим API интеграции Kafka-Storm. Существует три основных класса для интеграции Kafka с Storm. Они заключаются в следующем —
BrokerHosts — ZkHosts & StaticHosts
BrokerHosts — это интерфейс, а ZkHosts и StaticHosts — две его основные реализации. ZkHosts используется для динамического отслеживания брокеров Kafka, сохраняя детали в ZooKeeper, в то время как StaticHosts используется для ручной / статической настройки брокеров Kafka и его данных. ZkHosts — это простой и быстрый способ доступа к брокеру Kafka.
Подпись ZkHosts выглядит следующим образом —
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
Где brokerZkStr — это хост ZooKeeper, а brokerZkPath — путь ZooKeeper для поддержки сведений о брокере Kafka.
KafkaConfig API
Этот API используется для определения параметров конфигурации для кластера Kafka. Подпись Кафки Кон-Фига определяется следующим образом
public KafkaConfig(BrokerHosts hosts, string topic)
Хосты — BrokerHosts могут быть ZkHosts / StaticHosts.
Тема — название темы.
Хосты — BrokerHosts могут быть ZkHosts / StaticHosts.
Тема — название темы.
SpoutConfig API
Spoutconfig — это расширение KafkaConfig, которое поддерживает дополнительную информацию ZooKeeper.
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
-
Hosts — BrokerHosts может быть любой реализацией интерфейса BrokerHosts.
-
Тема — название темы.
-
zkRoot — корневой путь ZooKeeper.
-
id — носик хранит состояние смещений, которые он потребляет в Zookeeper. Идентификатор должен однозначно идентифицировать ваш носик.
Hosts — BrokerHosts может быть любой реализацией интерфейса BrokerHosts.
Тема — название темы.
zkRoot — корневой путь ZooKeeper.
id — носик хранит состояние смещений, которые он потребляет в Zookeeper. Идентификатор должен однозначно идентифицировать ваш носик.
SchemeAsMultiScheme
SchemeAsMultiScheme — это интерфейс, который определяет, как преобразованный ByteBuffer из Kafka превращается в штормовый кортеж. Он является производным от MultiScheme и принимает реализацию класса Scheme. Существует множество реализаций класса Scheme, и одной из таких реализаций является StringScheme, которая анализирует байт как простую строку. Он также контролирует наименование вашего поля вывода. Подпись определяется следующим образом.
public SchemeAsMultiScheme(Scheme scheme)
-
Схема — байтовый буфер, потребляемый от кафки.
Схема — байтовый буфер, потребляемый от кафки.
KafkaSpout API
KafkaSpout — это наша реализация spout, которая будет интегрироваться с Storm. Он извлекает сообщения из темы kafka и отправляет их в экосистему Storm в виде кортежей. KafkaSpout получает информацию о конфигурации от SpoutConfig.
Ниже приведен пример кода для создания простого носика Кафки.
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString); //Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString()); //convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Создание болта
Bolt — это компонент, который принимает кортежи в качестве входных данных, обрабатывает кортежи и создает новые кортежи в качестве выходных данных. Болты будут реализовывать интерфейс IRichBolt. В этой программе для выполнения операций используются два класса болтов WordSplitter-Bolt и WordCounterBolt.
Интерфейс IRichBolt имеет следующие методы —
-
Подготовить — Предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
-
Выполнить — обработать один кортеж ввода.
-
Очистка — вызывается, когда затвор собирается отключиться.
-
DeclareOutputFields — Объявляет схему вывода кортежа.
Подготовить — Предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
Выполнить — обработать один кортеж ввода.
Очистка — вызывается, когда затвор собирается отключиться.
DeclareOutputFields — Объявляет схему вывода кортежа.
Давайте создадим SplitBolt.java, который реализует логику для разделения предложения на слова, и CountBolt.java, который реализует логику для разделения уникальных слов и подсчета его появления.
SplitBolt.java
import java.util.Map; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext; public class SplitBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word: words) { word = word.trim(); if(!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public void cleanup() {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
CountBolt.java
import java.util.Map; import java.util.HashMap; import backtype.storm.tuple.Tuple; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext; public class CountBolt implements IRichBolt{ Map<String, Integer> counters; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple input) { String str = input.getString(0); if(!counters.containsKey(str)){ counters.put(str, 1); }else { Integer c = counters.get(str) +1; counters.put(str, c); } collector.ack(input); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counters.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Отправка в топологию
Топология Storm — это в основном структура Thrift. Класс TopologyBuilder предоставляет простые и легкие методы для создания сложных топологий. Класс TopologyBuilder имеет методы для установки spout (setSpout) и для установки болта (setBolt). Наконец, TopologyBuilder имеет createTopology для создания топологии. Методы shuffleGrouping и fieldsGrouping помогают установить группировку потока для носика и болтов.
Локальный кластер. В целях разработки мы можем создать локальный кластер, используя объект LocalCluster,
а затем передать топологию, используя метод submitTopology
класса LocalCluster
.
KafkaStormSample.java
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import java.util.ArrayList; import java.util.List; import java.util.UUID; import backtype.storm.spout.SchemeAsMultiScheme; import storm.kafka.trident.GlobalPartitionInformation; import storm.kafka.ZkHosts; import storm.kafka.Broker; import storm.kafka.StaticHosts; import storm.kafka.BrokerHosts; import storm.kafka.SpoutConfig; import storm.kafka.KafkaConfig; import storm.kafka.KafkaSpout; import storm.kafka.StringScheme; public class KafkaStormSample { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); String zkConnString = "localhost:2181"; String topic = "my-first-topic"; BrokerHosts hosts = new ZkHosts(zkConnString); SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic, UUID.randomUUID().toString()); kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.forceFromStart = true; kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig)); builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout"); builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("KafkaStormSample", config, builder.create-Topology()); Thread.sleep(10000); cluster.shutdown(); } }
Перед тем как приступить к компиляции, для интеграции Kakfa-Storm требуется куратор клиентской библиотеки ZooKeeper. Куратор версии 2.9.1 поддерживает Apache Storm версии 0.9.5 (которую мы используем в этом руководстве). Загрузите указанные ниже jar-файлы и поместите их в путь к классу java.
- Куратор-клиент-2.9.1.jar
- Куратор-каркасного 2.9.1.jar
После включения файлов зависимостей, скомпилируйте программу с помощью следующей команды:
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
выполнение
Запустите Kafka Producer CLI (объяснено в предыдущей главе), создайте новую тему под названием my-first-topic
и предоставьте несколько примеров сообщений, как показано ниже —
hello kafka storm spark test message another test message
Теперь запустите приложение, используя следующую команду —
java -cp «/path/to/Kafka/apache-storm-0.9.5/lib/*» :. KafkaStormSample
Пример вывода этого приложения указан ниже —