NoSQL Zone is brought to you in partnership with:

Rick Copeland is the principal consultant at Arborian Consulting, LLC, where he helps clients build custom web applications using Python and MongoDB. He previously worked as a lead software engineer at SourceForge, where he helped lead the transformation from a PHP/Postgres/MySQL codebase to a Python/MongoDB codebase. Rick is the primary author of Ming, a Python object mapper for MongoDB, and Zarkov, a realtime analytics platform based on MongoDB. Prior to GeekNet, Rick worked in fields from retail analytics to hardware chip design. Rick's personal blog is hosted at Just a Little Python. Rick has posted 25 posts at DZone. You can read more from them at their website. View Full User Profile

MongoDB Pub/Sub with Capped Collections

04.08.2013
| 4460 views |
  • submit to reddit

If you've been following this blog for any length of time, you know that my NoSQL database of choice is MongoDB. One thing that MongoDB isn't known for, however, is building a publish / subscribe system. Redis, on the other hand, is known for having a high-bandwith, low-latency pub/sub protocol. One thing I've always wondered is whether I can build a similar system atop MongoDB's capped collections, and if so, what the performance would be. Read on to find out how it turned out...

Capped Collections

If you've not heard of capped collections before, they're a nice little feature of MongoDB that lets you have a high-performance circular queue. Capped collections have the following nice features:

  • They "remember" the insertion order of their documents
  • They store inserted documents in the insertion order on disk
  • They remove the oldest documents in the collection automatically as new documents are inserted

However, you give up some things with capped collections:

  • They have a fixed maximum size
  • You cannot shard a capped collection
  • Any updates to documents in a capped collection must not cause a document to grow. (i.e. not all $set operations will work, and no $push or $pushAll will)
  • You may not explicitly .remove() documents from a capped collection

To create a capped collection, just issue the following command (all the examples below are in Python, but you can use any driver you want including the Javascript mongo shell):

db.create_collection(
    'capped_collection',
    capped=True,
    size=size_in_bytes,     # required
    max=max_number_of_docs, # optional
    autoIndexId=False)      # optional

In the example above, I've created a collection that takes up size_in_bytes bytes on disk, will contain no more than max_number_of_docs, and which does not create an index on the _id field as would normally happen. Above, I mentioned that the capped collection remembers the insertion order of its documents. If you issue a find() with no sort specified, or with a sort of ('$natural', 1), then MongoDB will sort your result in insertion order. (($natural, -1) will likewise sort the result in reverse insertion order.) Since insertion order is the same as the on-disk ordering, these queries are extremely fast. To see this, let's create two collections, one capped and one uncapped, and fill both with small documents:

size = 100000

# Create the collection
db.create_collection(
    'capped_collection', 
    capped=True, 
    size=2**20, 
    autoIndexId=False)
db.create_collection(
    'uncapped_collection', 
    autoIndexId=False)

# Insert small documents into both
for x in range(size):
    db.capped_collection.insert({'x':x}, manipulate=False)
    db.uncapped_collection.insert({'x':x}, manipulate=False)

# Go ahead and index the 'x' field in the uncapped collection
db.uncapped_collection.ensure_index('x')

Now we can see the performance gains by executing find() on each. For this, I'll use the IPython, IPyMongo, and the magic %timeit function:

In [72] (test): %timeit x=list(db.capped_collection.find())
1000 loops, best of 3: 708 us per loop
In [73] (test): %timeit x=list(db.uncapped_collection.find().sort('x'))
1000 loops, best of 3: 912 us per loop

So we get a moderate speedup, which is nice, but not spectacular. What becomes really interesting with capped collections is that they support tailable cursors.

Tailable cursors

If you're querying a capped collection in insertion order, you can pass a special flag to find() that says that it should "follow the tail" of the collection if new documents are inserted rather than returning the result of the query on the collection at the time the query was initiated. This behavior is similar to the behavior of the Unix tail -f command, hence its name. To see this behavior, let's query our capped collection with a 'regular' cursor as well as a 'tailable' cursor. First, the 'regular' cursor:

In [76] (test): cur = db.capped_collection.find()

In [77] (test): cur.next()
       Out[77]: {u'x': 0}

In [78] (test): cur.next()
       Out[78]: {u'x': 1}

In [79] (test): db.capped_collection.insert({'y': 1})
       Out[79]: ObjectId('515f205cfb72f0385c3c2414')

In [80] (test): list(cur)
       Out[80]:
[{u'x': 2},
 ...
 {u'x': 99}]

Notice above that the document we inserted {'y': 1} is not included in the result since it was inserted after we started iterating. Now, let's try a tailable cursor:

In [81] (test): cur = db.capped_collection.find(tailable=True)

In [82] (test): cur.next()
       Out[82]: {u'x': 1}

In [83] (test): cur.next()
       Out[83]: {u'x': 2}

In [84] (test): db.capped_collection.insert({'y': 2})
       Out[84]: ObjectId('515f20ddfb72f0385c3c2415')

In [85] (test): list(cur)
       Out[85]:
[{u'x': 3},
 ...
 {u'x': 99},
 {u'_id': ObjectId('515f205cfb72f0385c3c2414'), u'y': 1},
 {u'_id': ObjectId('515f20ddfb72f0385c3c2415'), u'y': 2}]

Now we see that both the "y" document we created before as well as the one created during this cursor's iteration included in the result.

Waiting on data

While tailable cursors are nice for picking up the inserts that happened while we were iterating over the cursor, one thing that a true pub/sub system needs is low latency. Polling the collection to see if messages have been inserted is a non-starter from a latency standpoint because you have to do one of two things:

  • Poll continuously, using prodigious server resources
  • Poll intermittently, increasing latency

Tailable cursors have another option you can use to "fix" the above problems: the await_data flag. This flag tells MongoDB to actually wait a second or two on an exhausted tailable cursor to see if more data is going to be inserted. In PyMongo, the way to set this flag is quite simple:

cur = db.capped_collection.find(
    tailable=True,
    await_data=True)

Building a pub/sub system

OK, now that we have a capped collection, with tailable cursors awaiting data, how can we make this into a pub/sub system? The basic approach is:

  • We use a single capped collection of moderate size (let's say 32kB) for all messages
  • Publishing a message consists of inserting a document into this collection with the following format: { 'k': topic, 'data': data }
  • Subscribing to the collection is a tailable query on the collection, using a regular expression to only get the messages we're interested in.

The actual query we use is similar to the following:

def get_cursor(collection, topic_re, await_data=True):
    options = { 'tailable': True }
    if await_data:
        options['await_data'] = True
    cur = collection.find(
        { 'k': topic_re },
        **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

Once we have the get_cursor function, we can do something like the following to execute the query:

import re, time
while True:
    cur = get_cursor(
        db.capped_collection, 
        re.compile('^foo'), 
        await_data=True)
    for msg in cur:
        do_something(msg)
    time.sleep(0.1)

Of course, the system above has a couple of problems:

  • We have to receive every message in the collection before we get to the 'end'
  • We have to go back to the beginning if we ever exhaust the cursor (and its await_data delay)

The way we can avoid these problems is by adding a sequence number to each message.

Sequences

"But wait," I imagine you to say, "MongoDB doesn't have an autoincrement field like MySQL! How can we generate sequences?" The answer lies in the find_and_modify() command, coupled with the $inc operator in MongoDB. To construct our sequence generator, we can use a dedicated "sequence" collection that contains nothing but counters. Each time we need a new sequence number, we perform a find_and_modify() with $inc and get the new number. The code for this turns out to be very short:

class Sequence(object):

    def __init__(self, db, name='mongotools.sequence'):
        self._db = db
        self._name = name

    def cur(self, name):
        doc = self._db[self._name].find_one({'_id': name})
        if doc is None: return 0
        return doc['value']

    def next(self, sname, inc=1):
        doc = self._db[self._name].find_and_modify(
            query={'_id': sname},
            update={'$inc': { 'value': inc } },
            upsert=True,
            new=True)
        return doc['value']

Once we have the ability to generate sequences, we can now add a sequence number to our messages on publication:

def pub(collection, sequence, key, data=None):
    doc = dict(
        ts=sequence.next(collection.name),
        k=key,
        data=data)
    collection.insert(doc, manipulate=False)

Our subscribing query, unfortunately, needs to get a bit more complicated:

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    return cur

And our dispatch loop likewise must keep track of the sequence number:

import re, time
last_id = -1
while True:
    cur = get_cursor(
        db.capped_collection, 
        re.compile('^foo'), 
        await_data=True)
    for msg in cur:
        last_id = msg['ts']
        do_something(msg)
    time.sleep(0.1)

We an actually improve upon this a tiny bit by finding the ts field of the last value in the collection and using it to initialize our last_id value:

last_id = -1
cur = db.capped_collection.find().sort([('$natural', -1)])
for msg in cur:
    last_id = msg['ts']
    break
...

So we've fixed the problem of processing messages multiple times, but we still have a slow scan of the whole capped collection on startup. Can we fix this? It turns out we can, but not without questionable "magic."

Now, for some questionable magic...

You may be wondering why I would use a strange name like ts to hold a sequence number. It turns out that there is poorly documented option for cursors that we can abuse to substantially speed up the initial scan of the capped collection: the oplog_replay option. As is apparent from the name of the option, it is mainly used to replay the "oplog", that magic capped collection that makes MongoDB's replication internals work so well. The oplog uses a ts field to indicate the timestamp of a particular operation, and the oplog_replay option requires the use of a ts field in the query.

Now since oplog_replay isn't really intended to be (ab)used by us mere mortals, it's not directly exposed in the PyMongo driver. However, we can manage to get to it via some trickery:

from pymongo.cursor import _QUERY_OPTIONS

def get_cursor(collection, topic_re, last_id=-1, await_data=True):
    options = { 'tailable': True }
    spec = { 
        'ts': { '$gt': last_id }, # only new messages
        'k': topic_re }
    if await_data:
        options['await_data'] = True
    cur = collection.find(spec, **options)
    cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes
    if await:
        cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
    return cur

(Yeah, I know it's bad to import an underscore-prefixed name from another module. But it's marginally better than simply saying oplog_replay_option=8, which is the other way to make this whole thing work....)

Performance

So now we have the skeleton of a pubsub system using capped collections. If you'd like to use it yourself, all the code is available on Github in the MongoTools project. So how does it perform? Well obviously the performance depends on the particular type of message passing you're doing. In the MongoTools project, there are a couple of Python example programs latency_test_pub.py and latency_test_sub.py in the mongotools/examples/pubsub directory that allow you to do your own benchmarking. In my personal benchmarking, running everything locally with small messages, I'm able to get about 1100 messages per second with a latency of 2.5ms (with publishing options -n 1 -c 1 -s 0), or about 33,000 messages per second with a latency of 8ms (this is with -n 100 -c 1 -s 0). For pure publishing bandwidth (the subscriber can't consume this many messages per second), I seem to max out at around 75,000 messages (inserts) per second.

So what do you think? With MongoTools pubsub module is MongoDB a viable competitor to Redis as a low-latency, high-bandwidth pub/sub channel? Let me know in the comments below!

Published at DZone with permission of its author, Rick Copeland. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)