Stacey Schneider is focused on helping evangelize how cloud technologies are transforming application development and delivery for VMware. Prior to its acquisition, Stacey led marketing and community management for application management software provider Hyperic, now a part of VMware’s management portfolio. Before her work in the cloud, she also held various technical leadership positions at CRM software pioneer Siebel Systems, including Director of Technology Product Marketing, managing the Technology Competency in Europe, and the Globalization professional services practice. She was also a part of Siebel’s Nexus project, which focused on building portable web applications that could be deployed across Java application servers as well as .NET. Stacey is also the managing principal of SiliconSpark, a consulting agency that has helped over 12 software companies go to market on the web and across the cloud over the past 4 years.

Map-Reduce Style: Data Aware Distributed Querying in vFabric GemFire

11.28.2012

Big, fast data is powering some of the most interesting computing opportunities in today’s market. But in order to get there, we need to change our approach to the data tier. Enterprises are trying to move from costly mainframe architectures to virtualized datacenters and utilize commodity hardware more efficiently. With the data tier, this means an architecture that scales horizontally by adding more commodity-based computing and storage at runtime.

To scale the data tier horizontally, companies use systems like vFabric GemFire, a distributed data system that is designed to specifically accommodate large data sets across commodity hardware nodes. In GemFire, data is spread across members of a cluster with members referred to as “nodes,” and the distribution of data across those nodes is called “partitioning.” vFabric GemFire then allows developers to query the data that resides across many nodes while retaining core values of very high performance at scale. How? In short, the answer is “Data Aware Querying” – a query API that allows a query to execute on selective nodes instead of all nodes in a map-reduce style.

To answer this question, this article covers the following:

  • Understanding Data Partitioning
  • Understanding Basic Data Querying
  • Using Custom Partitioning to Achieve Data-Aware Querying
  • Implementing Function Execution with Custom Partitioning

Understanding Data Partitioning

First, we should understand how data is mapped out in order to understand how we can store and access a lot of it quickly and in dynamic ways.

GemFire partitions data using keys, so a subset of keys and their corresponding values is stored on each node. This approach allows concurrent access to a large data set with very high throughput, without affecting store/access latency across the cluster. A key is a unique entity, which makes store/access an O(1) operation (i.e. one that always takes the same amount of time regardless of the size of the data set), while values may be duplicated. A key can either be an independent entity, like a sequence number, or be composed of references to multiple attributes in a value, letting the partitioning be based on a composite key.

Partitioning data can increase query performance because it uses a partial scan of a large data set and avoids using a full data store scan or multiple random reads scattered across the whole data store.

Within GemFire, data is partitioned using a partitioned region (PartitionedRegion). One partition, hosted on a node, consists of multiple buckets; the total number of buckets is configured at startup. Buckets are distributed across the nodes, and each entry is assigned to a bucket deterministically based on its key. Buckets are also the smallest unit of data in a partitioned region that can be moved from one partition (JVM) to another during rebalancing.
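
The relationship between keys, buckets, and nodes can be sketched in plain Java. This is only an illustration and not GemFire code: the class name BucketRebalanceSketch, the hash-modulo rule, and the round-robin placement of buckets are all assumptions made for the example. The point it shows is that a key always maps to the same bucket, while whole buckets may change owner during rebalancing.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: not GemFire code. Hashing and placement rules are assumptions.
final class BucketRebalanceSketch {
    private final int totalBuckets;
    private final String[] bucketOwner; // bucketOwner[b] = node currently hosting bucket b

    BucketRebalanceSketch(int totalBuckets, List<String> nodes) {
        this.totalBuckets = totalBuckets;
        this.bucketOwner = new String[totalBuckets];
        assign(nodes);
    }

    // The key-to-bucket mapping is deterministic and never changes.
    int bucketFor(Object key) {
        return Math.floorMod(key.hashCode(), totalBuckets);
    }

    // The node is found through the bucket, not directly from the key.
    String nodeFor(Object key) {
        return bucketOwner[bucketFor(key)];
    }

    // Rebalancing moves whole buckets; entries never change bucket.
    void rebalance(List<String> nodes) {
        assign(nodes);
    }

    // Assumed policy: spread buckets round-robin over the current nodes.
    private void assign(List<String> nodes) {
        for (int b = 0; b < totalBuckets; b++) {
            bucketOwner[b] = nodes.get(b % nodes.size());
        }
    }

    public static void main(String[] args) {
        BucketRebalanceSketch region =
            new BucketRebalanceSketch(9, Arrays.asList("nodeA", "nodeB", "nodeC"));
        Integer key = 4;
        System.out.println("bucket " + region.bucketFor(key) + " on " + region.nodeFor(key));
        region.rebalance(Arrays.asList("nodeA", "nodeB", "nodeC", "nodeD"));
        System.out.println("bucket " + region.bucketFor(key) + " on " + region.nodeFor(key));
    }
}
```

In this sketch the key stays in bucket 4 before and after the fourth node joins; only the bucket's owner changes.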

Understanding Basic Data Querying

GemFire queries distributed data in a scatter-gather fashion: a query starts from a coordinator node, runs on the other nodes concerned, and the results are gathered back at the coordinator before being returned to the application. All nodes where the query is executed are considered data nodes, and the first node, which starts the query (or receives it from a client), becomes the coordinator. This allows the query to run in parallel on the data nodes concerned, with the coordinator doing the final processing. For example, the coordinator for an ORDER BY query performs a merge sort of the ordered result-set chunks.
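
The gather step for an ORDER BY query can be pictured as a k-way merge of chunks that each data node has already sorted. The following is a minimal sketch in plain Java, not GemFire's implementation; the class and method names are hypothetical, and integers stand in for the ordered result rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: a coordinator merging sorted chunks returned by data nodes.
final class ScatterGatherSketch {

    // Merges already-sorted chunks into one sorted list (k-way merge).
    static List<Integer> mergeSortedChunks(List<List<Integer>> chunks) {
        // Each heap entry is {value, chunkIndex, positionInChunk}, ordered by value.
        PriorityQueue<int[]> heap =
            new PriorityQueue<int[]>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < chunks.size(); i++) {
            if (!chunks.get(i).isEmpty()) {
                heap.add(new int[] {chunks.get(i).get(0), i, 0});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            merged.add(head[0]);
            List<Integer> chunk = chunks.get(head[1]);
            int next = head[2] + 1;
            if (next < chunk.size()) {
                heap.add(new int[] {chunk.get(next), head[1], next});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Three data nodes each return a chunk sorted by age.
        List<List<Integer>> chunks = Arrays.asList(
            Arrays.asList(21, 30, 34),
            Arrays.asList(18, 25),
            Arrays.asList(19, 33));
        System.out.println(mergeSortedChunks(chunks)); // [18, 19, 21, 25, 30, 33, 34]
    }
}
```

Because each node sorts its own chunk in parallel, the coordinator only does the comparatively cheap merge.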

Before we get more advanced, let’s begin with a basic example. Again, GemFire distributes the data using keys in a key-value pair. Querying this data involves a SQL-like query language known as Object Query Language, or OQL. Without any special partitioning in GemFire (as discussed later), keys have no relation to their values. OQL queries are executed on values, so the query itself says nothing about how the data is distributed across nodes (i.e. where to scatter). Without that information, all nodes must be queried, which is both inefficient and expensive across the network.

For an example, let’s say we have a Passenger object with a Flight field.

class Passenger {
    String name;
    Date travelDate;   // java.util.Date
    int age;
    Flight flt;
}

class Flight {
    int flightId;
    String origin;
    String dest;
}

Let’s say 100 million Passenger objects are stored in the “Travelers” datastore (a datastore is called a “Region” in GemFire), which is partitioned across 3 nodes, and we want to run the following query over all Passengers in the Region.

"SELECT p.travelDate, p.age
 FROM /Travelers p, p.flt f
 WHERE f.origin IN ('Boston', 'Chicago')
   AND f.dest = 'Seattle'
   AND p.age < 35"

(Note: An airline could use the example query above to decide which movies to offer on flights from Boston or Chicago to Seattle, based on passengers being under 35. The idea is that young travelers or families with children will want more family-oriented movies, while adults may want news or recent episodes of a popular television series.)

This query would essentially perform a full scan of 100 million records, which is highly inefficient. While GemFire supports the creation of indexes, we omit them here to isolate the improvement due to data-aware partitioning alone.

Using Custom Partitioning to Achieve Data-aware Querying

Logically, a query will be more efficient if it is targeted to a specific location. Custom partitioning, or fixed partitioning (sometimes also referred to as “column-based partitioning” in relational database terms), is a GemFire option used to distribute data deterministically. In GemFire 6.6.2, we can selectively query the distributed (i.e. partitioned) data based on a column.

Using the same example as above, all Passenger data is partitioned across multiple GemFire nodes. In the Passenger object, Flight has an “origin” field. If we make the origin city part of the key, data can be routed to particular buckets (i.e. sections within a partition) based on it. This means the routing of Passenger data to a particular node is based on the “origin” provided in the Flight field. A query that restricts the origin then only needs to touch the buckets for those origins, rather than every bucket on every node, so there are NOT 100 million Passenger objects to iterate over. With data-aware querying set up, the above query is executed against a limited data set. If we assume there are 100 million passengers spread evenly over 9 origin cities, 50% of Passengers are under 35, and no indexes exist, GemFire’s query engine iterates over ~11 million [100 * (50%) * (2/9) = 11.11] passenger objects.
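
The effect of routing by origin can be imitated with an in-memory stand-in for the region. This is a sketch under stated assumptions rather than GemFire code: the class DataAwareScanSketch, its one-bucket-per-origin layout, and the tiny five-passenger data set are all invented for illustration. It counts how many objects the query has to examine when only the buckets for the requested origins are visited.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: an in-memory stand-in for a region partitioned by origin.
final class DataAwareScanSketch {

    static final class Passenger {
        final int age;
        final String origin;
        final String dest;

        Passenger(int age, String origin, String dest) {
            this.age = age;
            this.origin = origin;
            this.dest = dest;
        }
    }

    // Assumed layout: one bucket per origin city.
    private final Map<String, List<Passenger>> bucketsByOrigin = new HashMap<>();
    private int examined;

    void put(Passenger p) {
        bucketsByOrigin.computeIfAbsent(p.origin, k -> new ArrayList<>()).add(p);
    }

    // Visits only the buckets for the requested origins, then applies the
    // remaining predicates (dest and age) to each object found there.
    List<Passenger> query(Set<String> origins, String dest, int maxAgeExclusive) {
        examined = 0;
        List<Passenger> matches = new ArrayList<>();
        for (String origin : origins) {
            List<Passenger> bucket =
                bucketsByOrigin.getOrDefault(origin, Collections.<Passenger>emptyList());
            for (Passenger p : bucket) {
                examined++;
                if (p.dest.equals(dest) && p.age < maxAgeExclusive) {
                    matches.add(p);
                }
            }
        }
        return matches;
    }

    int examinedCount() {
        return examined;
    }

    public static void main(String[] args) {
        DataAwareScanSketch region = new DataAwareScanSketch();
        region.put(new Passenger(30, "Boston", "Seattle"));
        region.put(new Passenger(20, "Boston", "Miami"));
        region.put(new Passenger(40, "Chicago", "Seattle"));
        region.put(new Passenger(22, "Chicago", "Seattle"));
        region.put(new Passenger(25, "Denver", "Seattle"));

        Set<String> origins = new HashSet<>(Arrays.asList("Boston", "Chicago"));
        List<Passenger> matches = region.query(origins, "Seattle", 35);
        System.out.println(matches.size() + " matches, "
            + region.examinedCount() + " of 5 objects examined");
    }
}
```

The Denver passenger is never examined, because its bucket is not among the requested origins.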

To custom-partition the data, an application developer implements the PartitionResolver interface to plug their partitioning strategy into GemFire. A PartitionResolver might look as follows:

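import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.PartitionResolver;

/**
 * This resolver routes all Passengers with the same origin to one bucket.
 * The region is configured to contain 9 buckets, one per possible origin.
 */
public class MyPartitionResolver implements PartitionResolver {

    // The number of buckets is configurable for a partitioned region.
    // 9 different locations are possible; only two are listed here.
    private final Map<String, Integer> keyToRoutingObject = new HashMap<String, Integer>();

    public MyPartitionResolver() {
        keyToRoutingObject.put("Seattle", 8);
        keyToRoutingObject.put("Chicago", 4);
    }

    @Override
    public Serializable getRoutingObject(EntryOperation opDetails) {
        // - - - - - -
        // opDetails.getKey() returns the key used in region.put(key, value).
        // PassengerKey is a hypothetical key class exposing the origin;
        // the key could be "seq_num+origin".
        PassengerKey key = (PassengerKey) opDetails.getKey();
        return keyToRoutingObject.get(key.getOrigin());
    }

    // getName() and close(), also required by the interface, are omitted here.
}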

All Passengers with the same origin in the Flight field will be routed to the same bucket on the same node, as shown in the diagram below.

[Figure: GemFire Function Execution Service]
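
One way to read the integers 8 and 4 in the resolver above is as routing objects whose hash codes select a bucket. The sketch below makes that idea concrete in plain Java. It is not GemFire code, and the rule it uses (bucket = hash code of the routing object modulo the number of buckets) is an assumption for illustration; the class name RoutingObjectSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: not GemFire code.
final class RoutingObjectSketch {
    private final Map<String, Integer> originToRoutingObject = new HashMap<>();
    private final int totalBuckets;

    RoutingObjectSketch(int totalBuckets) {
        this.totalBuckets = totalBuckets;
        // Same two entries as the resolver in the article.
        originToRoutingObject.put("Seattle", 8);
        originToRoutingObject.put("Chicago", 4);
    }

    // Assumption: the bucket is derived from the routing object's hash code.
    int bucketForOrigin(String origin) {
        Integer routingObject = originToRoutingObject.get(origin);
        if (routingObject == null) {
            throw new IllegalArgumentException("No routing object for origin: " + origin);
        }
        return Math.floorMod(routingObject.hashCode(), totalBuckets);
    }

    public static void main(String[] args) {
        RoutingObjectSketch sketch = new RoutingObjectSketch(9);
        System.out.println("Seattle -> bucket " + sketch.bucketForOrigin("Seattle")); // bucket 8
        System.out.println("Chicago -> bucket " + sketch.bucketForOrigin("Chicago")); // bucket 4
    }
}
```

Under this assumption, every entry whose key resolves to the same routing object lands in the same bucket, regardless of the rest of the key.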

Implementing Function Execution with Custom Partitioning

GemFire’s Function Execution Service can then be used on this partitioned data to operate on distributed data in a map-reduce style and to query data where it is located. This is known as data-aware querying. A function can be executed on a particular node or set of nodes. Functions are dropped on the filtered nodes (in the above diagram, Partition B for “Chicago” and Partition C for “Seattle”) and execute their code locally on each node. Query execution also happens ONLY locally using the new API; no remote or distributed querying is performed from a node. The difference between querying without a function context and with one is that in the former case the query runs on all local buckets, whereas in the latter it runs only on buckets C and S.
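
The map-reduce flavor of this can be simulated without GemFire. In the sketch below, which is a simplified model and not the Function Execution Service itself, a function is applied only on the nodes that host a filtered routing key, sees only that node's local data, and the per-node results are gathered by the caller. The class FunctionExecutionSketch and the node layout are assumptions made for the example; the partition names follow the article's diagram.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Illustrative only: a toy model of running a function on filtered nodes.
final class FunctionExecutionSketch {

    // node name -> (routing key -> passenger ages stored locally for that key)
    private final Map<String, Map<String, List<Integer>>> nodes = new LinkedHashMap<>();

    void put(String node, String routingKey, int age) {
        nodes.computeIfAbsent(node, n -> new HashMap<>())
             .computeIfAbsent(routingKey, k -> new ArrayList<>())
             .add(age);
    }

    // "Map": apply fn on each node hosting a filtered key, over local data only.
    // "Reduce": the returned per-node results are combined by the caller.
    <R> Map<String, R> execute(Set<String> filter, Function<List<Integer>, R> fn) {
        Map<String, R> collected = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, List<Integer>>> node : nodes.entrySet()) {
            List<Integer> localData = new ArrayList<>();
            boolean hostsFilteredKey = false;
            for (String key : filter) {
                List<Integer> bucket = node.getValue().get(key);
                if (bucket != null) {
                    hostsFilteredKey = true;
                    localData.addAll(bucket);
                }
            }
            if (hostsFilteredKey) {
                collected.put(node.getKey(), fn.apply(localData));
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        FunctionExecutionSketch cluster = new FunctionExecutionSketch();
        cluster.put("PartitionA", "Boston", 19);
        cluster.put("PartitionB", "Chicago", 22);
        cluster.put("PartitionB", "Chicago", 41);
        cluster.put("PartitionC", "Seattle", 30);

        Set<String> filter = new HashSet<>(Arrays.asList("Seattle", "Chicago"));
        // The "function": count local passengers under 35.
        Map<String, Long> perNode = cluster.execute(filter,
            ages -> ages.stream().filter(a -> a < 35).count());
        long total = perNode.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(perNode + " total=" + total);
    }
}
```

Partition A never runs the function, since it holds no data for the filtered keys; Partitions B and C each return a partial count that the caller sums.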

To query through the new Query API inside a Function:

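// EmpFunction
public class EmpFunction extends FunctionAdapter {

    // - - - - -
    @Override
    public void execute(FunctionContext context) {

        // - - - - -
        // The OQL query string arrives as the function argument.
        QueryService queryService = CacheFactory.getAnyInstance().getQueryService();
        Query query = queryService.newQuery((String) context.getArguments());
        // New API: the query runs only on the local buckets selected by the filter.
        SelectResults results = (SelectResults) query.execute((RegionFunctionContext) context);
        // - - - - -

    }
    // - - - - -

}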

Execute the above Function from your application code to run the query:

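// Application client code.

// Routing keys used to select the nodes (and buckets) the function runs on.
Set<String> filter = new HashSet<String>();
filter.add("Seattle");
filter.add("Chicago");

Function empFunc = new EmpFunction("NAZFunction");

// Execute the Function; "query" is the OQL query string passed as the argument.
ResultCollector rColl = FunctionService
    .onRegion(getRegion("employee"))
    .withArgs(query)
    .withFilter(filter)
    .execute(empFunc);

// Get the results; getRegion() and getResults() are application helpers not shown here.
Object result = rColl.getResult();
SelectResults queryResults = getResults(result);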

This approach provides a sophisticated way to effectively query distributed data while retaining very predictable performance.

About the Author: Shobhit Agarwal is a member of VMware’s Technical Staff and has worked on high-availability, low-latency, in-memory data management systems for virtual environments for the past two years. Agarwal graduated from Northeastern University with an MS in Computer Science, specializing in Systems Engineering. His specialties include Java development, distributed systems and data structures.

Published at DZone with permission of its author, Stacey Schneider. (source)
