

Distributed Lock using ZooKeeper

01.08.2013

This article is by Stephen Mouring, Jr.

On my project we have a number of software components that run concurrently, some on a cron schedule and some as part of our build process. Many of these components need to mutate data in our data store and can conflict with one another. Worse, many of these processes run on separate machines, which makes language-level or even file-system-level synchronization impossible.

ZooKeeper is a natural solution to the problem. It is a distributed system for, among other things, coordinating work across a cluster of machines. ZooKeeper manages information as a hierarchical system of "nodes" (much like a file system). Each node can contain data and can have child nodes.

ZooKeeper supports several types of nodes. A node can be either "ephemeral" or "persistent": an ephemeral node is deleted when the client session that created it ends, while a persistent node remains until it is explicitly deleted. A node can also be "sequential", meaning that each time a node is created with a given name, ZooKeeper appends a sequence number to that name. This lets you create a series of nodes that share a name and sort in the order they were created.
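To see why sequential names give a usable ordering, here is a small sketch that needs no ZooKeeper server. It assumes the sequence suffix is a zero-padded, 10-digit decimal counter, which is how ZooKeeper formats it; the "lock-" prefix and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

final class SequentialNames {
    // Mimics the name ZooKeeper hands back for a sequential node:
    // the requested name followed by a zero-padded, 10-digit counter.
    static String sequentialName(String prefix, int counter) {
        return String.format(Locale.ROOT, "%s%010d", prefix, counter);
    }

    // Builds names for the given counters and sorts them as plain strings.
    static List<String> sortedNames(String prefix, int... counters) {
        List<String> names = new ArrayList<>();
        for (int counter : counters) {
            names.add(sequentialName(prefix, counter));
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // Prints [lock-0000000001, lock-0000000002, lock-0000000010]
        System.out.println(sortedNames("lock-", 10, 2, 1));
    }
}
```

Because of the zero padding, plain string sorting puts counter 2 before counter 10, which would not be the case for unpadded numbers.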

To solve our problem we need a locking mechanism that works across processes and across machines and that allows only one holder of the lock to execute at a given time. Below is the Java code we wrote to solve the problem. I will go through it step by step.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedLock {
    private final ZooKeeper zk;
    private final String lockBasePath;
    private final String lockName;
    private String lockPath;

    public DistributedLock(ZooKeeper zk, String lockBasePath, String lockName) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
        this.lockName = lockName;
    }

    public void lock() throws IOException {
        try {
            // lockPath will be different than (lockBasePath + "/" + lockName) because of the sequence number ZooKeeper appends
            lockPath = zk.create(lockBasePath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            final Object lock = new Object();
            synchronized (lock) {
                while (true) {
                    // The watcher runs on ZooKeeper's event thread and wakes up this loop
                    List<String> nodes = zk.getChildren(lockBasePath, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            synchronized (lock) {
                                lock.notifyAll();
                            }
                        }
                    });
                    Collections.sort(nodes); // ZooKeeper node names can be sorted lexicographically
                    if (lockPath.endsWith(nodes.get(0))) {
                        return;
                    } else {
                        lock.wait();
                    }
                }
            }
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void unlock() throws IOException {
        try {
            zk.delete(lockPath, -1); // -1 matches any version of the node
            lockPath = null;
        } catch (KeeperException e) {
            throw new IOException(e);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

(Disclaimer: Credit for this code goes to Aaron McCurry for developing the core mechanism of this lock as well as the design for using ZooKeeper. Kudos to Aaron!) 

Each process that wants to use the lock should instantiate an object of the DistributedLock class. The DistributedLock constructor takes three parameters. The first parameter is a reference to the ZooKeeper client. The second parameter is the "base path" in which you want your lock nodes to reside. Remember that ZooKeeper stores its nodes like a file system, so think of this base path as the directory you want your lock nodes created in. The base path must already exist, because ZooKeeper will not create a node under a missing parent. The third parameter is the name of the lock to use. Note that you should use the same lock name for every process that you want to share the same lock. The lock name is the common reference that multiple processes lock on.

Note: This class can support multiple locks if you use a different lock name for each lock you want to create. Say you have two data stores (A and B). You have several processes that need to mutate A and B. You could use two different lock names (say LockA and LockB) to represent the locks for each data store. Any process that needs to mutate data store A could create a DistributedLock with a lock name of LockA. Likewise, any process that needs to mutate data store B could create a DistributedLock with a lock name of LockB. A process that needs to mutate both data stores would create two DistributedLock objects (one with a lock name of LockA and one with a lock name of LockB).
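One caveat when several lock names share a single base path: lock() sorts every child of the base path, so a LockA node would sort ahead of every LockB node and hold up LockB waiters. Giving each lock its own base path avoids this. Another possible adjustment, sketched below without a ZooKeeper connection, is to consider only the children that belong to this lock's name. The class and method names are hypothetical, and the sketch assumes the 10-digit sequence suffix that ZooKeeper appends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class LockNameFilter {
    // Keeps only children of the form "<lockName><10 digits>", sorted by name.
    // The digit check stops "LockA" from also matching "LockAB...".
    static List<String> contenders(List<String> children, String lockName) {
        List<String> result = new ArrayList<>();
        for (String child : children) {
            if (child.startsWith(lockName)
                    && child.substring(lockName.length()).matches("\\d{10}")) {
                result.add(child);
            }
        }
        Collections.sort(result);
        return result;
    }

    // True when lockPath is the lowest-numbered node for this lock name.
    static boolean holdsLock(List<String> children, String lockName, String lockPath) {
        List<String> sameLock = contenders(children, lockName);
        return !sameLock.isEmpty() && lockPath.endsWith("/" + sameLock.get(0));
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "LockA0000000000", "LockB0000000001", "LockAB0000000002");
        // Prints true: the LockA and LockAB nodes are ignored for LockB
        System.out.println(holdsLock(children, "LockB", "/locks/LockB0000000001"));
    }
}
```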

Once your process has created a DistributedLock object it can then call the lock() method to attempt to acquire the lock. The lock() method will block until the lock is acquired. 

// lockPath will be different than (lockBasePath + "/" + lockName) because of the sequence number ZooKeeper appends
lockPath = zk.create(lockBasePath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

First of all, the lock() method creates a node in ZooKeeper to represent its "position in line" waiting for the lock. The node is created as EPHEMERAL_SEQUENTIAL. The ephemeral part means that if our process dies for some reason, its lock or request for the lock will automatically disappear thanks to ZooKeeper's node management, so we do not have to worry about timing out nodes or cleaning up stale nodes. The sequential part gives the node its place in the queue.

final Object lock = new Object();

synchronized (lock) {
    while (true) {
        List<String> nodes = zk.getChildren(lockBasePath, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                synchronized (lock) {
                    lock.notifyAll();
                }
            }
        });

        // Sequential ZooKeeper node names can be sorted lexicographically!
        Collections.sort(nodes);

        // Are we the "topmost" node? (The node with the lowest sequence number that is.)
        if (lockPath.endsWith(nodes.get(0))) {
            return;
        } else {
            lock.wait();
        }
    }
}

To understand the code above you need to understand how ZooKeeper works. ZooKeeper operates through a system of callbacks. When you call getChildren() you can pass in a "watcher" that will get called the next time the list of children changes.

The gist of what we are doing here is this. We are creating an ordered list of nodes (sharing the same name). Whenever the list changes, every process that is waiting with a registered node is notified. Since the nodes are ordered, one node will be "on top", or in other words have the lowest sequence number. That node is the node that owns the lock. When a process detects that its node is the topmost node, it proceeds to execute. When it is finished, it deletes its node, triggering a notification to all the other processes, which then work out whose node is now on top and therefore holds the lock.
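The queueing idea can be tried out in memory, without ZooKeeper. The sketch below is only a stand-in: a sorted set plays the role of the children of the base path, and the class and method names are invented for illustration.

```java
import java.util.Locale;
import java.util.TreeSet;

final class LockQueueSketch {
    // Stands in for the children of the lock base path, kept in sorted order.
    private final TreeSet<String> children = new TreeSet<>();
    private int nextSequence = 0;

    // Like creating an EPHEMERAL_SEQUENTIAL node: returns the name with its suffix.
    String enqueue(String lockName) {
        String node = String.format(Locale.ROOT, "%s%010d", lockName, nextSequence++);
        children.add(node);
        return node;
    }

    // A process holds the lock when its node sorts first.
    boolean holdsLock(String node) {
        return !children.isEmpty() && children.first().equals(node);
    }

    // Like deleting the node in unlock(): the next node becomes the lowest.
    void release(String node) {
        children.remove(node);
    }

    public static void main(String[] args) {
        LockQueueSketch queue = new LockQueueSketch();
        String first = queue.enqueue("LockA");
        String second = queue.enqueue("LockA");
        System.out.println(queue.holdsLock(second)); // prints false
        queue.release(first);
        System.out.println(queue.holdsLock(second)); // prints true
    }
}
```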

The tricky part of the code from a Java perspective is the use of nested synchronized blocks. The nested synchronization structure is used to ensure that the DistributedLock is able to process every update it gets from ZooKeeper and does not "lose" an update if two or more updates come from ZooKeeper in quick succession.

The inner synchronized block in the Watcher method is called from an outside thread whenever ZooKeeper reports a change to the children of the base path. Since the Watcher callback is in a synchronized block keyed to the same Java lock object as the outer synchronized block, the update from ZooKeeper cannot be processed until the loop in the outer synchronized block either returns or calls wait(), which releases the Java lock object.

In other words, when an update comes in from ZooKeeper, it fires a notifyAll() which wakes up the loop in the lock() method. The loop gets the updated children and sets a new Watcher. (Watchers have to be reset once they fire as they are not a perpetual callback. They fire once and then disappear.) If the newly reset Watcher fires before the rest of the loop executes, it will block because it is synchronized on the same Java lock object as the loop. The loop finishes its pass, and if it has not acquired the distributed lock, it waits on the Java lock object. This frees the Watcher to execute whenever a new update comes, repeating the cycle.
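The hand-off between the callback thread and the waiting loop can be reproduced with plain Java monitors. This is a simplified sketch, not the lock itself: a counter guarded by the monitor stands in for the result of getChildren(), and all names are hypothetical.

```java
final class WatchAndWaitSketch {
    private final Object lock = new Object();
    private int nodesAhead; // guarded by lock; stands in for the getChildren() result

    WatchAndWaitSketch(int nodesAhead) {
        this.nodesAhead = nodesAhead;
    }

    // Plays the role of the Watcher callback and runs on another thread.
    void onChildRemoved() {
        synchronized (lock) { // blocks while the loop below is mid-pass
            nodesAhead--;
            lock.notifyAll();
        }
    }

    // Plays the role of the loop in lock(); returns how many times it checked.
    int awaitTurn() {
        int checks = 0;
        synchronized (lock) {
            while (true) {
                checks++;
                if (nodesAhead <= 0) {
                    return checks;
                }
                try {
                    lock.wait(); // releases the monitor so the callback can run
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
        }
    }

    int nodesAhead() {
        synchronized (lock) {
            return nodesAhead;
        }
    }

    public static void main(String[] args) {
        WatchAndWaitSketch sketch = new WatchAndWaitSketch(2);
        new Thread(() -> {
            sketch.onChildRemoved();
            sketch.onChildRemoved();
        }).start();
        System.out.println("acquired after " + sketch.awaitTurn() + " check(s)");
    }
}
```

Because the check and the wait() happen under the same monitor that the callback needs, a notification cannot slip in between them and be lost.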

Once the lock() method returns, your process has the distributed lock and can continue to execute its business logic. Once it is complete it can release the lock by calling the unlock() method.

public void unlock() throws IOException {
    try {
        zk.delete(lockPath, -1);
        lockPath = null;
    } catch (KeeperException e) {
        throw new IOException(e);
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}

All unlock() does is explicitly delete this process's node, which notifies all the other waiting processes and allows the next one in line to go. Because the nodes are EPHEMERAL, the process can exit without unlocking and ZooKeeper will eventually reap its node once the client session ends, allowing the next process to execute. This is a good thing because it means that if your process ends prematurely without having a chance to call unlock(), it will not block the remaining processes. Note that it is best to explicitly call unlock() if you can, because it is much faster than waiting for ZooKeeper to reap your node. You will delay the other processes less if you explicitly unlock.
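A common way to make sure unlock() is called is a try/finally block around the business logic. The sketch below shows the pattern against a hypothetical SimpleLock interface that has the same two methods as DistributedLock, so it runs without a ZooKeeper server. The withLock helper and the recording lock in demo() are illustrations only.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class LockUsageSketch {
    // Hypothetical interface with the same two methods as DistributedLock.
    interface SimpleLock {
        void lock() throws IOException;
        void unlock() throws IOException;
    }

    // Runs the critical section and releases the lock even if it throws.
    static void withLock(SimpleLock lock, Runnable criticalSection) throws IOException {
        lock.lock();
        try {
            criticalSection.run();
        } finally {
            lock.unlock();
        }
    }

    // Records the order of events using a fake lock that never blocks.
    static List<String> demo(boolean fail) {
        List<String> events = new ArrayList<>();
        SimpleLock recording = new SimpleLock() {
            @Override public void lock() { events.add("lock"); }
            @Override public void unlock() { events.add("unlock"); }
        };
        try {
            withLock(recording, () -> {
                if (fail) {
                    throw new IllegalStateException("business logic failed");
                }
                events.add("work");
            });
        } catch (IOException | IllegalStateException e) {
            events.add("error");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints [lock, work, unlock]
        System.out.println(demo(true));  // prints [lock, unlock, error]
    }
}
```

With the real class, the same shape would be a call to lock(), the business logic in the try block, and unlock() in the finally block.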

Published at DZone with permission of Scott Leberknight, author and DZone MVB. (source)
