Статьи

Использование MongoDB с Hadoop & Spark: Часть 2 — Пример Hive

Первоначально Написано Мэттом Каланом

Добро пожаловать во вторую часть нашей серии из трех статей о MongoDB и Hadoop. В первой части мы рассказали о Hadoop и о том, как его настроить. В этом посте мы рассмотрим пример Hive.

  1. Введение и настройка Hadoop и MongoDB
  2. Пример улья
  3. Пример искры и ключевые выводы

Для получения более подробной информации о сценарии использования см. Первый абзац части 1 .

Резюме

Вариант использования : объединение 1-минутных интервалов цен на акции в 5-минутные интервалы.
Вход :: 1-минутные интервалы цен на акции в базе данных MongoDB.
Простой анализ : выполняется в:
 — Hive
 — Spark.
Выход : 5-минутные интервалы цен на акции в Hadoop.

Пример улья

Я запустил следующий пример из командной строки Hive (просто набрал команду «hive» без параметров), а не из редактора Hue Cloudera, поскольку для этого потребовались бы дополнительные шаги установки. Я сразу заметил, что люди критикуют Hive за то, что все компилируется в MapReduce, что занимает много времени. Я запустил большинство вещей всего с 20 записями, чтобы запросы выполнялись быстро.

Это создает определение таблицы в Hive, которое соответствует структуре данных в MongoDB. MongoDB имеет динамическую схему для переменных форм данных, но Hive и SQL нуждаются в определении схемы.

 CREATE EXTERNAL TABLE minute_bars
(
    
id STRUCT,
    Symbol STRING,
    Timestamp STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id",
 "Symbol":"Symbol", "Timestamp":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minbars');

Недавние изменения в репозитории Apache Hive делают необходимые сопоставления, даже если вы сохраняете имена полей одинаковыми. Это должно быть изменено в MongoDB Hadoop Connector в ближайшее время, если не к тому времени, когда вы прочитаете это.

Затем я выполнил следующую команду, чтобы создать таблицу Hive для 5-минутных баров:

 CREATE TABLE five_minute_bars
(
 
   id STRUCT,
    Symbol STRING,
    Timestamp STRING,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE
);

Этот оператор вставки использует оконные функции SQL для группировки 5-минутных периодов и определения OHLC для 5-минутного периода. Есть определенно другие способы сделать это, но вот один, который я понял. Группировка в SQL немного отличается от группировки в инфраструктуре агрегации MongoDB (в которой вы можете легко получить первую и последнюю группу), поэтому мне понадобилось некоторое время, чтобы вспомнить, как это сделать с помощью подзапроса.

Подзапрос берет каждую группу из 5 1-минутных записей / документов, сортирует их по времени и берет цены открытия, максимума, минимума и закрытия до этой записи за каждый 5-минутный период. Затем внешнее предложение WHERE выбирает последний 1-минутный бар в этом периоде (поскольку эта строка в подзапросе содержит правильную информацию OHLC для своего 5-минутного периода). Я определенно приветствую более простые для понимания запросы, но вы можете запустить подзапрос самостоятельно, чтобы увидеть, что он делает.

 INSERT INTO TABLE five_minute_bars
SELECT m.id, m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT
 
    id,
    Symbol,
    FIRST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as OpenTime,
 
    LAST_VALUE(Timestamp)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as CloseTime,
 
    FIRST_VALUE(Open)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as Open,
    MAX(High)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as High,
 
    MIN(Low)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as Low,
    LAST_VALUE(Close)
    OVER (
            PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
            ORDER BY Timestamp)
 
    as Close
FROM minute_bars)
as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4;

Я определенно вижу выгоду от возможности использовать SQL для доступа к данным в MongoDB и, возможно, в других базах данных и форматах файлов, все с теми же командами, в то время как различия в отображении обрабатываются в объявлениях таблиц. Недостатком является то, что задержка довольно высока, но это может быть сделано с возможностью масштабирования по горизонтали между многими узлами. Я думаю, что это привлекательность Hive для большинства людей — они могут масштабироваться до очень больших объемов данных с использованием традиционного SQL, и задержка не является главной проблемой.

Пост № 3 в этой серии блогов показывает похожие примеры использования Spark.

  1. Введение и настройка Hadoop и MongoDB
  2. Пример улья
  3. Пример искры и ключевые выводы

Чтобы узнать больше, посмотрите наше видео на MongoDB и Hadoop. Мы подробно расскажем о соединителе MongoDB для Hadoop и о том, как его можно применять для получения новых бизнес-идей.

СМОТРЕТЬ MONGODB & HADOOP

<< Читать часть 1