
Базовая агрегация в MongoDB 2.0

В некоторых предыдущих статьях о mongodb и python , pymongo и gridfs я представил базу данных NoSQL MongoDB, как использовать ее из Python и как хранить в ней большие (более 16 МБ) файлы. Здесь я покажу вам некоторые функции, которые есть в текущей версии (2.0) MongoDB для выполнения агрегации. В следующем посте я познакомлю вас с новой структурой агрегации, включенной в MongoDB версии 2.1.

Основная агрегация

В MongoDB включены три оператора, полезных для выполнения базовых многодокументных запросов и агрегаций : count (), Different () и group ().


count () является самым основным. count () возвращает количество документов, соответствующих запросу курсора. Чтобы проиллюстрировать это, сначала создадим несколько документов в тестовой коллекции:

import random
import pymongo
conn = pymongo.Connection()
db = conn.agg
...     dict(x=random.random(), y=random.randint(0, 10)) 
...     for i in range(50)])

Теперь давайте посчитаем документы, где у равен 3:


Обычно это то, что мы хотим, но что если наш курсор использует skip () или limit ()? count () игнорирует те:

q = db.data.find({'y':3}).skip(1)

Если вы хотите принять во внимание пропуски и ограничения, вам придется выполнить некоторые (тривиальные) вычисления самостоятельно:

def the_real_count(count, skip=0, limit=None):
    result = max(0, count - skip)
    if limit is not None:
        result = min(result, limit)
    return result

Теперь мы можем использовать это:

the_real_count(q.count(), 1)


Другая полезная вещь — это проверка, какие именно значения возвращает запрос. Для этого MongoDB предоставляет метод Different ():

db.data.find({'y': {'$gt':3}}).distinct('y')
[10, 6, 5, 7, 8, 4, 9]

Обратите внимание, что вам нужно указать поле, для которого вы хотите получить разные значения.


С group () мы можем выполнять несколько более сложные вычисления. Группировка в MongoDB в основном эквивалентна операции redu () в Python; предоставляется начальное значение, а затем все документы в группе преобразуются в него с помощью функции сокращения. Фактическая функция group () принимает несколько параметров, большинство из которых должны быть указаны:


Это может быть одним из следующих:
  • Нет, что указывает на то, что весь документ является ключом группы
  • Список имен свойств, указывающих групповой ключ
  • Функция Javascript, которая будет возвращать ключ группы при вызове для каждого документа


Это условие используется для фильтрации
входных данных групповой операции (т. Е. Оно эквивалентно предложению WHERE в операторе SQL GROUP BY, а
не выражению HAVING.


Это начальный объект, который будет использоваться для «запуска» сокращения.


Это функция Javascript, которая будет использоваться для объединения всех соответствующих документов попарно, пока не останется только один.


Это необязательная функция Javascript, которая будет вызываться в результате сокращения для генерации окончательного значения.
Это может быть полезно, например, для вычисления среднего значения из суммы и числа.

Пример является иллюстративным. Чтобы увидеть количество и сумму всех значений x в нашей коллекции, сгруппированных по значениям y, только для значений y больше 3, запрос будет выглядеть следующим образом:

key = [ 'y' ]
condition = { 'y': { '$gt': 3 } }
initial = { 'count': 0, 'sum': 0 }
reduce = 'function(doc, out) { out.count++; out.sum += doc.x; }'
db.data.group(key, condition, initial, reduce)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295}, 
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351}, 
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217}, 
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482}, 
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617}, 
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495}, 
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966}]

If we would also like to return the mean x value, we can include a finalize function:

finalize = 'function(out) { out.mean = out.sum / out.count; }'
db.data.group(key, condition, initial, reduce, finalize)
[{u'y': 10.0, u'count': 6.0, u'sum': 3.9350696527378295, u'mean': 0.6558449421229716}, 
 {u'y': 6.0, u'count': 10.0, u'sum': 5.312402119253351, u'mean': 0.5312402119253351}, 
 {u'y': 5.0, u'count': 3.0, u'sum': 0.9414083462732217, u'mean': 0.3138027820910739}, 
 {u'y': 7.0, u'count': 7.0, u'sum': 2.992867324959482, u'mean': 0.42755247499421173}, 
 {u'y': 8.0, u'count': 6.0, u'sum': 1.429589852438617, u'mean': 0.23826497540643618}, 
 {u'y': 4.0, u'count': 5.0, u'sum': 1.7804494395579495, u'mean': 0.3560898879115899}, 
 {u'y': 9.0, u'count': 3.0, u'sum': 1.6105657361868966, u'mean': 0.5368552453956322}]



You might be thinking that group() is awesome, and it’s exactly what you need, but there are two things to be aware of with group() which may make it a non-starter for your needs:

  • group() cannot be used on sharded collections. It just doesn’t work. So if you’re doing sharding, you’ll have to go the mapreduce() route (below).
  • group(), since it uses Javascript, uses the SpiderMonkey global interpreter lock, meaning that all Javascript, whether it’s a db.eval call, group, mapreduce, or $where in a query, must be serialized. So don’t plan on doing a lot of group()s concurrently.

If you’re only using group() occasionally on a non-sharded collection, it might be fine. For a more general and (IMO) better solution, however, read on for mapreduce:


MongoDB provides an implementation of the MapReduce algorithm using the collection-level mapreduce method. Once again, we’re using Javascript, so the same caveats apply regarding the global Javascript interpreter lock, but since mapreduce provides support for sharding, we can actually squeeze a bit of parallelism out of it here.

One way to think of mapreduce as used in MongoDB is that it is a generalized group method. The particular differences between mapreudce and group are as follows:

  • mapreduce allows us to provide an initial map function which will be used to generate documents to be grouped by key, whereas group only allowed us to specify a method that returned the key for a given document.
  • mapreduce allows us to keep our results in a collection in the database, rather than returning them immediately the way group does. With mapreduce, we also have great flexibility on how those documents are stored in the collection. We can do one of the following:
  • store the results in their own collection, obliterating any existing collection of the same name,
  • store the results in a possibly pre-existing collection, overwriting any results that are already there, or
  • store the results in a possibly pre-existing collection, reducing the results with any results currently stored there.

So to take up our example from the group discussion above, we can do the same thing in mapreduce as follows:

query = { 'y': { '$gt': 3 } }
map = 'function() { emit(this.y, { count: 1, sum: this.x}); }'
reduce = '''function(key, values) {
...     var result = { count:0, sum: 0 };
...     values.forEach(function(value) {
...         result.count += value.count;
...         result.sum += value.sum; })
...     return result; }'''
finalize = '''function(k, v) { 
...     return { sum: v.sum, 
...              count: v.count,
...              mean: v.sum/v.count } }
... '''
db.data.map_reduce(map, reduce, {'inline':1}, query=query, finalize=finalize)
{u'counts': {u'input': 40, 
             u'reduce': 7, 
             u'emit': 40, 
             u'output': 7}, 
 u'timeMillis': 4, 
 u'ok': 1.0, 
 u'results': [
   {u'_id': 4.0, u'value': {
     u'count': 5.0, 
     u'sum': 1.7804494395579495, 
     u'mean': 0.3560898879115899}}, ...]}

Note in particular that the structure of our documents has changed. With group, we were working on ‘native’ documents from our collection. Now we are working with «keys» and «values», where the «keys» are whatever was the first argument to emit in our map fuction, and the «values» are whatever was the second argument.

The actual mapreduce method takes quite a few parameters in addition to the required map, reduce, and out:

required The Javascript function that calls emit(key, value) for each input document to the reduce. Note that in the map function, this refers to the current document being mapped.
required The Javascript function that takes a key and an array of values and which should return the
reduction of those values. Note that reduce may be called multiple times for a given key, and may be used to further reduce an already reduced value, so the value returned from reduce should be similar in structure to the values emitted from map.
required (at least in Python) An object specifying what to do with the results, whether to return them inline, or to put them into a collection, and how to handle putting them into a collection. Options are listed below.
optional A MongoDB query used to filter the collection before sending its documents to the map function.
optional This will cause the input to map to be sorted. Specifying a sort key that groups the documents by the key that map will emit() can lead to fewer reduce calls. Strictly a performance optimization unless limit is used, below.
optional, not sharding-compatible Limits the number of documents send to map.
optional The Javascript function called on the results of reduce. Useful, for generating secondary aggregates such as mean.
optional Used to set the scope variable for map, reduce, and finalize
If True, then shard the output collection on its _id field.

There are a few others, but they’re less commonly used. The official docs give more details.

The out parameter is one that deserves a bit more explanation. It specifies what to do with the result of the mapreduce job, and can be of one of the following formats:

{‘replace’: <collection name>}
The output is inserted into a collection, atomically replacing any existing collection with the same name.
{‘merge’: <collection name>}
The output is inserted into a collection, overwriting any values with the same key but not removing the entire collection as replace does.
{‘reduce’: <collection name>}
The contents of the collection are treated as inputs to the reduce step. This option allows for incremental updates to an output collection.

You can also specify that the result should be written to another database using the the out parameter. For this, the order of keys matters and if you’re using Python, you should use a collections.ordereddict or bson.SON object:

db.data.map_reduce(map, reduce, bson.SON([
...     ('replace', 'agg_result'), 
...     ('db', 'otheragg')
... ]), query=query, finalize=finalize)
Collection(Database(Connection('localhost', 27017), u'otheragg'), u'agg_result')
{u'_id': 4.0, 
 u'value': {
     u'count': 5.0, 
     u'sum': 1.7804494395579495, 
     u'mean': 0.3560898879115899}}

Mapreduce and Sharding

If you need to get any parallelism out of your mapreduce, you’re going to need to do it on a sharded cluster. If your input collection is sharded, then MongoDB will send a mapreduce job to each shard to be executed in parallel. Once all the shards are done, a final reduce/finalize will be run and the output collection will be written.

The details on how this happens in different versions vary, but the short answer is that if you’re using MongoDB versions under 2.1/2.2, you should not shard your output. In 2.2 and above, sharded output of mapreduce has been overhauled and has much better performance/parallelism.

So what do you think? Anyone using any of the aggregation features in MongoDB? Interested in doing so? I’d love to hear about it in the comments below.