
Calvin is the co-founder of Segment.io. Calvin is a DZone MVB, is not an employee of DZone, and has posted 4 posts at DZone.

Building Purgatory: Using Node.js to Build a Delayed Queue

08.26.2013

At Segment.io, we deal with a lot of important user data on a daily basis. Consequently, our top priorities are that we don’t lose your visitors’ data and that our incoming API stays available at all times.

As you might guess, all of the data that comes in has to be validated against our database. Our API servers maintain a connection to the DB to check that incoming requests are actually good.

If anything in the request doesn’t check out with the DB, we try to return the proper HTTP error code right away.

We don’t immediately queue the message because we want to help our users at integration time. If a user mistypes their API key, we want to warn them as soon as possible. That way, they don’t push to production only to discover two weeks later that they haven’t recorded any data.

So the question is, “If Segment.io does a query per tracked event, what happens when the entire database dies?” We obviously can’t throw 500s all over the place or drop our client data because of an internal problem. While we cache certain data, we want to ensure availability even if an entry is expired from the cache.

Enter Purgatory

Purgatory provides a place to store messages to be replayed later. It’s a small module that provides a set of primitives for publishing to and consuming from a delayed queue. We use RabbitMQ and node-amqp, but these principles are adaptable to any platform.

Publishing

To see how to send messages to purgatory, let’s look back at our failure case of when the database goes down. In the case of a lost connection, we should go ahead and publish to purgatory. We give the client the benefit of the doubt and assume that their request was good by returning a 200 immediately.

exports.ingest = function (data, options, callback) {
    validate(data, function (err, result) {
        if (err) {
            // If it's a connection problem, purgatory it
            if (connection.isUnavailable(err)) {
                purgatory.publish('my_key', data);
                return callback(); // end client request
            }
    [...]
}

Whenever the connection is not available, we publish to purgatory. We assume that no one is sending us bad data for the time being.

Subscribing

That’s all well and good: we can publish our messages to the alternative queue. But how do we specify how to pull messages off the queue?

The simplest way to do this is to provide a message handler. Whenever we check whether the message can be reprocessed, the handler is called and decides how to treat the message. It looks something like this:

/**
 * Subscribes a message handler for the given key
 * @param  {String}   key     the queue topic key
 * @param  {Function} handler ({ message      : message,
 *                               headers      : headers,
 *                               deliveryInfo : deliveryInfo },
 *                              callback (err) });
 */
purgatory.subscribe('my_key', function (delivery, callback) {
    var data = delivery.message;
    validate(data, function (err, cleaned) {
        if (err) {
            if (connection.isUnavailable(err)) return callback(err);
            else return callback(); // don't re-queue if it's a validation error
        } else {
    [...]
});

The handler calls back with an error only if the message should be re-queued. In our case, we don’t return an error if there is a validation problem - those messages should be thrown out anyway!

Implementation

Publishing messages in purgatory is a straightforward task. Anywhere in your code where you want a message to be re-processed you can publish to the purgatory exchange.
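
As a rough illustration of the publishing side, here is a minimal sketch, not the code from the gist. It assumes an exchange object exposing publish(routingKey, message, options), as node-amqp exchanges do. The names createPublisher and fakeExchange are hypothetical and exist only for this example, and the persistent delivery mode is an assumption rather than something the original module is known to set.

```javascript
// Hypothetical helper: wraps an exchange so callers can say publish(key, data).
function createPublisher(exchange) {
  return function publish(key, data) {
    // Assumption: request persistent delivery (AMQP delivery mode 2).
    exchange.publish(key, data, { deliveryMode: 2 });
  };
}

// In-memory stand-in for a real exchange so the sketch runs without RabbitMQ.
var published = [];
var fakeExchange = {
  publish: function (routingKey, message, options) {
    published.push({ routingKey: routingKey, message: message, options: options });
  }
};

var publish = createPublisher(fakeExchange);
publish('my_key', { event: 'signup' });
```

In a real deployment the stand-in would be replaced by the exchange named purgatory, so that the routing key matches the key a handler subscribes with.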

The most interesting components of purgatory come from handling queued messages. To handle messages, the purgatory module needs to satisfy three requirements:

  1. track various queue handlers by key
  2. process queued messages
  3. re-queue the messages if the database is still down

I’ve posted the full source in a gist, but I’ll also break down parts of the message handling code here.

Queue handlers

To keep track of the queue handlers, we keep an object mapping keys to their handlers. The subscribe function looks like this:

var handlers = {};

exports.subscribe = function (key, handler) {

    var currentHandler = handlers[key];
    handlers[key] = handler;

    // If we are already bound and have set the new handler, return
    if (currentHandler)
        return;

    // Otherwise set up the queue
    declareQueue(rabbit, key);
};

Because the handlers object is captured in a closure, we can register or replace queue handlers as the app’s behavior changes without having to reconnect to the queue.
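
To make the effect of that shared handlers map concrete, here is a small self-contained sketch. In it, declareQueue is replaced by a stub that only counts calls, and dispatch is a hypothetical stand-in for a message arriving from RabbitMQ. Neither reflects the real module’s internals.

```javascript
var handlers = {};
var declared = 0;

// Stub: the real declareQueue binds a RabbitMQ queue; here we only count calls.
function declareQueue(key) {
  declared += 1;
}

function subscribe(key, handler) {
  var currentHandler = handlers[key];
  handlers[key] = handler;
  if (currentHandler) return; // already bound, only the handler changed
  declareQueue(key);
}

// Hypothetical stand-in for a delivery: looks the handler up at call time.
function dispatch(key, message) {
  return handlers[key](message);
}

subscribe('my_key', function (msg) { return 'v1:' + msg; });
var first = dispatch('my_key', 'a');

// Subscribing again swaps the handler without declaring a second queue.
subscribe('my_key', function (msg) { return 'v2:' + msg; });
var second = dispatch('my_key', 'a');
```

The second subscribe call replaces the handler, and the next delivery picks it up because the lookup happens per message rather than once at bind time.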

Processing and re-queuing

To see how purgatory processes those queued messages, let’s take a look at the declareQueue function:

var declareQueue = function (client, key) {

    // Create our queue
    client.queue(queueName(key), function (queue) {

        queue.bind('purgatory', key);

        queue.subscribe({ ack : true },
                        function (message, headers, deliveryInfo) {

            var delivery = { message      : message,
                             headers      : headers,
                             deliveryInfo : deliveryInfo };

            var handlerFn = handlers[key];

            handlerFn(delivery, function (err) {

                if (err) {
                    process.nextTick(function () {
                                        requeue(queue, key, message); });
                } else {
                    process.nextTick(function () { queue.shift(); });
                }
            });
        });
    });
}; 

You’ll notice that the queue is only declared once, but the subscriber looks up the handler on each delivery in case the handler function has changed. When we subscribe to the queue, we set the ack option so that the next message is only pulled off the queue once the previous one has been acknowledged. This keeps the function from spinning continuously while the database is down.

If we wanted to increase performance, we could even set a window of acking. For simplicity, my current implementation only acknowledges messages one at a time.

If the handler encounters an error indicating a connection problem, the message is put back in the queue. The requeue method then sets a timeout for 10 seconds before acking and attempting to process the message again.

If no error occurs, then purgatory can ack the message right away and run through all the queued messages.
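
The requeue helper itself is not shown above, so the following is only a sketch of one way it could work, under stated assumptions: once the delay has elapsed, the message is republished to purgatory and the delivery being held is acknowledged. The names createRequeue, publish, and setTimer are hypothetical. setTimer defaults to setTimeout and is injectable so the example can run without waiting 10 seconds.

```javascript
var REQUEUE_DELAY_MS = 10 * 1000; // the 10-second delay described in the text

// Hypothetical factory: publish(key, message) republishes to purgatory,
// setTimer(fn, ms) schedules the retry (setTimeout by default).
function createRequeue(publish, setTimer) {
  setTimer = setTimer || setTimeout;
  return function requeue(queue, key, message) {
    setTimer(function () {
      publish(key, message); // put a copy back on the queue
      queue.shift();         // then ack the delivery we were holding
    }, REQUEUE_DELAY_MS);
  };
}

// Demonstration with in-memory stand-ins instead of RabbitMQ.
var log = [];
var pending = [];
var fakeQueue = { shift: function () { log.push('ack'); } };
var fakePublish = function (key, message) { log.push('publish:' + key); };
var fakeTimer = function (fn, ms) { pending.push({ fn: fn, ms: ms }); };

var requeue = createRequeue(fakePublish, fakeTimer);
requeue(fakeQueue, 'my_key', { event: 'signup' });

var loggedBeforeDelay = log.length; // nothing happens until the timer fires
pending[0].fn();                    // simulate the delay elapsing
```

Because the delivery stays unacknowledged for the whole delay, a subscriber that acks one message at a time receives nothing else in the meantime, which is what throttles retries while the database is down.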

Next steps

In our use of purgatory, we queue messages if there are database connection problems, but this approach can be extended to nearly every type of service. The module itself is agnostic to the types of connection errors that can occur.

Our future goal is to have all of our services which do validation be backed by auxiliary queues. Even if a single queue goes down, the server should be able to round-robin between other available queues to eventually complete the request.

If there’s enough interest, I’ll gladly publish the entire code as a separate module on npm. Our use case may be specific, but we’ve found the purgatory module extremely useful at Segment.io. If you’ve found other techniques for providing fault tolerance between different pieces of infrastructure, I’d love to hear about them.





Published at DZone with permission of Calvin French-owen, author and DZone MVB. (source)
