Учебники

Apache Storm — рабочий пример

Мы рассмотрели основные технические детали Apache Storm, и теперь пришло время написать несколько простых сценариев.

Сценарий — Анализатор журнала мобильных вызовов

Мобильный вызов и его продолжительность будут переданы в качестве ввода для Apache Storm, и Storm обработает и сгруппирует вызов между одним и тем же вызывающим абонентом и получателем и их общим количеством вызовов.

Создание носика

Носик является компонентом, который используется для генерации данных. По сути, носик будет реализовывать интерфейс IRichSpout. Интерфейс «IRichSpout» имеет следующие важные методы —

  • open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.

  • nextTuple — испускает сгенерированные данные через коллектор.

  • close — этот метод вызывается, когда носик отключается.

  • DeclareOutputFields — Объявляет схему вывода кортежа.

  • ack — подтверждает, что обрабатывается определенный кортеж

  • fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.

open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.

nextTuple — испускает сгенерированные данные через коллектор.

close — этот метод вызывается, когда носик отключается.

DeclareOutputFields — Объявляет схему вывода кортежа.

ack — подтверждает, что обрабатывается определенный кортеж

fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.

открыто

Подпись открытого метода выглядит следующим образом:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf — Предоставляет конфигурацию шторма для этого излива.

  • context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.

  • сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.

conf — Предоставляет конфигурацию шторма для этого излива.

context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.

сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.

nextTuple

Сигнатура метода nextTuple выглядит следующим образом:

nextTuple()

nextTuple () периодически вызывается из того же цикла, что и методы ack () и fail (). Он должен освободить управление потоком, когда нет работы, чтобы другие методы имели возможность вызываться. Поэтому первая строка nextTuple проверяет, завершена ли обработка. Если это так, он должен оставаться в режиме ожидания не менее одной миллисекунды, чтобы снизить нагрузку на процессор перед возвратом.

близко

Подпись метода close выглядит следующим образом:

close()

declareOutputFields

Подпись метода DeclareOutputFields выглядит следующим образом:

declareOutputFields(OutputFieldsDeclarer declarer)

Объявление — используется для объявления идентификаторов выходного потока, полей вывода и т. д.

Этот метод используется для указания схемы вывода кортежа.

извед

Суть метода ack заключается в следующем:

ack(Object msgId)

Этот метод подтверждает, что определенный кортеж был обработан.

потерпеть поражение

Сигнатура метода nextTuple выглядит следующим образом:

ack(Object msgId)

Этот метод сообщает, что определенный кортеж не был полностью обработан. Storm обработает определенный кортеж.

FakeCallLogReaderSpout

В нашем сценарии нам нужно собрать данные журнала вызовов. Информация из журнала звонков содержит.

  • номер звонящего
  • номер получателя
  • продолжительность

Поскольку у нас нет информации о журналах вызовов в реальном времени, мы будем генерировать поддельные журналы вызовов. Поддельная информация будет создана с использованием класса Random. Полный код программы приведен ниже.

Кодирование — FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Создание болта

Bolt — это компонент, который принимает кортежи в качестве входных данных, обрабатывает кортежи и создает новые кортежи в качестве выходных данных. Болты будут реализовывать интерфейс IRichBolt . В этой программе для выполнения операций используются два класса болтов CallLogCreatorBolt и CallLogCounterBolt .

Интерфейс IRichBolt имеет следующие методы —

  • подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.

  • execute — обработать один кортеж ввода.

  • очистка — вызывается, когда затвор собирается отключиться.

  • DeclareOutputFields — Объявляет схему вывода кортежа.

подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.

execute — обработать один кортеж ввода.

очистка — вызывается, когда затвор собирается отключиться.

DeclareOutputFields — Объявляет схему вывода кортежа.

Подготовить

Подпись метода подготовки выглядит следующим образом:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf — Предоставляет конфигурацию шторма для этого болта.

  • контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.

  • collector — позволяет нам генерировать обработанный кортеж.

conf — Предоставляет конфигурацию шторма для этого болта.

контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.

collector — позволяет нам генерировать обработанный кортеж.

выполнять

Подпись метода execute следующая:

execute(Tuple tuple)

Здесь кортеж — это входной кортеж, который нужно обработать.

Метод execute обрабатывает один кортеж за раз. Доступ к данным кортежа можно получить с помощью метода getValue класса Tuple. Нет необходимости немедленно обрабатывать входной кортеж. Несколько кортежей можно обрабатывать и выводить как один кортеж. Обработанный кортеж может быть передан с помощью класса OutputCollector.

уборка

Подпись метода очистки выглядит следующим образом:

cleanup()

declareOutputFields

Подпись метода DeclareOutputFields выглядит следующим образом:

declareOutputFields(OutputFieldsDeclarer declarer)

Здесь описатель параметров используется для объявления идентификаторов выходного потока, полей вывода и т. Д.

Этот метод используется для указания схемы вывода кортежа

Журнал звонков Creator Bolt

Болт создателя журнала вызовов получает кортеж журнала вызовов. Кортеж журнала вызовов содержит номер звонящего, номер получателя и продолжительность звонка. Этот болт просто создает новое значение, комбинируя номер звонящего и номер получателя. Формат нового значения — «Номер вызывающего абонента — Номер получателя», и оно называется новым полем «Вызов». Полный код приведен ниже.

Кодирование — CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Счетчик звонков

Болт счетчика журнала вызовов принимает вызов и его продолжительность в виде кортежа. Этот болт инициализирует объект словаря (Map) в методе prepare. В методе execute он проверяет кортеж и создает новую запись в объекте словаря для каждого нового значения «call» в кортеже и устанавливает значение 1 в объекте словаря. Для уже доступной записи в словаре она просто увеличивает свое значение. Проще говоря, этот болт сохраняет вызов и его счет в объекте словаря. Вместо сохранения вызова и его количества в словаре, мы также можем сохранить его в источнике данных. Полный код программы выглядит следующим образом:

Кодирование — CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Создание топологии

Топология Storm — это в основном структура Thrift. Класс TopologyBuilder предоставляет простые и легкие методы для создания сложных топологий. Класс TopologyBuilder имеет методы для установки spout (setSpout) и для установки болта (setBolt) . Наконец, TopologyBuilder имеет createTopology для создания топологии. Используйте следующий фрагмент кода для создания топологии —

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

Методы shuffleGrouping и fieldsGrouping помогают установить группировку потока для носика и болтов.

Локальный кластер

В целях разработки мы можем создать локальный кластер, используя объект «LocalCluster», а затем передать топологию, используя метод submitTopology класса «LocalCluster». Одним из аргументов для submitTopology является экземпляр класса Config. Класс «Config» используется для настройки параметров конфигурации перед отправкой топологии. Эта опция конфигурации будет объединена с конфигурацией кластера во время выполнения и отправлена ​​всем задачам (носик и болт) методом подготовки. Как только топология будет передана в кластер, мы подождем 10 секунд, пока кластер вычислит представленную топологию, а затем остановим кластер, используя метод «shutdown» «LocalCluster». Полный код программы выглядит следующим образом:

Кодирование — LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Сборка и запуск приложения

Полное приложение имеет четыре кода Java. Они —

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Приложение может быть построено с помощью следующей команды —

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Приложение можно запустить с помощью следующей команды —

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Выход

Как только приложение запустится, оно выведет полную информацию о процессе запуска кластера, обработке носика и болтов и, наконец, о процессе отключения кластера. В «CallLogCounterBolt» мы распечатали вызов и детали его подсчета. Эта информация будет отображаться на консоли следующим образом —

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Языки не JVM

Штормовые топологии реализуются интерфейсами Thrift, что позволяет легко передавать топологии на любом языке. Storm поддерживает Ruby, Python и многие другие языки. Давайте посмотрим на привязку Python.

Python Binding

Python — это интерпретируемый, интерактивный, объектно-ориентированный и высокоуровневый язык программирования общего назначения. Storm поддерживает Python для реализации своей топологии. Python поддерживает генерацию, привязку, фиксацию и регистрацию.

Как известно, болты могут быть определены на любом языке. Болты, написанные на другом языке, выполняются как подпроцессы, и Storm связывается с этими подпроцессами с помощью сообщений JSON через stdin / stdout. Сначала возьмем образец болта WordCount, который поддерживает связывание Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Здесь класс WordCount реализует интерфейс IRichBolt и работает с реализацией Python, указанным в качестве аргумента супер-метода «splitword.py». Теперь создайте реализацию Python с именем «splitword.py».

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Это пример реализации для Python, который считает слова в данном предложении. Точно так же вы можете связать с другими языками поддержки.