Мы рассмотрели основные технические детали Apache Storm, и теперь пришло время написать несколько простых сценариев.
Сценарий — Анализатор журнала мобильных вызовов
Мобильный вызов и его продолжительность будут переданы в качестве ввода для Apache Storm, и Storm обработает и сгруппирует вызов между одним и тем же вызывающим абонентом и получателем и их общим количеством вызовов.
Создание носика
Носик является компонентом, который используется для генерации данных. По сути, носик будет реализовывать интерфейс IRichSpout. Интерфейс «IRichSpout» имеет следующие важные методы —
-
open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.
-
nextTuple — испускает сгенерированные данные через коллектор.
-
close — этот метод вызывается, когда носик отключается.
-
DeclareOutputFields — Объявляет схему вывода кортежа.
-
ack — подтверждает, что обрабатывается определенный кортеж
-
fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.
open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.
nextTuple — испускает сгенерированные данные через коллектор.
close — этот метод вызывается, когда носик отключается.
DeclareOutputFields — Объявляет схему вывода кортежа.
ack — подтверждает, что обрабатывается определенный кортеж
fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.
открыто
Подпись открытого метода выглядит следующим образом:
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf — Предоставляет конфигурацию шторма для этого излива.
-
context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.
-
сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.
conf — Предоставляет конфигурацию шторма для этого излива.
context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.
сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.
nextTuple
Сигнатура метода nextTuple выглядит следующим образом:
nextTuple()
nextTuple () периодически вызывается из того же цикла, что и методы ack () и fail (). Он должен освободить управление потоком, когда нет работы, чтобы другие методы имели возможность вызываться. Поэтому первая строка nextTuple проверяет, завершена ли обработка. Если это так, он должен оставаться в режиме ожидания не менее одной миллисекунды, чтобы снизить нагрузку на процессор перед возвратом.
близко
Подпись метода close выглядит следующим образом:
close()
declareOutputFields
Подпись метода DeclareOutputFields выглядит следующим образом:
declareOutputFields(OutputFieldsDeclarer declarer)
Объявление — используется для объявления идентификаторов выходного потока, полей вывода и т. д.
Этот метод используется для указания схемы вывода кортежа.
извед
Суть метода ack заключается в следующем:
ack(Object msgId)
Этот метод подтверждает, что определенный кортеж был обработан.
потерпеть поражение
Сигнатура метода nextTuple выглядит следующим образом:
ack(Object msgId)
Этот метод сообщает, что определенный кортеж не был полностью обработан. Storm обработает определенный кортеж.
FakeCallLogReaderSpout
В нашем сценарии нам нужно собрать данные журнала вызовов. Информация из журнала звонков содержит.
- номер звонящего
- номер получателя
- продолжительность
Поскольку у нас нет информации о журналах вызовов в реальном времени, мы будем генерировать поддельные журналы вызовов. Поддельная информация будет создана с использованием класса Random. Полный код программы приведен ниже.
Кодирование — FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Создание болта
Bolt — это компонент, который принимает кортежи в качестве входных данных, обрабатывает кортежи и создает новые кортежи в качестве выходных данных. Болты будут реализовывать интерфейс IRichBolt . В этой программе для выполнения операций используются два класса болтов CallLogCreatorBolt и CallLogCounterBolt .
Интерфейс IRichBolt имеет следующие методы —
-
подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
-
execute — обработать один кортеж ввода.
-
очистка — вызывается, когда затвор собирается отключиться.
-
DeclareOutputFields — Объявляет схему вывода кортежа.
подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
execute — обработать один кортеж ввода.
очистка — вызывается, когда затвор собирается отключиться.
DeclareOutputFields — Объявляет схему вывода кортежа.
Подготовить
Подпись метода подготовки выглядит следующим образом:
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf — Предоставляет конфигурацию шторма для этого болта.
-
контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.
-
collector — позволяет нам генерировать обработанный кортеж.
conf — Предоставляет конфигурацию шторма для этого болта.
контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.
collector — позволяет нам генерировать обработанный кортеж.
выполнять
Подпись метода execute следующая:
execute(Tuple tuple)
Здесь кортеж — это входной кортеж, который нужно обработать.
Метод execute обрабатывает один кортеж за раз. Доступ к данным кортежа можно получить с помощью метода getValue класса Tuple. Нет необходимости немедленно обрабатывать входной кортеж. Несколько кортежей можно обрабатывать и выводить как один кортеж. Обработанный кортеж может быть передан с помощью класса OutputCollector.
уборка
Подпись метода очистки выглядит следующим образом:
cleanup()
declareOutputFields
Подпись метода DeclareOutputFields выглядит следующим образом:
declareOutputFields(OutputFieldsDeclarer declarer)
Здесь описатель параметров используется для объявления идентификаторов выходного потока, полей вывода и т. Д.
Этот метод используется для указания схемы вывода кортежа
Журнал звонков Creator Bolt
Болт создателя журнала вызовов получает кортеж журнала вызовов. Кортеж журнала вызовов содержит номер звонящего, номер получателя и продолжительность звонка. Этот болт просто создает новое значение, комбинируя номер звонящего и номер получателя. Формат нового значения — «Номер вызывающего абонента — Номер получателя», и оно называется новым полем «Вызов». Полный код приведен ниже.
Кодирование — CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Счетчик звонков
Болт счетчика журнала вызовов принимает вызов и его продолжительность в виде кортежа. Этот болт инициализирует объект словаря (Map) в методе prepare. В методе execute он проверяет кортеж и создает новую запись в объекте словаря для каждого нового значения «call» в кортеже и устанавливает значение 1 в объекте словаря. Для уже доступной записи в словаре она просто увеличивает свое значение. Проще говоря, этот болт сохраняет вызов и его счет в объекте словаря. Вместо сохранения вызова и его количества в словаре, мы также можем сохранить его в источнике данных. Полный код программы выглядит следующим образом:
Кодирование — CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Создание топологии
Топология Storm — это в основном структура Thrift. Класс TopologyBuilder предоставляет простые и легкие методы для создания сложных топологий. Класс TopologyBuilder имеет методы для установки spout (setSpout) и для установки болта (setBolt) . Наконец, TopologyBuilder имеет createTopology для создания топологии. Используйте следующий фрагмент кода для создания топологии —
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
Методы shuffleGrouping и fieldsGrouping помогают установить группировку потока для носика и болтов.
Локальный кластер
В целях разработки мы можем создать локальный кластер, используя объект «LocalCluster», а затем передать топологию, используя метод submitTopology класса «LocalCluster». Одним из аргументов для submitTopology является экземпляр класса Config. Класс «Config» используется для настройки параметров конфигурации перед отправкой топологии. Эта опция конфигурации будет объединена с конфигурацией кластера во время выполнения и отправлена всем задачам (носик и болт) методом подготовки. Как только топология будет передана в кластер, мы подождем 10 секунд, пока кластер вычислит представленную топологию, а затем остановим кластер, используя метод «shutdown» «LocalCluster». Полный код программы выглядит следующим образом:
Кодирование — LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
Сборка и запуск приложения
Полное приложение имеет четыре кода Java. Они —
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
Приложение может быть построено с помощью следующей команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
Приложение можно запустить с помощью следующей команды —
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Выход
Как только приложение запустится, оно выведет полную информацию о процессе запуска кластера, обработке носика и болтов и, наконец, о процессе отключения кластера. В «CallLogCounterBolt» мы распечатали вызов и детали его подсчета. Эта информация будет отображаться на консоли следующим образом —
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
Языки не JVM
Штормовые топологии реализуются интерфейсами Thrift, что позволяет легко передавать топологии на любом языке. Storm поддерживает Ruby, Python и многие другие языки. Давайте посмотрим на привязку Python.
Python Binding
Python — это интерпретируемый, интерактивный, объектно-ориентированный и высокоуровневый язык программирования общего назначения. Storm поддерживает Python для реализации своей топологии. Python поддерживает генерацию, привязку, фиксацию и регистрацию.
Как известно, болты могут быть определены на любом языке. Болты, написанные на другом языке, выполняются как подпроцессы, и Storm связывается с этими подпроцессами с помощью сообщений JSON через stdin / stdout. Сначала возьмем образец болта WordCount, который поддерживает связывание Python.
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
Здесь класс WordCount реализует интерфейс IRichBolt и работает с реализацией Python, указанным в качестве аргумента супер-метода «splitword.py». Теперь создайте реализацию Python с именем «splitword.py».
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
Это пример реализации для Python, который считает слова в данном предложении. Точно так же вы можете связать с другими языками поддержки.