Учебники

Apache Storm — Трезубец

Трайдент — это продолжение Шторма. Как и Storm, Trident был также разработан Twitter. Основной причиной разработки Trident является предоставление высокоуровневой абстракции поверх Storm наряду с обработкой потоков с отслеживанием состояния и распределенными запросами с низкой задержкой.

Trident использует носик и болт, но эти низкоуровневые компоненты автоматически создаются Trident перед выполнением. У Trident есть функции, фильтры, объединения, группировка и агрегация.

Trident обрабатывает потоки как серию пакетов, которые называются транзакциями. Обычно размер этих небольших пакетов будет порядка тысяч или миллионов кортежей, в зависимости от входного потока. Таким образом, Trident отличается от Storm, который выполняет обработку по кортежу.

Концепция пакетной обработки очень похожа на транзакции базы данных. Каждой транзакции присваивается идентификатор транзакции. Транзакция считается успешной, как только вся ее обработка завершена. Однако сбой в обработке одного из кортежей транзакции приведет к повторной передаче всей транзакции. Для каждого пакета Trident будет вызывать beginCommit в начале транзакции и фиксировать в конце транзакции.

Топология трезубца

Trident API предоставляет простой вариант создания топологии Trident с использованием класса «TridentTopology». По сути, топология Trident получает входной поток из носика и выполняет упорядоченную последовательность операций (фильтрацию, агрегацию, группировку и т. Д.) В потоке. Storm Tuple заменяется на Trident Tuple, а болты заменяются операциями. Простая топология Trident может быть создана следующим образом:

TridentTopology topology = new TridentTopology();

Трайдент Туплс

Кортеж Trident — это именованный список значений. Интерфейс TridentTuple — это модель данных топологии Trident. Интерфейс TridentTuple — это основная единица данных, которая может обрабатываться топологией Trident.

Трезубец Носик

Носик Trident похож на носик Storm, с дополнительными опциями для использования функций Trident. На самом деле, мы все еще можем использовать IRichSpout, который мы использовали в топологии Storm, но он будет не транзакционным по своей природе, и мы не сможем использовать преимущества, предоставляемые Trident.

Основной носик, имеющий все функциональные возможности для использования функций Trident, является «ITridentSpout». Он поддерживает как транзакционную, так и непрозрачную транзакционную семантику. Другими источниками являются IBatchSpout, IPartitionedTridentSpout и IOpaquePartitionedTridentSpout.

В дополнение к этим универсальным носикам, Trident имеет множество примеров реализации носика trident. Одним из них является носик FeederBatchSpout, который мы можем использовать для простой отправки именованного списка кортежей трезубца, не беспокоясь о пакетной обработке, параллелизме и т. Д.

Создание FeederBatchSpout и подача данных могут быть выполнены, как показано ниже —

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Трезубец Операции

Trident полагается на «Операцию Trident» для обработки входного потока кортежей трезубца. Trident API имеет ряд встроенных операций для обработки потоковой обработки от простого к сложному. Эти операции варьируются от простой проверки до сложной группировки и агрегации кортежей трезубца. Давайте пройдемся по самым важным и часто используемым операциям.

Фильтр

Фильтр — это объект, используемый для выполнения проверки ввода. Фильтр Trident получает подмножество полей кортежа trident в качестве входных данных и возвращает либо true, либо false, в зависимости от того, выполнены определенные условия или нет. Если true возвращается, тогда кортеж сохраняется в выходном потоке; в противном случае кортеж удаляется из потока. Фильтр будет в основном наследоваться от класса BaseFilter и реализовывать метод isKeep . Вот пример реализации операции фильтра —

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Функция фильтра может быть вызвана в топологии с использованием метода «каждый». Класс «Поля» может использоваться для указания входных данных (подмножество кортежа трезубца). Пример кода выглядит следующим образом —

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

функция

Функция — это объект, используемый для выполнения простой операции с одним кортежем трезубца. Он принимает подмножество полей кортежа трезубца и испускает ноль или более новых полей кортежа трезубца.

Функция в основном наследуется от класса BaseFunction и реализует метод execute . Пример реализации приведен ниже —

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Как и операция Filter, функция Function может вызываться в топологии с использованием каждого метода. Пример кода выглядит следующим образом —

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

агрегирование

Агрегация — это объект, используемый для выполнения операций агрегации во входном пакете, разделе или потоке. Трайдент имеет три типа агрегации. Они заключаются в следующем —

  • агрегатагрегирует каждую партию кортежа трезубца изолированно. В процессе агрегации кортежи первоначально перераспределяются с использованием глобальной группировки, чтобы объединить все разделы одного пакета в один раздел.

  • partitionAggregate — агрегирует каждый раздел вместо всего пакета трезубец кортежа. Выходные данные агрегата разделов полностью заменяют входной кортеж. Выходные данные агрегата раздела содержат один кортеж поля.

  • persistentaggregate — агрегирует все кортежи трезубца во всех пакетах и ​​сохраняет результат либо в памяти, либо в базе данных.

агрегатагрегирует каждую партию кортежа трезубца изолированно. В процессе агрегации кортежи первоначально перераспределяются с использованием глобальной группировки, чтобы объединить все разделы одного пакета в один раздел.

partitionAggregate — агрегирует каждый раздел вместо всего пакета трезубец кортежа. Выходные данные агрегата разделов полностью заменяют входной кортеж. Выходные данные агрегата раздела содержат один кортеж поля.

persistentaggregate — агрегирует все кортежи трезубца во всех пакетах и ​​сохраняет результат либо в памяти, либо в базе данных.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Операция агрегации может быть создана с использованием CombinerAggregator, ReducerAggregator или универсального интерфейса Aggregator. Агрегатор «count», используемый в приведенном выше примере, является одним из встроенных агрегаторов. Он реализован с использованием «CombinerAggregator». Реализация выглядит следующим образом:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

группирование

Операция группировки является встроенной операцией и может вызываться методом groupBy . Метод groupBy перераспределяет поток, выполняя partitionBy для указанных полей, а затем внутри каждого раздела группирует кортежи, чьи групповые поля равны. Обычно мы используем «groupBy» вместе с «persistentAggregate», чтобы получить группированную агрегацию. Пример кода выглядит следующим образом —

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Слияние и соединение

Слияние и объединение могут быть выполнены с использованием методов «слияния» и «соединения» соответственно. Слияние объединяет один или несколько потоков. Объединение аналогично объединению, за исключением того, что при объединении используется поле кортежа трезубца с обеих сторон для проверки и объединения двух потоков. Кроме того, объединение будет работать только на уровне партии. Пример кода выглядит следующим образом —

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Государственное обслуживание

Трайдент предоставляет механизм для поддержания государства. Информация о состоянии может храниться в самой топологии, в противном случае вы также можете хранить ее в отдельной базе данных. Причина заключается в том, чтобы поддерживать состояние, при котором во время обработки происходит сбой любого кортежа, а затем сбойный кортеж повторяется. Это создает проблему при обновлении состояния, поскольку вы не уверены, было ли ранее обновлено состояние этого кортежа или нет. Если перед обновлением состояния произошел сбой в кортеже, то повторная попытка кортежа сделает состояние стабильным. Однако если после обновления состояния произошел сбой в кортеже, повторная попытка того же кортежа снова увеличит число в базе данных и сделает состояние нестабильным. Чтобы убедиться, что сообщение обработано только один раз, необходимо выполнить следующие шаги:

  • Обработайте кортежи небольшими партиями.

  • Присвойте уникальный идентификатор каждой партии. Если партия повторяется, ей присваивается тот же уникальный идентификатор.

  • Обновления состояния заказываются между партиями. Например, обновление состояния второго пакета будет невозможно до тех пор, пока обновление состояния для первого пакета не будет завершено.

Обработайте кортежи небольшими партиями.

Присвойте уникальный идентификатор каждой партии. Если партия повторяется, ей присваивается тот же уникальный идентификатор.

Обновления состояния заказываются между партиями. Например, обновление состояния второго пакета будет невозможно до тех пор, пока обновление состояния для первого пакета не будет завершено.

Распределенный RPC

Распределенный RPC используется для запроса и получения результата из топологии Trident. Storm имеет встроенный распределенный RPC-сервер. Распределенный сервер RPC получает запрос RPC от клиента и передает его в топологию. Топология обрабатывает запрос и отправляет результат на распределенный RPC-сервер, который перенаправляется распределенным RPC-сервером клиенту. Распределенный RPC-запрос Trident выполняется как обычный RPC-запрос, за исключением того факта, что эти запросы выполняются параллельно.

Когда использовать Trident?

Как и во многих случаях использования, если требование обрабатывать запрос только один раз, мы можем достичь этого, написав топологию в Trident. С другой стороны, в случае Storm будет трудно добиться ровно однократной обработки. Следовательно, Trident будет полезен для тех случаев, когда вам требуется ровно один раз обработки. Trident подходит не для всех вариантов использования, особенно для высокопроизводительных, поскольку он добавляет сложности в Storm и управляет состоянием.

Рабочий пример Трайдента

Мы собираемся преобразовать наше приложение для анализа журнала вызовов, разработанное в предыдущем разделе, в среду Trident. Приложение Trident будет относительно простым по сравнению с обычным штормом благодаря высокоуровневому API. Storm в основном потребуется для выполнения любой из функций Function, Filter, Aggregate, GroupBy, Join и Merge в Trident. Наконец, мы запустим сервер DRPC, используя класс LocalDRPC, и произведем поиск по ключевому слову, используя метод execute класса LocalDRPC.

Форматирование информации о звонке

Целью класса FormatCall является форматирование информации о вызове, содержащей «Номер вызывающего абонента» и «Номер получателя». Полный код программы выглядит следующим образом:

Кодирование: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Целью класса CSVSplit является разделение входной строки на основе «запятая (,)» и испускание каждого слова в строке. Эта функция используется для анализа входного аргумента распределенных запросов. Полный код выглядит следующим образом —

Кодирование: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Это основное приложение. Сначала приложение будет инициализировать TridentTopology и информацию о вызывающем абоненте с помощью FeederBatchSpout . Поток топологии Trident может быть создан с использованием метода newStream класса TridentTopology. Точно так же поток DRPC топологии Trident может быть создан с использованием метода newDRCPStream класса TridentTopology. Простой сервер DRCP может быть создан с использованием класса LocalDRPC. LocalDRPC имеет метод execute для поиска по ключевому слову. Полный код приведен ниже.

Кодирование: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Сборка и запуск приложения

Полное приложение имеет три кода Java. Они заключаются в следующем —

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Приложение может быть построено с помощью следующей команды —

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Приложение можно запустить с помощью следующей команды —

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Выход

Как только приложение будет запущено, оно выведет полную информацию о процессе запуска кластера, обработке операций, информации о сервере DRPC и клиенте и, наконец, о процессе остановки кластера. Этот вывод будет отображаться на консоли, как показано ниже.