SQL on Hadoop and support for interactive, ad-hoc queries are in increasing demand, and every vendor is offering its own answer to these requirements. In the open source world, Cloudera's Impala, Apache Drill (backed by MapR) and Hortonworks' Stinger initiative are competing in this market, to mention just a few key players. There are also strong offerings from BI and analytics vendors such as Pivotal (HAWQ), Teradata (SQL-H) and IBM (BigSQL). In this post we will cover the Pivotal Hadoop Distribution (Pivotal HD) and HAWQ, Pivotal's interactive distributed SQL query engine.
Getting started with Pivotal HD
Pivotal HD contains the most well-known open source components such as HDFS, MapReduce, YARN, Hive, Pig, HBase, Flume, Sqoop and Mahout. Additional components are also available, such as Command Center, Unified Storage Services, Data Loader, Spring, and HAWQ as an add-on. (Pivotal also has an offering called GemFire XD, a distributed in-memory data grid, but that is out of scope for our current discussion.)
Let us take an example of how to use Pivotal HD to answer the following question: what was the highest price ever reached by the Apple, Google and Nokia stocks, and when did each stock reach its peak? First we are going to develop a MapReduce job to calculate these values, and then we will run SQL queries in HAWQ to get the same result. Our test environment is the Pivotal HD Single Node virtual machine running on VMware Player, using a 64-bit CentOS 6.4 distribution. The Pivotal HD virtual machine does not contain Eclipse, so we had to download it separately from eclipse.org. Once the environment is set up, the next step is to create a Maven project.
$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=highest_stock_price -DartifactId=highest_stock_price
This command will create a pom.xml with the basic project settings and junit added as a dependency. Then we need to edit pom.xml and add the other relevant dependencies and build settings. After that we can start writing our Hadoop application in Eclipse. The code is also uploaded to GitHub (https://github.com/iszegedi/Pivotal-HD-and-HAWQ-blog) for your reference.
The key Java classes are HighestStockPriceDriver.java, the main driver for our MapReduce application; HighestStockPriceMapper.java, which contains the map() function; and HighestStockPriceReducer.java, which contains the reduce() function. Then we can compile the code and package it into a jar file:
$ mvn clean compile
$ mvn -DskipTests package
The next step is to copy our data sets into a Hadoop HDFS directory.
$ hadoop fs -mkdir /stock_demo/input
$ hadoop fs -put *.csv /stock_demo/input/
$ hadoop fs -ls /stock_demo/input/
Found 3 items
-rw-r--r-- 1 gpadmin hadoop 403395 2013-12-31 00:25 /stock_demo/input/apple.csv
-rw-r--r-- 1 gpadmin hadoop 134696 2013-12-31 00:25 /stock_demo/input/google.csv
-rw-r--r-- 1 gpadmin hadoop 248405 2013-12-31 00:25 /stock_demo/input/nokia.csv
The format of the files (apple.csv, nokia.csv, google.csv) is as follows (the columns are Symbol, Date, Open, High, Low, Close, Volume, Adj Close):
$ head -5 apple.csv
AAPL,2013-09-06,498.44,499.38,489.95,498.22,12788700,498.22
AAPL,2013-09-05,500.25,500.68,493.64,495.27,8402100,495.27
AAPL,2013-09-04,499.56,502.24,496.28,498.69,12299300,498.69
AAPL,2013-09-03,493.10,500.60,487.35,488.58,11854600,488.58
AAPL,2013-08-30,492.00,492.95,486.50,487.22,9724900,487.22
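Given this file format, the map and reduce steps can be pictured with a plain-Java sketch that has no Hadoop dependency. This is not the code from the GitHub repository: the class and method names are hypothetical, the TreeMap merely stands in for Hadoop's shuffle and sort, and the sketch assumes that the job compares the Adj Close column, which is the column the SQL queries later in this post use. The map step turns each CSV line into a symbol with a date and price, and the reduce step keeps the highest price per symbol.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the map and reduce logic (hypothetical names, no Hadoop types).
final class HighestStockPriceSketch {

    /** Value emitted by the map step: a trading date and the price to compare. */
    static final class Quote {
        final String date;
        final double price;

        Quote(String date, double price) {
            this.date = date;
            this.price = price;
        }
    }

    /** Map step: parse one CSV line and emit it under its symbol. */
    static void map(String line, Map<String, List<Quote>> grouped) {
        String[] fields = line.split(",");
        if (fields.length != 8) {
            return; // skip lines that do not have the eight expected columns
        }
        String symbol = fields[0];
        String date = fields[1];
        double adjClose = Double.parseDouble(fields[7]); // assumption: compare Adj Close
        grouped.computeIfAbsent(symbol, k -> new ArrayList<>()).add(new Quote(date, adjClose));
    }

    /** Reduce step: keep the quote with the highest price for one symbol. */
    static Quote reduce(List<Quote> quotes) {
        Quote best = quotes.get(0);
        for (Quote q : quotes) {
            if (q.price > best.price) {
                best = q;
            }
        }
        return best;
    }

    /** Runs map over all lines, then reduce per symbol. */
    static List<String> run(List<String> lines) {
        Map<String, List<Quote>> grouped = new TreeMap<>(); // stands in for shuffle and sort
        for (String line : lines) {
            map(line, grouped);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Quote>> e : grouped.entrySet()) {
            Quote best = reduce(e.getValue());
            out.add(String.format(Locale.ROOT, "%s: %s %.2f", e.getKey(), best.date, best.price));
        }
        return out;
    }

    public static void main(String[] args) {
        // The first three rows of apple.csv shown above.
        List<String> lines = Arrays.asList(
            "AAPL,2013-09-06,498.44,499.38,489.95,498.22,12788700,498.22",
            "AAPL,2013-09-05,500.25,500.68,493.64,495.27,8402100,495.27",
            "AAPL,2013-09-04,499.56,502.24,496.28,498.69,12299300,498.69");
        run(lines).forEach(System.out::println);
    }
}
```

In a real Hadoop job the same logic would live in Mapper and Reducer subclasses and the grouping by symbol would be done by the framework rather than by an in-memory map.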
Now we are ready to run our MapReduce algorithm on the data sets:
$ hadoop jar target/highest_stock_price-1.0.jar highest_stock_price/HighestStockPriceDriver /stock_demo/input/ /stock_demo/output/
$ hadoop fs -cat /stock_demo/output/part*
AAPL: 2012-09-19 685.76
GOOG: 2013-07-15 924.69
NOK: 2000-06-19 42.24
We can check the status of the Hadoop job using the following command:
$ hadoop job -status job_1388420266428_0001
DEPRECATED: Use of this script to execute mapred command is deprecated.
Instead use the mapred command for it.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
13/12/31 00:31:15 INFO service.AbstractService: Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
13/12/31 00:31:17 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
Job: job_1388420266428_0001
Job File: hdfs://pivhdsne:8020/user/history/done/2013/12/31/000000/job_1388420266428_0001_conf.xml
Job Tracking URL : http://localhost:19888/jobhistory/job/job_1388420266428_0001
Uber job : false
Number of maps: 3
Number of reduces: 1
map() completion: 1.0
reduce() completion: 1.0
Job state: SUCCEEDED
....
....
The output shows that the job ran 3 mappers and 1 reducer. It also reports the number of input and output records and bytes.
HAWQ interactive distributed query engine
A common complaint about classic Hadoop MapReduce is that it requires fairly extensive Java experience and is tuned for batch processing, so it is not well suited to exploratory data analysis with ad-hoc interactive queries. That is where HAWQ can come to the rescue. HAWQ is a massively parallel SQL query engine. The underlying engine is based on PostgreSQL (version 8.2.15 at the time of writing), so it supports standard SQL statements out of the box. The key architecture components are the HAWQ master, HAWQ segments, HAWQ storage and the HAWQ interconnect.
The HAWQ master is responsible for accepting connections from clients, and it also manages the system tables containing metadata about HAWQ itself (no user data is stored on the master). The master parses and optimises the queries and develops an execution plan, which is then dispatched to the segments. HAWQ segments are the processing units: they are responsible for executing the local database operations on their own data sets.
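The division of work between the master and the segments can be pictured with a small scatter-gather sketch. It is only an illustration under stated assumptions, not HAWQ's actual planner or executor: the class and method names are hypothetical, each "segment" is just an in-memory list of rows, and the "master" simply merges partial results. Each segment computes the maximum price per symbol over its own rows, and the master combines those partial maxima into the final answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only: hypothetical names, not HAWQ internals.
final class ScatterGatherSketch {

    /** A row as a segment would hold it: symbol and price only, for brevity. */
    static final class Row {
        final String symbol;
        final double price;

        Row(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    /** Segment side: local aggregation over the rows this segment owns. */
    static Map<String, Double> localMax(List<Row> segmentRows) {
        Map<String, Double> partial = new TreeMap<>();
        for (Row r : segmentRows) {
            partial.merge(r.symbol, r.price, Double::max);
        }
        return partial;
    }

    /** Master side: merge the partial results returned by all segments. */
    static Map<String, Double> gather(List<Map<String, Double>> partials) {
        Map<String, Double> result = new TreeMap<>();
        for (Map<String, Double> partial : partials) {
            partial.forEach((symbol, price) -> result.merge(symbol, price, Double::max));
        }
        return result;
    }

    public static void main(String[] args) {
        // Rows spread over two segments; no segment sees all rows of a symbol.
        List<Row> segment1 = Arrays.asList(new Row("AAPL", 498.22), new Row("GOOG", 924.69));
        List<Row> segment2 = Arrays.asList(new Row("AAPL", 685.76), new Row("NOK", 42.24));

        List<Map<String, Double>> partials = new ArrayList<>();
        partials.add(localMax(segment1));
        partials.add(localMax(segment2));

        System.out.println(gather(partials));
    }
}
```

The point of the sketch is that a maximum can be computed in two stages, so most of the work happens next to the data and only small partial results have to travel between processes.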
HAWQ stores all user data in HDFS. The HAWQ interconnect refers to the UDP-based inter-process communication between the segments.
Now let us see how we can answer the same question about stock prices that we answered with our MapReduce job. First we need to log in with our client (psql, the same client that we know well from PostgreSQL databases) and create our schema and table:
$ psql
psql (8.2.15)
Type "help" for help.
gpadmin=# create schema stock_demo;
gpadmin=# create table stock_demo.stock
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# with (appendonly=true) distributed randomly;
The next step is to load the data into this HAWQ table. We can use the following commands to do this:
$ cat google.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat nokia.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
$ cat apple.csv | psql -c "COPY stock_demo.stock FROM STDIN DELIMITER E'\,' NULL E'';"
Now we can log in to our psql client again and run the SQL queries:
gpadmin=# select count(*) from stock_demo.stock;
 count
-------
 14296
(1 row)
gpadmin=# select symbol, date, adjclose from stock_demo.stock where adjclose in
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock
gpadmin(# group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)
These SQL queries relied on a HAWQ internal table, so we had to load the data into it from our local file system. HAWQ also supports the notion of external tables using PXF (Pivotal eXtension Framework), an external table interface that allows HAWQ to read data directly from HDFS directories. It has a concept of fragmenters, accessors and resolvers, which are used to split the data files into smaller chunks and read them into HAWQ without the need to load them explicitly into HAWQ internal tables. If we want to use an external table, we need to create it using the following SQL statement:
gpadmin=# create external table stock_demo.stock_pxf
gpadmin-# (
gpadmin(# symbol TEXT,
gpadmin(# date TEXT,
gpadmin(# open NUMERIC(6,2),
gpadmin(# high NUMERIC(6,2),
gpadmin(# low NUMERIC(6,2),
gpadmin(# close NUMERIC(6,2),
gpadmin(# volume INTEGER,
gpadmin(# adjclose NUMERIC(6,2)
gpadmin(# )
gpadmin-# location ('pxf://pivhdsne:50070/stock_demo/input/*.csv?Fragmenter=HdfsDataFragmenter&Accessor=TextFileAccessor&Resolver=TextResolver')
gpadmin-# format 'TEXT' (delimiter = E'\,');
Then we can run the same queries against the external table as before:
gpadmin=# select count(*) from stock_demo.stock_pxf;
 count
-------
 14296
(1 row)
gpadmin=# select symbol, date, adjclose from stock_demo.stock_pxf where adjclose in
gpadmin-# ( select max(adjclose) as max_adj_close from stock_demo.stock_pxf
gpadmin(# group by symbol )
gpadmin-# order by symbol;
 symbol |    date    | adjclose
--------+------------+----------
 AAPL   | 2012-09-19 |   685.76
 GOOG   | 2013-07-15 |   924.69
 NOK    | 2000-06-19 |    42.24
(3 rows)
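One property of the peak-price query is worth making explicit. The IN subquery returns a set of per-symbol maxima, and the outer query then matches rows on price alone, so a row of one symbol whose adjclose happened to equal another symbol's maximum would also be returned. The results above have exactly three rows, so this does not occur in this data set. The plain-Java sketch below contrasts the price-only match with a match on symbol and price together. Its class and method names are hypothetical, it works on in-memory rows rather than a HAWQ table, and the last row is a made-up collision added purely to show the effect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration of the query semantics only (hypothetical names, in-memory rows).
final class PeakQuerySketch {

    static final class Row {
        final String symbol;
        final String date;
        final double adjClose;

        Row(String symbol, String date, double adjClose) {
            this.symbol = symbol;
            this.date = date;
            this.adjClose = adjClose;
        }
    }

    /** Per-symbol maximum, i.e. what the GROUP BY subquery computes. */
    static Map<String, Double> maxBySymbol(List<Row> rows) {
        Map<String, Double> max = new HashMap<>();
        for (Row r : rows) {
            max.merge(r.symbol, r.adjClose, Double::max);
        }
        return max;
    }

    /** Mirrors "adjclose IN (subquery)": a row matches on price alone. */
    static List<Row> matchOnPriceOnly(List<Row> rows) {
        Set<Double> maxima = new HashSet<>(maxBySymbol(rows).values());
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            if (maxima.contains(r.adjClose)) {
                out.add(r);
            }
        }
        return out;
    }

    /** Compares each row with the maximum of its own symbol. */
    static List<Row> matchOnSymbolAndPrice(List<Row> rows) {
        Map<String, Double> max = maxBySymbol(rows);
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            // Exact comparison is safe here: the maximum is one of the stored values.
            if (max.get(r.symbol) == r.adjClose) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("AAPL", "2012-09-19", 685.76),
            new Row("AAPL", "2013-09-06", 498.22),
            new Row("GOOG", "2013-07-15", 924.69),
            new Row("NOK", "2000-06-19", 42.24),
            new Row("GOOG", "(hypothetical)", 42.24)); // made-up row, not in the real data

        System.out.println("price only:       " + matchOnPriceOnly(rows).size() + " rows");
        System.out.println("symbol and price: " + matchOnSymbolAndPrice(rows).size() + " rows");
    }
}
```

In SQL terms, the second variant corresponds to matching on symbol and adjclose together, for example by joining the table to the grouped subquery on both columns.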