
I've been a software engineer for about 7 years now. I started my career in the financial industry on Wall Street, working on overly complex systems that almost made me lose interest in the software craft. A couple of years later, I decided to move on to working with startups and haven't looked back since. I particularly enjoy working with Redis and using its simple data structures to efficiently solve complex problems. Santosh is a DZone MVB and is not an employee of DZone and has posted 9 posts at DZone.

Parallelizing Work with Redis

02.28.2013

Curator's Note:  Here's a Redis how-to from back in 2011. 

Anyone who has heard of Redis has probably also heard of Resque, which is a lightweight queueing system. To the uninitiated it might seem strange, or maybe even impossible, to construct a queueing system using just a key-value store. In this article, I’m going to break down some of the primitives Redis exposes that make building a queueing system on top of it trivial, and show how Redis is so much more than just a key-value store.

The problem

Let’s say you are a mathematician and have just come up with a super performant way of computing the factors of a number. You decide to write up the following Sinatra service:

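require 'sinatra'
require 'rest-client'
require 'json'

# crazy_performant_computation stands in for the actual factoring algorithm.
def compute_factors(number)
  crazy_performant_computation number
end

get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  # Post the factors back to the caller as a JSON-encoded form field.
  RestClient.post post_back_url, :factors => compute_factors(number).to_json
  "OK"
end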

You soon start seeing crazy traffic and realize that, performant as your factor computation algorithm is, it’s not fast enough to keep up with the rate at which requests are hitting your service.

First pass at optimization by forking

You realize that it’s going to be far more efficient to fork off a new process or thread, have that perform the computation and post back the result, and let the request return right away. So your code now changes to:

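get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  # Do the slow work in a child process so the request returns immediately.
  pid = Process.fork do
    RestClient.post post_back_url, :factors => compute_factors(number).to_json
  end
  # Let Ruby reap the child when it exits so it doesn't linger as a zombie.
  Process.detach(pid)
  "OK"
end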

While this is great, you soon realize that forking a process for every request puts no upper bound on how many you create, and filling up the process table in your OS is not such a good idea.

Capping process creation using a process pool

It is exactly this problem that a process pool is meant to solve. The basic idea is that you would still like to perform your time-intensive task in the background, but would like to put a cap on the number of background processes you have running. There are some excellent libraries that solve this problem, such as Delayed Job and Resque. However, being the hacker that you are, you decide to roll one yourself. These libraries do solve a bunch of issues for you, though, so you pull out a pen and paper and note them down to ensure that you are not missing anything:

Cap how many workers you create

You need a way to cap the number of background workers you create, so that you don’t run into the same problem you were having before.

Control worker creation and destruction

You would like to be able to boot up and bring down your workers reasonably gracefully.

Handle race conditions

You realize that spinning up new processes means you now have to ensure your code is concurrency-safe. Redis provides some wonderful atomic operations out of the box, so this shouldn’t be too hard.
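Before bringing Redis into it, here is a minimal sketch of what those three requirements look like in plain Ruby. It is only an illustration under a few assumptions: threads stand in for worker processes, Ruby's built-in thread-safe Queue stands in for the shared work queue, and the names POOL_SIZE and run_pool are made up for this example.

```ruby
# Sketch only: threads stand in for the forked worker processes, and Ruby's
# thread-safe Queue stands in for the shared work queue.
POOL_SIZE = 4 # hypothetical cap on the number of concurrent workers

def run_pool(jobs, pool_size)
  queue = Queue.new
  jobs.each { |job| queue << job }
  queue.close # no more jobs will arrive; pop returns nil once drained

  results = Queue.new
  workers = Array.new(pool_size) do |id|
    Thread.new do
      # Queue#pop is atomic, so each job is handed to exactly one worker.
      while (job = queue.pop)
        results << [id, job, job * job] # squaring stands in for real work
      end
    end
  end
  workers.each(&:join) # workers finish on their own once the queue is drained

  Array.new(results.size) { results.pop }
end

pool_results = run_pool((1..20).to_a, POOL_SIZE)
puts "jobs processed: #{pool_results.size}"
puts "workers used:   #{pool_results.map(&:first).uniq.size} (cap: #{POOL_SIZE})"
```

However many jobs you throw at it, the number of workers never goes above the cap, and no job is picked up twice. The rest of the article gets the same properties across processes by letting Redis play the part of the queue.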

Second pass using BRPOP

Redis supports a couple of interesting data structures, including lists, sets and hashes. Redis lists have a command called RPOP, which pops an item off the tail of a list; paired with LPUSH, which adds items at the head, this lets you treat the list as a queue. RPOP comes with a blocking variant called BRPOP, which waits for an element to arrive if the list is empty. You can also specify a timeout (in seconds) for how long you would like to block on the call.
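If the LPUSH and BRPOP semantics are new to you, the following sketch mimics them in memory so you can poke at the behaviour without a Redis server. TinyList is a hypothetical stand-in written for this article, not part of Redis or the redis gem; it only imitates three things: items go in at the head, come out at the tail, and a pop on an empty list waits up to a timeout before giving up with nil.

```ruby
# Hypothetical in-memory stand-in for a single Redis list; NOT the redis gem API.
class TinyList
  def initialize(key)
    @key = key
    @items = []
    @lock = Mutex.new
    @ready = ConditionVariable.new
  end

  # Like LPUSH: insert at the head of the list.
  def lpush(value)
    @lock.synchronize do
      @items.unshift(value)
      @ready.signal # wake up one waiting consumer, if any
    end
  end

  # Like BRPOP: remove from the tail, waiting up to `timeout` seconds.
  # Returns [key, value], or nil if nothing arrived in time.
  def brpop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @ready.wait(@lock, remaining)
      end
      [@key, @items.pop]
    end
  end
end

list = TinyList.new("work_queue")
%w[first second third].each { |job| list.lpush(job) }
p list.brpop(1) # => ["work_queue", "first"]
```

The oldest item comes out first, which is exactly the queue behaviour the workers rely on. The two-element return value is also why the worker code reads the payload with val.last.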

def compute_factors(number)
  factors = crazy_performant_computation number
end

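NUMBER_OF_WORKERS = (ENV['NUMBER_OF_WORKERS'] || 50).to_i
NUMBER_OF_WORKERS.times do
  Process.fork do
    # Each worker opens its own Redis connection after the fork.
    redis = Redis.new
    loop do
      # Block for up to 1 second; brpop returns [key, value], or nil on timeout.
      # (Recent versions of the redis gem take the timeout as an option.)
      val = redis.brpop "work_queue", :timeout => 1
      unless val
        puts "Process: #{Process.pid} is exiting"
        exit 0
      end

      number, postback_url = Marshal.load val.last
      RestClient.post postback_url, :factors => compute_factors(number).to_json
    end
  end
end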

redis = Redis.new
get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  redis.lpush "work_queue", Marshal.dump([number, post_back_url])
  "OK"
end

This new approach solves a bunch of problems. We have a fixed number of workers handling our background processing, so the process table no longer fills up as traffic grows. Race conditions are handled for us by Redis: BRPOP is atomic, so each queued item is handed to exactly one worker and no two workers do duplicate work. And finally, workers shut themselves down when the brpop call hits its timeout, in this case 1 second. That’s quite a slew of problems solved by virtue of just using Redis.

We soon start seeing a different problem though. Whenever traffic to our site slows down, workers die off because their timeout is hit. We’d really like the workers to block for longer than just 1 second, while still having the option to kill them off sooner if we need to. That way, they won’t hang around any longer than they have to.

Gracefully shutting down workers

Our mandate now is to shut down our workers gracefully, using Redis and a little bit of UNIX signals magic (for examples of using signals in this area, check out Unicorn Is Unix and the Unicorn web server). Our code now morphs to:

def compute_factors(number)
  factors = crazy_performant_computation number
end

NUMBER_OF_WORKERS = (ENV['NUMBER_OF_WORKERS'] || 50).to_i
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do
      # Block for up to 30 seconds waiting for work.
      # (Recent versions of the redis gem take the timeout as an option.)
      val = redis.brpop "work_queue", :timeout => 30
      unless val
        puts "Process: #{Process.pid} is signing off due to timeout!"
        exit 0
      end

      # The parent pushes "DIE!" once per worker to request a shutdown.
      if val.last == "DIE!"
        puts "Process: #{Process.pid} has been asked to kill itself by parent"
        exit 0
      end

      number, postback_url = Marshal.load val.last
      RestClient.post postback_url, :factors => compute_factors(number).to_json
    end
  end
end
redis = Redis.new
get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  redis.lpush "work_queue", Marshal.dump([number, post_back_url])
  "OK"
end

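File.write("/tmp/factors.pid", Process.pid.to_s)
puts "Parent process wrote PID to /tmp/factors.pid"

trap('QUIT') do
  # Ruby 2.0+ refuses to lock a mutex inside a signal handler, and the Redis
  # client may lock one internally, so hand the pushes off to a new thread.
  Thread.new do
    NUMBER_OF_WORKERS.times do
      redis.lpush "work_queue", "DIE!"
    end
  end
end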

We have now bumped up the timeout to 30 seconds and also have a way to bring down the workers near instantly. This is accomplished by the web server trapping the QUIT signal; when it does, it pushes a “DIE!” message onto the Redis “work_queue” once per worker, that is, NUMBER_OF_WORKERS times. Since BRPOP is atomic, each “DIE!” message is popped by exactly one worker, which then exits, so we are now bringing down workers via Redis. How cool is that! To gracefully shut down the workers we just need to:

kill -s QUIT `cat /tmp/factors.pid`
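To see the whole shutdown handshake in one place without a Redis server, here is a rough sketch of the same idea in plain Ruby. Treat it as an illustration under stated assumptions rather than the code above: threads and an in-process Queue stand in for the forked workers and the Redis list, USR1 stands in for QUIT so the script is harmless to run, it assumes a POSIX system, and start_workers and run_until_signalled are names invented for this example. The signal handler only records that the signal arrived; the stop messages are pushed afterwards from ordinary code.

```ruby
# Sketch only: threads and an in-process Queue stand in for the forked
# workers and the Redis "work_queue" list. USR1 stands in for QUIT.
STOP = "DIE!"

def start_workers(count, queue, handled)
  Array.new(count) do
    Thread.new do
      loop do
        message = queue.pop
        break if message == STOP # each worker consumes exactly one stop message
        handled << message
      end
    end
  end
end

def run_until_signalled(worker_count, jobs, signal = "USR1")
  queue = Queue.new
  handled = Queue.new
  workers = start_workers(worker_count, queue, handled)

  stop_requested = false
  # Keep the handler tiny: it only records that the signal arrived.
  previous = trap(signal) { stop_requested = true }

  jobs.each { |job| queue << job }
  yield # the caller delivers the signal here

  # Wait (at most 5 seconds) for the handler to have run.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 5
  sleep 0.01 until stop_requested || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

  # Back in ordinary code: one stop message per worker, queued behind the jobs.
  worker_count.times { queue << STOP }
  workers.each(&:join)
  { handled: handled.size, leftover: queue.size }
ensure
  trap(signal, previous) if previous # restore whatever handler was there before
end

summary = run_until_signalled(3, (1..10).to_a) { Process.kill("USR1", Process.pid) }
puts "handled #{summary[:handled]} jobs, #{summary[:leftover]} messages left over"
```

Because the stop messages are queued behind the real jobs, pending work drains before the workers exit, and pushing exactly one message per worker means nothing is left behind in the queue.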

Conclusion

The next time you need to get some background job action going, stop yourself from just grabbing a library. Instead, toy around with Redis lists a little. You’ll be surprised by how much you can accomplish with just straight Redis primitives.




Published at DZone with permission of Santosh Kumar, author and DZone MVB. (source)
