I'm a pseudo-superhero with my underwear on my head part-time. Most of the time I'm a software developer working at Signpost in New York City. I have a background in consulting for defense and finance and now moving into web and distributed systems. I have worked extensively with C++, C#, and Java as well as a variety of more specialized languages and technologies. I hope my experiences and work can be helpful to others. Contact me if you need any help. Matt is a DZone MVB and is not an employee of DZone and has posted 3 posts at DZone.

Why (and How) to Replace Amazon SQS with MongoDB

02.10.2012

What is Amazon SQS?

Amazon SQS (Simple Queue Service) is a reliable message queuing service hosted in the Amazon cloud. This service is ideal for sending messages between servers that need to acknowledge that processing has been completed. When a message is popped from the queue, it is not deleted, but marked with the client who has made the request. The client is then responsible for telling SQS to delete the message from the queue. If the client does not delete a message it has popped within a certain time frame, the client loses ownership of the message and it is made available for other clients.
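
To make the pop, delete, and timeout rules concrete, here is a small in-memory sketch of that behaviour. It is not the SQS API: the class name, method names, and the explicit `now` argument (used instead of a real clock so the timeline is easy to follow) are all made up for illustration.

```javascript
// In-memory model of the pop / delete / timeout behaviour described above.
// Not the SQS API: every name here is hypothetical.
class VisibilityQueue {
    constructor(timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.messages = [];
        this.nextId = 1;
    }

    push(body) {
        // expires = 0 means the message is visible to every client right away.
        this.messages.push({ id: this.nextId++, body: body, owner: null, expires: 0 });
    }

    // Hand the first visible message to `owner` without removing it.
    pop(owner, now) {
        const msg = this.messages.find(m => m.expires < now);
        if (!msg) return null;
        msg.owner = owner;
        msg.expires = now + this.timeoutMs;
        return { id: msg.id, body: msg.body };
    }

    // Only the current owner may delete, and only before its ownership expires.
    remove(owner, id, now) {
        const index = this.messages.findIndex(
            m => m.id === id && m.owner === owner && m.expires >= now);
        if (index === -1) return false;
        this.messages.splice(index, 1);
        return true;
    }
}

const q = new VisibilityQueue(10000);
q.push('hello');
const first = q.pop('client-a', 1000);      // client-a owns the message until t = 11000
const hidden = q.pop('client-b', 5000);     // nothing visible: client-a still owns it
const retaken = q.pop('client-b', 12000);   // ownership expired, client-b takes over
const lateDelete = q.remove('client-a', first.id, 12001);   // rejected: ownership lost
const goodDelete = q.remove('client-b', retaken.id, 12001); // accepted
```

The point to notice is that a slow or crashed client never loses a message; it only loses its claim on it.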

How am I using it?

One of the systems I am using SQS for is a distributed email delivery service (using SMTP). Since there is no asynchronous SMTP client for Java (that I know of), I am using JavaMail to deliver messages. Sending messages with JavaMail is pretty slow and can take several seconds per message, with a thread being consumed for each message sent. In order to send many messages in parallel, I decided to queue up the outgoing messages and spin up many instances of the SMTP application. This approach is dead simple and scales wonderfully without needing to implement an asynchronous SMTP client of my own.

So what’s wrong with Amazon SQS?

The main problem with using SQS in the above scenario is that I can’t push an entire email message onto the SQS queue since each SQS message is limited to 8K of data. To get around this, I store the message in MongoDB and then queue the message ID on SQS. Each client then needs to pull a message from the queue and then look up the email in Mongo. There’s nothing really wrong with this approach, but it can be done better, faster, easier, and cheaper. Amazon charges me a fraction of a cent for every operation I perform on SQS. This doesn’t seem like much, but if I have 10 SMTP applications polling SQS 4 times a second all day every day regardless of whether there are new messages to send, this can add up. Plus, I have diagnostic applications watching the queue size to see if I need to spin up more instances or take down instances. Even if this adds up to $10/day, that’s still $3,650/year just to send out email. That’s too much for a startup with no financial backing!
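
The polling arithmetic is easy to check. The sketch below counts only the idle polls from the 10 applications mentioned above; the per-request price is an assumed placeholder rather than a quoted Amazon rate, so substitute the figure from your own bill. Pushes, deletes, and the diagnostic queue-size checks would come on top of this.

```javascript
// Back-of-the-envelope polling cost.
// ASSUMED_PRICE_PER_REQUEST is a hypothetical placeholder, not a quoted AWS price.
const ASSUMED_PRICE_PER_REQUEST = 0.000001; // USD per request (assumption)

function pollingRequestsPerDay(clients, pollsPerSecond) {
    const secondsPerDay = 24 * 60 * 60;
    return clients * pollsPerSecond * secondsPerDay;
}

function yearlyCost(requestsPerDay, pricePerRequest) {
    return requestsPerDay * pricePerRequest * 365;
}

// 10 SMTP applications polling 4 times a second, all day.
const requestsPerDay = pollingRequestsPerDay(10, 4);
const pollingCostPerYear = yearlyCost(requestsPerDay, ASSUMED_PRICE_PER_REQUEST);
console.log(requestsPerDay + ' polls/day, about $' + pollingCostPerYear.toFixed(2) + '/year');
```

Every poll counts as an operation whether or not it returns a message, so this idle cost grows with each instance added.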

The approach

I have been using MongoDB for a while now and am enamored with what it can do. I know that it can store lots of schema-less data in documents of up to 4MB each and can store larger files through the use of GridFS. I know that it’s lightning fast (almost memcached speed) for indexed lookups and can handle thousands of operations per second without even pushing the CPU over 10%. I know that I’m paying for the CPU and hard drive space on Amazon EC2 already and thoroughly enjoy minimizing my monthly, weekly, and even daily costs. Blah. Blah. Blah. I want to implement this in Mongo!

With the introduction of server-side JavaScript and the findAndModify command, using MongoDB for a queue that can be accessed from any client language (of which there are a ton!) is easy. Below is the code that I am using on my own projects.

The Code

sqs.js
function sqsQueueExists(name) {
    return db.queue[name].count() != 0;
};
 
function sqsQueueMessageCount(name) {
    return db.queue[name].count({
        alive: true,
        expires: {$lt: new Date}
    });
};
 
function sqsDeleteQueue(name) {
    db.queue[name].drop();
};
 
function sqsListQueues(prefix) {
    var regex;
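    // Backslashes are doubled because the pattern is built from a string literal;
    // a single "\." would collapse to "." and match any character.
    if (prefix)
        regex = new RegExp('^[^.]+\\.queue\\.' + prefix + '[^$]*$');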
    else
        regex = /^[^.]+\.queue\.[^$]+$/;
 
    return db.system.namespaces.find({
        name: regex
    }).map(function (x) {
        return x.name.substring(x.name.indexOf('.') + 7);
    });
};
 
function sqsPushMessage(queue, message) {
    var _push = function(queue, message) {
        db.queue[queue].save({
            alive: true,
            expires: new Date(0),
            owner: new ObjectId('000000000000000000000000'),
            body: message
        });
    };
 
    if (message instanceof Array) {
        message.forEach(function(m) {
            _push(queue, m);
        });
    } else {
        _push(queue, message);
    }
};
 
function sqsPopMessage(queue, owner, count) {
    var now = new Date;
    // 10 second expiration, change this to what you want
    var expires = new Date(now.getTime() + 10000);
 
    if (!count) {
        count = 1;
    }
    var result = [];
    for (var i = 0; i < count; ++i) {
        var item = db.queue[queue].findAndModify({
            query: {
                alive: true,
                expires: {$lt: now}
            },
            update: {
                $set: {
                    expires: expires,
                    owner: owner
                }
            },
            new: true
        });
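        // Stop when nothing is available; depending on the shell version,
        // an empty result may come back as null or as {}.
        if (!item || friendlyEqual({}, item))
            break;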
        result.push(item);
    }
    return result;
};
 
function sqsDeleteMessage(queue, owner, item_ids) {
    if (item_ids instanceof ObjectId)
        item_ids = [item_ids];
    db.queue[queue].update({
        alive: true,
        expires: {$gte: new Date},
        owner: owner,
        _id: {$in: item_ids}
    }, {
        $set: {
            alive: false
        }
    },
    false,
    true);
};

If you copy the code into a sqs.js file, you can then run the script below to create the stored procedures on the database of your choice. The alternative is to write the code in the MongoDB driver of your choice.

bash
$ for fn in sqsQueueExists sqsQueueMessageCount sqsDeleteQueue sqsListQueues sqsPushMessage \
	sqsPopMessage sqsDeleteMessage; do
	echo "db.system.js.save({_id: '$fn', value: $fn})" |
		mongo [db name] --quiet --shell sqs.js
done

MongoDB Shell
> use [db name];
> load('sqs.js');
> [sqsQueueExists, sqsQueueMessageCount, sqsDeleteQueue, sqsListQueues, sqsPushMessage,
	sqsPopMessage, sqsDeleteMessage].forEach(function (x) {
	var name = x.toString().match(/^function\s(\w+)/)[1];
	db.system.js.save({_id: name, value: x});
});
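
Once the functions are stored, a consumer only has to pop, do its work, and delete whatever succeeded. The sketch below shows one polling pass under some assumptions: `queue` is any object exposing `pop(name, owner, count)` and `remove(name, owner, ids)` (thin wrappers around sqsPopMessage and sqsDeleteMessage would do), and `send` stands in for the SMTP delivery call. These names are illustrative and not part of the code above; a stub queue is included so the sketch runs without a database.

```javascript
// One polling pass for a worker. All names here are hypothetical.
function processBatch(queue, name, owner, count, send) {
    const delivered = [];
    queue.pop(name, owner, count).forEach(function (item) {
        try {
            send(item.body);
            delivered.push(item._id);
        } catch (err) {
            // Leave the message alone: it becomes visible again once it expires.
        }
    });
    // Delete only what was actually delivered, in a single call.
    if (delivered.length > 0) {
        queue.remove(name, owner, delivered);
    }
    return delivered;
}

// Stub queue standing in for the database so the sketch is self-contained.
const removed = [];
const stubQueue = {
    pop: function () {
        return [{ _id: 1, body: 'ok' }, { _id: 2, body: 'bad' }];
    },
    remove: function (name, owner, ids) {
        ids.forEach(function (id) { removed.push(id); });
    }
};

const done = processBatch(stubQueue, 'email', 'worker-1', 2, function (body) {
    if (body === 'bad') throw new Error('simulated SMTP failure');
});
```

Deleting only after a successful send means a failed delivery is retried by whichever worker pops the message after the expiration passes.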

There are pieces of the SQS API that I have left out of this implementation for simplicity. These are based around the options provided, like changing the visibility timeout of a queue or of an individual message. They would be pretty trivial to add if necessary. However, if you are not using these features, leaving them out will only make the code faster. To add options to a queue, add another collection called queue that holds each queue name (in the _id field) along with its applicable options. Then query the queue collection whenever the options are needed (the _id field is automatically indexed, so this will be fast).
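
As a rough sketch of that idea, the function below looks up a per-queue visibility timeout and falls back to the 10 seconds hard-coded in sqsPopMessage. The `visibilityTimeout` field name is an assumption of mine, and `db` is passed in explicitly (in the shell it is a global) so that a stand-in object can be used to run the example without a server.

```javascript
// Per-queue options lookup. The `visibilityTimeout` field name is hypothetical.
const DEFAULT_VISIBILITY_TIMEOUT_MS = 10000;

function sqsVisibilityTimeout(db, name) {
    // One document per queue in the `queue` collection, keyed by queue name.
    const options = db.queue.findOne({ _id: name });
    if (options && typeof options.visibilityTimeout === 'number') {
        return options.visibilityTimeout;
    }
    return DEFAULT_VISIBILITY_TIMEOUT_MS;
}

// Stand-in for the shell's `db` object so the sketch runs on its own.
const fakeDb = {
    queue: {
        findOne: function (query) {
            const docs = { email: { _id: 'email', visibilityTimeout: 30000 } };
            return docs[query._id] || null;
        }
    }
};

const emailTimeout = sqsVisibilityTimeout(fakeDb, 'email');   // uses the stored option
const otherTimeout = sqsVisibilityTimeout(fakeDb, 'reports'); // no options: default
```

sqsPopMessage could then compute its expiration from this value instead of the fixed 10000 milliseconds.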

I hope this helps you out!

Source: http://www.mattinsler.com/why-and-how-i-replaced-amazon-sqs-with-mongodb/

 

Published at DZone with permission of Matt Insler, author and DZone MVB.
