Статьи

MongoDB Pub / Sub с закрытыми коллекциями

Если вы следили за этим блогом в течение какого-то времени, вы знаете, что моей базой данных NoSQL является MongoDB. Одна вещь , которая MongoDB не известна, однако, строит публикации / подписки системы. Redis , с другой стороны, как известно , имеющий высокой пропускной способностью, низкой задержкой PUB / протокол к югу. Я всегда задавался вопросом: могу ли я построить похожую систему поверх ограниченных коллекций MongoDB , и если да, то какова будет производительность. Читайте дальше, чтобы узнать, как это получилось …

Закрытые Коллекции

Если вы раньше не слышали об ограниченных коллекциях, то это отличная небольшая функция MongoDB, которая позволяет создавать высокопроизводительную циклическую очередь. Укупоренные коллекции имеют следующие приятные особенности:

  • Они «запоминают» порядок вставки своих документов
  • Они хранят вставленные документы в порядке вставки на диске
  • Они удаляют самые старые документы в коллекции автоматически при вставке новых документов

Тем не менее, вы отказываетесь от некоторых вещей с закрытыми коллекциями:

  • У них фиксированный максимальный размер
  • Вы не можете осквернить ограниченную коллекцию
  • Любые обновления документов в закрытой коллекции не должны приводить к росту документа. (т.е. не все $setоперации будут работать, и нет $pushили не $pushAllбудет)
  • Вы не можете явно .remove()документы из ограниченной коллекции

Чтобы создать ограниченную коллекцию, просто введите следующую команду (все приведенные ниже примеры написаны на Python, но вы можете использовать любой драйвер, который вам нужен, включая mongoоболочку Javascript ):

db.create_collection(
    'capped_collection',
    capped=True,
    size=size_in_bytes,     # required
    max=max_number_of_docs, # optional
    autoIndexId=False)      # optional

В приведенном выше примере я создал коллекцию, которая занимает size_in_bytesбайты на диске, будет содержать не более max_number_of_docs, и не будет создавать индекс на _idполе, как это обычно происходит. Выше я упоминал, что ограниченная коллекция помнит порядок вставки своих документов. Если вы выдадите find()без указания сортировки или с сортировкой ('$natural', 1), MongoDB отсортирует ваш результат в порядке вставки. ( ($natural, -1)аналогичным образом отсортирует результат в обратном порядке вставки.) Поскольку порядок вставки такой же, как и порядок на диске, эти запросы выполняются очень быстро. Чтобы увидеть это, давайте создадим две коллекции: одну с крышкой и одну без крышки, и заполним обе небольшими документами:

size = 100000

# Create the collection
db.create_collection(
    'capped_collection', 
    capped=True, 
    size=2**20, 
    autoIndexId=False)
db.create_collection(
    'uncapped_collection', 
    autoIndexId=False)

# Insert small documents into both
for x in range(size):
    db.capped_collection.insert({'x':x}, manipulate=False)
    db.uncapped_collection.insert({'x':x}, manipulate=False)

# Go ahead and index the 'x' field in the uncapped collection
db.uncapped_collection.ensure_index('x')

Теперь мы можем увидеть прирост производительности, выполнив find()на каждом. Для этого я буду использовать IPython , IPyMongo и магическую %timeitфункцию:

In [72] (test): %timeit x=list(db.capped_collection.find())
1000 loops, best of 3: 708 us per loop
In [73] (test): %timeit x=list(db.uncapped_collection.find().sort('x'))
1000 loops, best of 3: 912 us per loop

Таким образом, мы получаем умеренное ускорение, что приятно, но не впечатляюще. Что действительно интересно с закрытыми коллекциями, так это то, что они поддерживают настраиваемые курсоры .

Приспосабливаемые курсоры

Если вы запрашиваете ограниченную коллекцию в порядке вставки, вы можете передать специальный флаг, find()который говорит, что она должна «следовать за хвостом» коллекции, если новые документы вставляются, а не возвращать результат запроса по коллекции в время запроса было начато. Это поведение похоже на поведение команды Unix tail -f, отсюда и ее название. Чтобы увидеть это поведение, давайте запросим нашу ограниченную коллекцию с помощью «обычного» курсора, а также «настраиваемого» курсора. Во-первых, «обычный» курсор:

In [76] (test): cur = db.capped_collection.find()

In [77] (test): cur.next()
       Out[77]: {u'x': 0}

In [78] (test): cur.next()
       Out[78]: {u'x': 1}

In [79] (test): db.capped_collection.insert({'y': 1})
       Out[79]: ObjectId('515f205cfb72f0385c3c2414')

In [80] (test): list(cur)
       Out[80]:
[{u'x': 2},
 ...
 {u'x': 99}]

Обратите внимание, что документ, который мы вставили {'y': 1}, не включен в результат, поскольку он был вставлен после того, как мы начали итерацию . Теперь давайте попробуем настраиваемый курсор:

In [81] (test): cur = db.capped_collection.find(tailable=True)

In [82] (test): cur.next()
       Out[82]: {u'x': 1}

In [83] (test): cur.next()
       Out[83]: {u'x': 2}

In [84] (test): db.capped_collection.insert({'y': 2})
       Out[84]: ObjectId('515f20ddfb72f0385c3c2415')

In [85] (test): list(cur)
       Out[85]:
[{u'x': 3},
 ...
 {u'x': 99},
 {u'_id': ObjectId('515f205cfb72f0385c3c2414'), u'y': 1},
 {u'_id': ObjectId('515f20ddfb72f0385c3c2415'), u'y': 2}]

Теперь мы видим, что как «y» документ, который мы создали ранее, так и документ, созданный во время итерации этого курсора, включены в результат.

В ожидании данных

В то время как настраиваемые курсоры удобны для выбора вставок, которые произошли, когда мы перебирали курсор, одна вещь, в которой нуждается настоящая система публикаций / подсистем, — это низкая задержка. Опрос коллекции, чтобы увидеть, были ли вставлены сообщения, не является началом с точки зрения задержки, потому что вы должны сделать одну из двух вещей:

  • Опрос постоянно, используя огромные ресурсы сервера
  • Опрос периодически, увеличивая время ожидания

У настраиваемых курсоров есть еще одна опция, которую вы можете использовать, чтобы «исправить» вышеуказанные проблемы: await_dataфлаг. Этот флаг указывает MongoDB фактически подождать секунду или две на исчерпанном настраиваемом курсоре, чтобы увидеть, будут ли вставлены дополнительные данные. В PyMongo способ установить этот флаг довольно прост:

cur = db.capped_collection.find(
    tailable=True,
    await_data=True)

Создание паба / подсистемы

Хорошо, теперь, когда у нас есть ограниченная коллекция с настраиваемыми курсорами, ожидающими данные, как мы можем превратить это в систему pub / sub? Основной подход:

  • Мы используем одну ограниченную коллекцию среднего размера (скажем, 32 КБ) для всех сообщений
  • Публикация сообщения состоит из вставки документа в эту коллекцию в следующем формате: { 'k': topic, 'data': data }
  • Subscribing to the collection is a tailable query on the collection, using a regular expression to only get the messages we’re interested in.

The actual query we use is similar to the following:

def get_cursor(collection, topic_re, await_data=True):
    options = { 'tailable': True }
    if await_data:
        options['await_data'] = True
    cur = collection.find(
        { 'k': topic_re },
        **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

Once we have the get_cursor function, we can do something like the following to execute the query:

import re, time
while True:
    cur = get_cursor(
        db.capped_collection, 
        re.compile('^foo'), 
        await_data=True)
    for msg in cur:
        do_something(msg)
    time.sleep(0.1)

Of course, the system above has a couple of problems:

  • We have to receive every message in the collection before we get to the ‘end’
  • We have to go back to the beginning if we ever exhaust the cursor (and its await_data delay)

The way we can avoid these problems is by adding a sequence number to each message.

Sequences

«But wait,» I imagine you to say, «MongoDB doesn’t have an autoincrement field like MySQL! How can we generate sequences?» The answer lies in the find_and_modify() command, coupled with the $inc operator in MongoDB. To construct our sequence generator, we can use a dedicated «sequence» collection that contains nothing but counters. Each time we need a new sequence number, we perform a find_and_modify() with $inc and get the new number. The code for this turns out to be very short:

class Sequence(object):

    def __init__(self, db, name='mongotools.sequence'):
        self._db = db
        self._name = name

    def cur(self, name):
        doc = self._db[self._name].find_one({'_id': name})
        if doc is None: return 0
        return doc['value']

    def next(self, sname, inc=1):
        doc = self._db[self._name].find_and_modify(
            query={'_id': sname},
            update={'$inc': { 'value': inc } },
            upsert=True,
            new=True)
        return doc['value']

Once we have the ability to generate sequences, we can now add a sequence number to our messages on publication:

def pub(collection, sequence, key, data=None):
    doc = dict(
        ts=sequence.next(collection.name),
        k=key,
        data=data)
    collection.insert(doc, manipulate=False)

Our subscribing query, unfortunately, needs to get a bit more complicated:

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

And our dispatch loop likewise must keep track of the sequence number:

import re, time
last_id = -1
while True:
    cur = get_cursor(
        db.capped_collection, 
        re.compile('^foo'), 
        await_data=True)
    for msg in cur:
        last_id = msg['ts']
        do_something(msg)
    time.sleep(0.1)

We an actually improve upon this a tiny bit by finding the ts field of the last value in the collection and using it to initialize our last_id value:

last_id = -1
cur = db.capped_collection.find().sort([('$natural', -1)])
for msg in cur:
    last_id = msg['ts']
    break
...

So we’ve fixed the problem of processing messages multiple times, but we still have a slow scan of the whole capped collection on startup. Can we fix this? It turns out we can, but not without questionable «magic.»

Now, for some questionable magic…

You may be wondering why I would use a strange name like ts to hold a sequence number. It turns out that there is poorly documented option for cursors that we can abuse to substantially speed up the initial scan of the capped collection: the oplog_replay option. As is apparent from the name of the option, it is mainly used to replay the «oplog», that magic capped collection that makes MongoDB’s replication internals work so well. The oplog uses a ts field to indicate the timestamp of a particular operation, and the oplog_replay option requires the use of a ts field in the query.

Now since oplog_replay isn’t really intended to be (ab)used by us mere mortals, it’s not directly exposed in the PyMongo driver. However, we can manage to get to it via some trickery:

from pymongo.cursor import _QUERY_OPTIONS

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    if await:
        cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
    return cur

(Yeah, I know it’s bad to import an underscore-prefixed name from another module. But it’s marginally better than simply saying oplog_replay_option=8, which is the other way to make this whole thing work….)

Performance

So now we have the skeleton of a pubsub system using capped collections. If you’d like to use it yourself, all the code is available on Github in the MongoTools project. So how does it perform? Well obviously the performance depends on the particular type of message passing you’re doing. In the MongoTools project, there are a couple of Python example programs latency_test_pub.py and latency_test_sub.py in the mongotools/examples/pubsub directory that allow you to do your own benchmarking. In my personal benchmarking, running everything locally with small messages, I’m able to get about 1100 messages per second with a latency of 2.5ms (with publishing options -n 1 -c 1 -s 0), or about 33,000 messages per second with a latency of 8ms (this is with -n 100 -c 1 -s 0). For pure publishing bandwidth (the subscriber can’t consume this many messages per second), I seem to max out at around 75,000 messages (inserts) per second.

So what do you think? With MongoTools pubsub module is MongoDB a viable competitor to Redis as a low-latency, high-bandwidth pub/sub channel? Let me know in the comments below!