Статьи

Временные ряды MongoDB: введение в структуру агрегирования

В моих предыдущих постах я говорил о пакетном импорте и о производительности MongoDB из коробки . Между тем, MongoDB была удостоена звания СУБД года , поэтому я решил предложить более тщательный анализ ее реального использования.

Поскольку теория лучше понимается в прагматическом контексте, я сначала представлю вам наши требования к виртуальному проекту.

Вступление

Наш виртуальный проект имеет следующие требования:

  1. он должен хранить ценные события времени, представленные как v = f (t)
  2. он должен агрегировать записи минимума, максимума, среднего и количества по:
    • секунд в минуту
    • минут в час
    • часов в день
    • дни в году
  3. агрегация секунд в минутах рассчитывается в реальном времени (поэтому она должна быть очень быстрой)
  4. все остальные агрегаты рассчитываются пакетным процессором (поэтому они должны быть относительно быстрыми)

Модель данных

Я предложу два варианта моделирования данных, каждый из которых имеет свои плюсы и минусы.

  1. Первая версия использует автоматически назначенную MongoDB «_id» по умолчанию, и это упрощает вставки, поскольку мы можем делать это партиями, не опасаясь столкновения с метками времени.
    Если в каждую миллисекунду записывается 10 значений, то в итоге мы получим 10 различных документов. Этот пост обсудит этот вариант модели данных.

    1
    2
    3
    4
    5
    {
        "_id" : ObjectId("52cb898bed4bd6c24ae06a9e"),
        "created_on" : ISODate("2012-11-02T01:23:54.010Z")
        "value" : 0.19186609564349055
    }
  2. Вторая версия использует количество миллисекунд с начала эпохи в качестве поля «_id», а значения хранятся в массиве «values» .
    Если в каждую миллисекунду записывается 10 значений, то мы получим один отдельный документ с 10 записями в массиве «values» . Будущий пост будет посвящен этой сжатой модели данных.

    1
    2
    3
    4
    5
    6
    7
    {
            "_id" : 1348436178673,
            "values" : [
                    0.7518879524432123,
                    0.0017396819312125444
            ]
    }

Вставка данных

Как и в моем предыдущем посте, я буду использовать 50M документов для проверки логики агрегации. Я выбрал этот номер, потому что я тестирую на своем обычном ПК . В вышеупомянутом посте мне удалось вставить более 80000 документов в секунду. На этот раз я выберу более реальный подход и начну с создания коллекции и индексов до вставки данных.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
MongoDB shell version: 2.4.6
connecting to: random
> db.dropDatabase()
{ "dropped" : "random", "ok" : 1 }
> db.createCollection("randomData");
{ "ok" : 1 }
> db.randomData.ensureIndex({"created_on" : 1});
> db.randomData.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "ns" : "random.randomData",
                "name" : "_id_"
        },
        {
                "v" : 1,
                "key" : {
                        "created_on" : 1
                },
                "ns" : "random.randomData",
                "name" : "created_on_1"
        }
]

Теперь пришло время вставить 50M документов.

1
2
3
4
mongo random --eval "var arg1=50000000;arg2=1" create_random.js
...
Job#1 inserted 49900000 documents.
Job#1 inserted 50000000 in 2852.56s

На этот раз нам удалось импортировать 17500 документов в секунду. При такой скорости нам потребуется 550B записей в год, что более чем достаточно для нашего варианта использования.

Сжатие данных

Во-первых, нам нужно проанализировать нашу статистику сбора и для этого нам нужно использовать команду stats :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000096,
        "avgObjSize" : 64.00000192,
        "storageSize" : 5297451008,
        "numExtents" : 23,
        "nindexes" : 2,
        "lastExtentSize" : 1378918400,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 3497651920,
        "indexSizes" : {
                "_id_" : 1623442912,
                "created_on_1" : 1874209008
        },
        "ok" : 1
}

Текущий размер индекса составляет почти 3,5 ГБ, и это почти половина моей доступной оперативной памяти. К счастью, MongoDB поставляется с компактной командой, которую мы можем использовать для дефрагментации наших данных. Это занимает много времени, особенно потому, что у нас большой общий размер индекса.

1
2
db.randomData.runCommand("compact");
Compacting took 1523.085s

Давайте посмотрим, сколько места мы сэкономили благодаря сжатию:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
db.randomData.stats()
{
        "ns" : "random.randomData",
        "count" : 50000000,
        "size" : 3200000032,
        "avgObjSize" : 64.00000064,
        "storageSize" : 4415811584,
        "numExtents" : 24,
        "nindexes" : 2,
        "lastExtentSize" : 1149206528,
        "paddingFactor" : 1,
        "systemFlags" : 1,
        "userFlags" : 0,
        "totalIndexSize" : 2717890448,
        "indexSizes" : {
                "_id_" : 1460021024,
                "created_on_1" : 1257869424
        },
        "ok" : 1
}

Мы освободили почти 800 МБ данных, и это будет удобно для наших операций агрегации с интенсивным использованием ОЗУ.

Объяснение логики агрегации

Все четыре отчета об агрегации похожи, поскольку они отличаются только:

  1. интервал выбора времени
  2. гранулярность по времени

Поэтому мы можем начать с первого отчета, который объединяет значения по секундам. Мы будем использовать метод объяснения, чтобы взглянуть на внутреннюю работу нашего агрегата.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
load(pwd() + "/../../util/date_util.js");
var minDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 30));
var maxDate = new Date(Date.UTC(2012, 1, 10, 11, 25, 35));
var result = db.randomData.runCommand('aggregate', { pipeline:
[
    {
        $match: {
            "created_on" : {
                $gte: minDate,
                $lt : maxDate  
            }
        }
    },
    {
        $project: {
            _id : 0,
            created_on : 1,
            value : 1
        }
    },
    {
        $group: {
                "_id": {
                    "year" : {
                        $year : "$created_on"
                    },
                    "dayOfYear" : {
                        $dayOfYear : "$created_on"
                    },
                    "hour" : {
                        $hour : "$created_on"
                    },
                    "minute" : {
                        $minute : "$created_on"
                    },
                    "second" : {
                        $second : "$created_on"
                    },
                },
                "count": {
                    $sum: 1
                },
                "avg": {
                    $avg: "$value"
                },
                "min": {
                    $min: "$value"
                },
                "max": {
                    $max: "$value"
                }      
            }
    },
    {
        $sort: {
            "_id.year" : 1,
            "_id.dayOfYear" : 1,
            "_id.hour" : 1,
            "_id.minute" : 1,
            "_id.second" : 1
        }  
    }
], explain: true});
printjson(result);

Который выводит следующий результат

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
{
        "serverPipeline" : [
                {
                        "query" : {
                                "created_on" : {
                                        "$gte" : ISODate("2012-02-10T11:25:30Z"),
                                        "$lt" : ISODate("2012-02-10T11:25:35Z")
                                }
                        },
                        "projection" : {
                                "created_on" : 1,
                                "value" : 1,
                                "_id" : 0
                        },
                        "cursor" : {
                                "cursor" : "BtreeCursor created_on_1",
                                "isMultiKey" : false,
                                "n" : 5,
                                "nscannedObjects" : 5,
                                "nscanned" : 5,
                                "nscannedObjectsAllPlans" : 5,
                                "nscannedAllPlans" : 5,
                                "scanAndOrder" : false,
                                "indexOnly" : false,
                                "nYields" : 0,
                                "nChunkSkips" : 0,
                                "millis" : 0,
                                "indexBounds" : {
                                        "created_on" : [
                                                [
                                                        ISODate("2012-02-10T11:25:30Z"),
                                                        ISODate("2012-02-10T11:25:35Z")
                                                ]
                                        ]
                                },
                                "allPlans" : [
                                        {
                                                "cursor" : "BtreeCursor created_on_1",
                                                "n" : 5,
                                                "nscannedObjects" : 5,
                                                "nscanned" : 5,
                                                "indexBounds" : {
                                                        "created_on" : [
                                                                [
                                                                        ISODate("2012-02-10T11:25:30Z"),
                                                                        ISODate("2012-02-10T11:25:35Z")
                                                                ]
                                                        ]
                                                }
                                        }
                                ],
                                "oldPlan" : {
                                        "cursor" : "BtreeCursor created_on_1",
                                        "indexBounds" : {
                                                "created_on" : [
                                                        [
                                                                ISODate("2012-02-10T11:25:30Z"),
                                                                ISODate("2012-02-10T11:25:35Z")
                                                        ]
                                                ]
                                        }
                                },
                                "server" : "VLAD:27017"
                        }
                },
                {
                        "$project" : {
                                "_id" : false,
                                "created_on" : true,
                                "value" : true
                        }
                },
                {
                        "$group" : {
                                "_id" : {
                                        "year" : {
                                                "$year" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "dayOfYear" : {
                                                "$dayOfYear" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "hour" : {
                                                "$hour" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "minute" : {
                                                "$minute" : [
                                                        "$created_on"
                                                ]
                                        },
                                        "second" : {
                                                "$second" : [
                                                        "$created_on"
                                                ]
                                        }
                                },
                                "count" : {
                                        "$sum" : {
                                                "$const" : 1
                                        }
                                },
                                "avg" : {
                                        "$avg" : "$value"
                                },
                                "min" : {
                                        "$min" : "$value"
                                },
                                "max" : {
                                        "$max" : "$value"
                                }
                        }
                },
                {
                        "$sort" : {
                                "sortKey" : {
                                        "_id.year" : 1,
                                        "_id.dayOfYear" : 1,
                                        "_id.hour" : 1,
                                        "_id.minute" : 1,
                                        "_id.second" : 1
                                }
                        }
                }
        ],
        "ok" : 1
}

Среда агрегации использует шаблон проектирования канала и фильтра , а наш конвейер состоит из следующих операций:

  1. Сопоставление : эта операция аналогична предложению WHERE SQL, и она является первой, которую мы используем, поскольку мы используем наш индекс «create_on» (например, это подтверждается результатами объяснения: «курсор»: «BtreeCursor made_on_1 ″ ,») , Мы не используем закрывающий индекс (например, «indexOnly»: false ), потому что это было бы избыточным для нашей установки 8 ГБ ОЗУ.
  2. Проект : Эта операция аналогична предложению SELECT SQL и используется для удаления поля «_id» из нашего рабочего набора (что бесполезно для нашей логики отчетности).
  3. Group : эта операция похожа на предложение GROUP BY SQL и выполняет все вычисления в памяти. Вот почему мы отфильтровали рабочий набор до его группировки.
  4. Сортировка : эта операция аналогична предложению ORDER BY SQL, и мы используем ее для сортировки результатов в хронологическом порядке.

Базовый скрипт агрегации

Поскольку наши четыре отчета похожи, мы можем сгруппировать всю логику в один скрипт:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
function printResult(dataSet) {
    dataSet.result.forEach(function(document)  {
        printjson(document);
    });
}
 
function aggregateData(fromDate, toDate, groupDeltaMillis, enablePrintResult) {    
 
    print("Aggregating from " + fromDate + " to " + toDate);
 
    var start = new Date();
 
    var groupBy = {
        "year" : {
            $year : "$created_on"
        },
        "dayOfYear" : {
            $dayOfYear : "$created_on"
        }
    };
 
    var sortBy = {
            "_id.year" : 1,
            "_id.dayOfYear" : 1
    }; 
 
    var appendSeconds = false;
    var appendMinutes = false;
    var appendHours = false;
 
    switch(groupDeltaMillis) {
        case ONE_SECOND_MILLIS :
            appendSeconds = true;          
        case ONE_MINUTE_MILLIS :
            appendMinutes = true;          
        case ONE_HOUR_MILLIS :
            appendHours = true;    
    }  
 
    if(appendHours) {
        groupBy["hour"] = {
            $hour : "$created_on"  
        };
        sortBy["_id.hour"] = 1;
    }
    if(appendMinutes) {
        groupBy["minute"] = {
            $minute : "$created_on"
        };
        sortBy["_id.minute"] = 1;
    }
    if(appendSeconds) {
        groupBy["second"] = {
            $second : "$created_on"
        };
        sortBy["_id.second"] = 1;
    }  
 
    var pipeline = [
        {
            $match: {
                "created_on" : {
                    $gte: fromDate,
                    $lt : toDate   
                }
            }
        },
        {
            $project: {
                _id : 0,
                created_on : 1,
                value : 1
            }
        },
        {
            $group: {
                    "_id": groupBy,
                    "count": {
                        $sum: 1
                    },
                    "avg": {
                        $avg: "$value"
                    },
                    "min": {
                        $min: "$value"
                    },
                    "max": {
                        $max: "$value"
                    }      
                }
        },
        {
            $sort: sortBy
        }
    ];
 
    var dataSet = db.randomData.aggregate(pipeline);
    var aggregationDuration = (new Date().getTime() - start.getTime())/1000;   
    print("Aggregation took:" + aggregationDuration + "s");
    if(dataSet.result != null && dataSet.result.length > 0) {
        print("Fetched :" + dataSet.result.length + " documents.");
        if(enablePrintResult) {
            printResult(dataSet);
        }
    }
    var aggregationAndFetchDuration = (new Date().getTime() - start.getTime())/1000;
    if(enablePrintResult) {
        print("Aggregation and fetch took:" + aggregationAndFetchDuration + "s");
    }  
    return {
        aggregationDuration : aggregationDuration,
        aggregationAndFetchDuration : aggregationAndFetchDuration
    };
}

Время для результатов

Давайте проверим первые три отчета, используя следующий скрипт:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
load(pwd() + "/../../util/date_util.js");
load(pwd() + "/aggregate_base_report.js");
 
var deltas = [
{
    matchDeltaMillis: ONE_MINUTE_MILLIS,
    groupDeltaMillis: ONE_SECOND_MILLIS,
    description: "Aggregate all seconds in a minute"
},
{
    matchDeltaMillis: ONE_HOUR_MILLIS,
    groupDeltaMillis: ONE_MINUTE_MILLIS,
    description: "Aggregate all minutes in an hour"
},
{
    matchDeltaMillis: ONE_DAY_MILLIS,
    groupDeltaMillis: ONE_HOUR_MILLIS,
    description: "Aggregate all hours in a day"
}
];
 
var testFromDate = new Date(Date.UTC(2012, 5, 10, 11, 25, 59));
 
deltas.forEach(function(delta) {   
    print('Aggregating ' + description);
    var timeInterval = calibrateTimeInterval(testFromDate, delta.matchDeltaMillis);
    var fromDate = timeInterval.fromDate;
    var toDate = timeInterval.toDate;
    aggregateData(fromDate, toDate, delta.groupDeltaMillis, true); 
});

Даем нам следующие результаты:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
MongoDB shell version: 2.4.6
connecting to: random
Aggregating Aggregate all seconds in a minute
Aggregating from Sun Jun 10 2012 14:25:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 14:26:00 GMT+0300 (GTB Daylight Time)
Fetched :45 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 0
        },
        "count" : 1,
        "avg" : 0.4924355132970959,
        "min" : 0.4924355132970959,
        "max" : 0.4924355132970959
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 1
        },
        "count" : 1,
        "avg" : 0.10043778014369309,
        "min" : 0.10043778014369309,
        "max" : 0.10043778014369309
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 25,
                "second" : 59
        },
        "count" : 1,
        "avg" : 0.16304525500163436,
        "min" : 0.16304525500163436,
        "max" : 0.16304525500163436
}
Aggregating from Sun Jun 10 2012 14:00:00 GMT+0300 (GTB Daylight Time) to Sun Jun 10 2012 15:00:00 GMT+0300 (GTB Daylight Time)
Fetched :60 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 0
        },
        "count" : 98,
        "avg" : 0.4758610369979727,
        "min" : 0.004005654249340296,
        "max" : 0.9938081130385399
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 1
        },
        "count" : 100,
        "avg" : 0.5217278444720432,
        "min" : 0.003654648782685399,
        "max" : 0.9981840122491121
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 11,
                "minute" : 59
        },
        "count" : 92,
        "avg" : 0.5401836506308705,
        "min" : 0.01764239347539842,
        "max" : 0.9997266652062535
}
Aggregating Aggregate all hours in a day
Aggregating from Sun Jun 10 2012 03:00:00 GMT+0300 (GTB Daylight Time) to Mon Jun 11 2012 03:00:00 GMT+0300 (GTB Daylight Time)
Fetched :24 documents.
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 0
        },
        "count" : 5727,
        "avg" : 0.4975644027204364,
        "min" : 0.00020139524713158607,
        "max" : 0.9997993060387671
}
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 1
        },
        "count" : 5799,
        "avg" : 0.49519448930962623,
        "min" : 0.00011728447861969471,
        "max" : 0.9999530822969973
}
...
{
        "_id" : {
                "year" : 2012,
                "dayOfYear" : 162,
                "hour" : 23
        },
        "count" : 5598,
        "avg" : 0.49947314951339256,
        "min" : 0.00009276834316551685,
        "max" : 0.9999523421283811
}

Следите за обновлениями, мой следующий пост покажет вам, как оптимизировать эти запросы агрегации.

  • Код доступен на GitHub .