Статьи

Обработка запасов в реальном времени с помощью Apache NiFi и Apache Kafka, часть 1

Реализация потокового варианта использования от REST до Hive с Apache NiFi и Apache Kafka

Часть 1

С Apache Kafka 2.0 и Apache NiFi 1.8 появилось много новых функций и возможностей. Пришло время проверить их.

Итак, чтобы спланировать, что мы собираемся делать, у меня есть схема архитектуры высокого уровня. Мы собираемся принять ряд источников, включая каналы REST, социальные каналы, сообщения, изображения, документы и реляционные данные.

Мы будем использовать NiFi, а затем фильтровать, обрабатывать и сегментировать его по темам Kafka. Данные Kafka будут в формате Apache Avro со схемами, указанными в реестре схем Hortonworks. Spark и NiFi будут выполнять дополнительную обработку событий, а также машинное обучение и глубокое обучение. Это будет храниться в Druid для аналитики и сводок в реальном времени. Hive, HDFS и S3 будут хранить данные для постоянного хранения. Мы будем делать панели мониторинга с Superset и Spark SQL + Zeppelin.

Мы также будем отправлять очищенные и агрегированные данные подписчикам через Kafka и NiFi. Мы перейдем к Dockerized приложениям, прослушивателям сообщений, веб-клиентам, каналам Slack и спискам рассылки электронной почты.

Чтобы быть полезными на нашем предприятии, мы будем иметь полную авторизацию, аутентификацию, аудит, шифрование данных и передачу данных через Apache Ranger, Apache Atlas и Apache NiFi. Для контроля исходного кода будут использоваться NiFi Registry и GitHub.

У нас будут возможности администрирования через Apache Ambari.

Пример макета сервера:

NiFi Flows

В реальном времени доступны бесплатные биржевые данные от IEX без лицензионного ключа. К счастью, потоки данных очень быстрые, для Apache NiFi и Kafka это не проблема.

Используйте разные записи из тем и сохраняйте их в HDFS в отдельных каталогах и таблицах.

Давайте разделим один большой REST-файл на отдельные интересующие записи. Наш канал REST содержит массивы цитат, графиков и новостей.

Давайте подтолкнем некоторые сообщения, чтобы расслабиться

Мы можем легко использовать несколько тем в Apache NiFi.

Запрашивать данные легко, так как они в движении, так как у нас есть схемы

Мы создаем схемы для каждой из наших тем Кафки

Мы можем отслеживать все эти сообщения, проходящие через Кафку в Амбари (а также гораздо более подробно в Cloudera SMM).

Я читаю данные, а затем могу передать их брокерам Kafka 1.0 и 2.0.

Как только данные отправлены, NiFi сообщит нам.

Используемые проекты

  • Апач Кафка
  • Apache NiFi
  • Апач друид
  • Apache Hive на Кафке
  • Apache Hive от друидов
  • Apache Hive на JDBC
  • Апачский Цеппелин
  • НЛП — Apache OpenNLP и Стэнфордский CoreNLP
  • Horotnworks Schema Registry
  • Нифи Реестр
  • Apache Ambari
  • Поиск по журналу
  • Hortonworks SMM
  • Hortonworks Data Plane Services (DPS)

источники

ОСТАЛЬНЫЕ

Раковины

  • Apache Hadoop HDFS
  • Апач Кафка
  • Apache Hive
  • слабина
  • S3
  • Апач друид
  • Apache HBase

темы

  • iextradingnews
  • iextradingquote
  • iextradingchart
  • акции
  • кибер

HDFS каталоги

hdfs dfs -mkdir -p /iextradingnews

hdfs dfs -mkdir -p /iextradingquote

hdfs dfs -mkdir -p /iextradingchart

hdfs dfs -mkdir -p /stocks

hdfs dfs -mkdir -p /cyber

hdfs dfs -chmod -R 777 /

PutHDFS

  • /${kafka.topic}
  • /iextradingchart/859496561256574.orc
  • /iextradingnews/855935960267509.orc
  • /iextradingquote/859143934804532.orc

Столы Улей

CREATE EXTERNAL TABLE IF NOT EXISTS iextradingchart (`date` STRING, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume INT, unadjustedVolume INT, change DOUBLE, changePercent DOUBLE, vwap DOUBLE, label STRING, changeOverTime INT)
STORED AS ORC
LOCATION '/iextradingchart';

CREATE EXTERNAL TABLE IF NOT EXISTS iextradingquote (symbol STRING, companyName STRING, primaryExchange STRING, sector STRING, calculationPrice STRING, open DOUBLE, openTime BIGINT, close DOUBLE, closeTime BIGINT, high DOUBLE, low DOUBLE, latestPrice DOUBLE, latestSource STRING, latestTime STRING, latestUpdate BIGINT, latestVolume INT, iexRealtimePrice DOUBLE, iexRealtimeSize INT, iexLastUpdated BIGINT, delayedPrice DOUBLE, delayedPriceTime BIGINT, extendedPrice DOUBLE, extendedChange DOUBLE, extendedChangePercent DOUBLE, extendedPriceTime BIGINT, previousClose DOUBLE, change DOUBLE, changePercent DOUBLE, iexMarketPercent DOUBLE, iexVolume INT, avgTotalVolume INT, iexBidPrice INT, iexBidSize INT, iexAskPrice INT, iexAskSize INT, marketCap INT, peRatio DOUBLE, week52High DOUBLE, week52Low DOUBLE, ytdChange DOUBLE) 
STORED AS ORC
LOCATION '/iextradingquote';

CREATE EXTERNAL TABLE IF NOT EXISTS iextradingnews (`datetime` STRING, headline STRING, source STRING, url STRING, summary STRING, related STRING, image STRING) 
STORED AS ORC 
LOCATION '/iextradingnews';

Schemas

{ "type": "record", "name": "iextradingchart", "fields": [  {  "name": "date",  "type": [  "string",  "null"  ]  },  {  "name": "open",  "type": [  "double",  "null"  ]  },  {  "name": "high",  "type": [  "double",  "null"  ]  },  {  "name": "low",  "type": [  "double",  "null"  ]  },  {  "name": "close",  "type": [  "double",  "null"  ]  },  {  "name": "volume",  "type": [  "int",  "null"  ]  },  {  "name": "unadjustedVolume",  "type": [  "int",  "null"  ]  },  {  "name": "change",  "type": [  "double",  "null"  ]  },  {  "name": "changePercent",  "type": [  "double",  "null"  ]  },  {  "name": "vwap",  "type": [  "double",  "null"  ]  },  {  "name": "label",  "type": [  "string",  "null"  ]  },  {  "name": "changeOverTime",  "type": [  "int",  "null"  ]  } ]}{ "type": "record", "name": "iextradingquote", "fields": [  {  "name": "symbol",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"HDP\"'"  },  {  "name": "companyName",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"Hortonworks Inc.\"'"  },  {  "name": "primaryExchange",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"Nasdaq Global Select\"'"  },  {  "name": "sector",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"Technology\"'"  },  {  "name": "calculationPrice",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"close\"'"  },  {  "name": "open",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '16.3'"  },  {  "name": "openTime",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542033000568'"  },  {  "name": "close",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.76'"  },  {  "name": "closeTime",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542056400520'"  },  {  "name": "high",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '16.37'"  },  {  "name": "low",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.2'"  },  {  "name": "latestPrice",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.76'"  },  {  "name": "latestSource",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"Close\"'"  },  {  "name": "latestTime",  "type": [  "string",  "null"  ],  "doc": "Type inferred from '\"November 12, 2018\"'"  },  {  "name": "latestUpdate",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542056400520'"  },  {  "name": "latestVolume",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '4012339'"  },  {  "name": "iexRealtimePrice",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.74'"  },  {  "name": "iexRealtimeSize",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '43'"  },  {  "name": "iexLastUpdated",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542056397411'"  },  {  "name": "delayedPrice",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.76'"  },  {  "name": "delayedPriceTime",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542056400520'"  },  {  "name": "extendedPrice",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.85'"  },  {  "name": "extendedChange",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '0.09'"  },  {  "name": "extendedChangePercent",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '0.00571'"  },  {  "name": "extendedPriceTime",  "type": [  "long",  "null"  ],  "doc": "Type inferred from '1542059622726'"  },  {  "name": "previousClose",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '16.24'"  },  {  "name": "change",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '-0.48'"  },  {  "name": "changePercent",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '-0.02956'"  },  {  "name": "iexMarketPercent",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '0.03258'"  },  {  "name": "iexVolume",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '130722'"  },  {  "name": "avgTotalVolume",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '2042809'"  },  {  "name": "iexBidPrice",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '0'"  },  {  "name": "iexBidSize",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '0'"  },  {  "name": "iexAskPrice",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '0'"  },  {  "name": "iexAskSize",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '0'"  },  {  "name": "marketCap",  "type": [  "int",  "null"  ],  "doc": "Type inferred from '1317308142'"  },  {  "name": "peRatio",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '-7.43'"  },  {  "name": "week52High",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '26.22'"  },  {  "name": "week52Low",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '15.2'"  },  {  "name": "ytdChange",  "type": [  "double",  "null"  ],  "doc": "Type inferred from '-0.25696247383444343'"  } ]}{ "type" : "record", "name" : "iextradingchart", "fields" : [ { "name" : "date", "type" :  ["string","null"]  }, { "name" : "open", "type" : ["double","null"]  }, { "name" : "high", "type" : ["double","null"]  }, { "name" : "low", "type" : ["double","null"]  }, { "name" : "close", "type" : ["double","null"]  }, { "name" : "volume", "type" : ["int","null"]  }, { "name" : "unadjustedVolume", "type" : ["int","null"]  }, { "name" : "change", "type" : ["double","null"]  }, { "name" : "changePercent", "type" : ["double","null"]  }, { "name" : "vwap", "type" : ["double","null"]  }, { "name" : "label", "type" :  ["string","null"] }, { "name" : "changeOverTime", "type" : ["int","null"]  } ] }

Сообщения Slack

Файл: $ {‘filename’}

Смещение: $ {‘kafka.offset’}

Раздел: $ {‘kafka.partition’}

Тема: $ {‘kafka.topic’}

UUID: $ {‘uuid’}

Количество записей: $ {‘record.count’}

Размер файла: $ {fileSize: div (1024)} K

Смотрите jsonpath.com

Разделяет

$. *. Котировка

$. *. График

$. *. Новости

Array to Single

$. *

GETHTTP

URL

https://api.iextrading.com/1.0/stock/market/batch?symbols=hdp&types=quote,news,chart&range=1y&last=25000

Имя файла

 marketbatch.hdp.${'hdp':append(${now():format('yyyymmddHHMMSS'):append(${md5}):append('.json')})} Данные предоставлены бесплатно IEX. Ознакомьтесь с условиями использования IEX.

IEX Цена в реальном времени 

Запросы

SELECT * FROM FLOWFILE

WHERE latestPrice > week52Low

SELECT * FROM FLOWFILE

WHERE latestPrice <= week52Low

Пример вывода

Файл: 855957937589894

Смещение: 22460

Раздел: 0

Тема: iextradingquote

UUID: b2a8e797-2249-4689-9a78-4339ddb5ecb4

Количество записей:

Размер файла: 3K

Визуализация данных в Apache Zeppelin с Hive и Spark SQL

Создать таблицы поверх файлов Apache ORC в HDFS очень просто.

Нажмите некоторые сообщения, чтобы расслабиться