Saturday, 11 August 2012

Performance Tip - Select only required columns from HBase

HBase is a column-oriented database and lets you select only the required columns or column families using Scan objects. For a query on specific columns, add only those columns to the Scan object to get better performance. Performance improves because less data needs to be read from disk, so the read completes faster.

To select a specific column or column family, use the following methods of the Scan class.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

Scan scan = new Scan();
// Select a specific column (qualifier) of a column family:
// scan.addColumn(Bytes.toBytes(<family>), Bytes.toBytes(<qualifier>));
scan.addColumn(Bytes.toBytes("data"), Bytes.toBytes("firstName"));

// Or select the complete column family:
// scan.addFamily(Bytes.toBytes(<family>));
// scan.addFamily(Bytes.toBytes("data"));

Also select only the required rows by using the setStartRow and setStopRow methods of the scan object.

By selecting only the required columns, the processing of the 1 billion records described in my previous blog could be much faster.

How to calculate the Compression in HBase?

Compression in HBase can significantly reduce storage requirements, because the row key, column family and column name are repeated for each column of a record. There are many compression algorithms, including LZO, GZIP and SNAPPY. The choice depends on the compression ratio and the performance of the algorithm. Please see the following link to set up and configure compression in HBase.

http://hbase.apache.org/book/compression.html

After setting up compression, the following formula can be used to get an approximate percentage of the size reduction achieved by compression:

100 - StoreFileSize *100 / ((avgKeyLen+avgValueLen+8)*entries)

StoreFileSize is the size of the store file on HDFS.
avgKeyLen, avgValueLen and entries are taken from the output of the HFile tool.

Use the following command to list the regions of the table.
hadoop fs -lsr /hbase/<table name>

Now choose one of the larger store files, as it will contain more records, and check its size.
hadoop fs -lsr /hbase/<table name>/<region>/data/<store file>
-rw-r--r-- 1 prafkum supergroup 25855018 2012-08-11 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/880ababfda4643f59956b90da2dc3d3f


Then run the HFile tool on the store file. Please see my previous blog for details.
hbase org.apache.hadoop.hbase.io.hfile.HFile -v -m -f /hbase/<table name>/<region>/data/<store file>

avgKeyLen=29,
avgValueLen=4,
entries=1146564

Using the above formula to calculate the compression percentage:

100 - 25855018*100/((29+4+8)*1146564) = 45% compression

Please note that this computation is not exact, as the store file also contains a header in addition to the keys and values of the columns. The compression can also differ between store files. Still, it gives a good indication of the compression ratio.
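The calculation above can be sketched in a few lines of Java. This is only an illustration of the formula, not part of HBase; the class and method names are hypothetical, and the input values are the ones from the example above.

```java
// Sketch: estimate the percentage of space saved by compression for one store file.
// The class and method names are illustrative, not an HBase API.
final class CompressionEstimate {

    // Constant from the formula above, added to each entry's key and value length.
    static final int PER_ENTRY_OVERHEAD_BYTES = 8;

    static double percentSaved(long storeFileSize, int avgKeyLen, int avgValueLen, long entries) {
        // Use long arithmetic so large store files do not overflow an int.
        long uncompressedEstimate =
                (long) (avgKeyLen + avgValueLen + PER_ENTRY_OVERHEAD_BYTES) * entries;
        return 100.0 - (storeFileSize * 100.0) / uncompressedEstimate;
    }

    public static void main(String[] args) {
        // Store file size from "hadoop fs -lsr", the rest from the HFile tool output.
        double pct = percentSaved(25855018L, 29, 4, 1146564L);
        System.out.println("Estimated space saved (%): " + Math.round(pct));
    }
}
```

With the numbers from the example this comes out at roughly 45, in line with the manual calculation.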


Comparison of RCFile and Column-oriented database

This blog tries to list the differences and synergies between the RCFile format of Hadoop and column-oriented databases (like Cassandra, HBase, Hypertable). Which parameters should be considered when deciding between them? These are my thoughts so far, and comments suggesting other factors are welcome.

  • Static vs Dynamic Data: Application data can be static (append-only) and never change, such as events from a web server and other fact data. On the other hand, application data can be dynamic and need to reflect the latest state, such as user profiles, bank transactions, etc. For dynamic data we must choose a column-oriented database, as the data constantly needs to be updated or deleted, but for static data the RCFile format seems a better fit (depending, of course, on the other use cases).
  • Point Query: A point query retrieves exactly the records requested by the application. Point queries are easy with a column-oriented database, as it keeps the data sorted by row key and lets the user look up a specific row by its row key. Point queries can be a challenge with RCFile, as a MapReduce job needs to be started to find a specific row.
  • Range Query: A range query retrieves all the records matching some criteria, e.g. "All users in a specific group". Range queries are easily possible both with the RCFile format and with column-oriented databases. Performance is the important factor here, and the decision should be based on the use cases for the range query.
  • Storage Requirements: Storage is an important factor, and the storage requirements should be calculated for all column-oriented options. Different compression algorithms should be tried to find the best one.
  • Performance: Depending on the application's needs, this could be the most critical requirement. It therefore needs to be tested with the different options and evaluated against different parameters, e.g. compression. Performance should be better if the query requires only a few columns rather than all columns.
  • Others: Column-oriented databases provide many features, such as multiple versions, column families and storage of binary data, which should be considered before making a final decision.

This is still a work in progress, and comments are highly welcome.

Friday, 10 August 2012

Column-oriented storage in Hadoop using RCFile

Hadoop stores large amounts of data on a distributed cluster using HDFS. Normally the data is stored in CSV format with some delimiter, for example the logs of a web server or CSV exports from RDBMS systems. To process such large files, the MapReduce paradigm is used, and it scales well thanks to the massively parallel processing of the cluster.

During processing, every record is read with all of its columns. But what if I need to process only a few columns out of many? For example, if I would like to get the sum of a specific column, why should all the columns be read, adding to the disk I/O?

There are column-oriented databases such as Cassandra, HBase, Hypertable and Vertica. For details of column-oriented storage, see the wiki page.

There are many advantages to using a column-oriented database, but here we look at storing data in the Hadoop file system in a column-oriented format using RCFile.

The RCFile (Record Columnar File) format partitions the data horizontally (rows) and vertically (columns), which allows fetching only the specific columns needed during processing and avoids the disk I/O penalty of reading all the columns.

The simplest way to create a table in RCFile format is to use Hive, as follows:

CREATE TABLE USER_RC(
  id int,
  name string,
  age int,
  manager string,
  salary int
)
STORED AS RCFILE;


To store data in this table in RCFile format, use the following steps:

CREATE TABLE USER_TEXT(
  id int,
  name string,
  age int,
  manager string,
  salary int
)
ROW FORMAT DELIMITED fields terminated by ',';

load data local inpath '/tmp/user.txt' into table USER_TEXT;

INSERT OVERWRITE table USER_RC SELECT * from USER_TEXT;

Now run a Hive query to sum the salary on both tables:

select sum(salary) from USER_TEXT;
select sum(salary) from USER_RC;

Each query starts a MapReduce job. Watch the HDFS_BYTES_READ counter to see the difference in the bytes read from HDFS. You can see a huge difference in the data read, as the query on the RCFile table reads only the salary column while the query on the text table reads the complete data.

For example, the following file in text format (/tmp/user.txt):

1,Charry,30,Joe,878
2,Roy,31,Joe,879
3,Robert,32,Joe,880
4,Barry,33,Joe,881

would conceptually be stored in RCFile format as follows, so that the sum(salary) query reads only the last line (the salary column) and skips all other data.
1,2,3,4
Charry,Roy,Robert,Barry
30,31,32,33
Joe,Joe,Joe,Joe
878,879,880,881
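
The row-to-column regrouping shown above can be mimicked with a small Java sketch. It only illustrates the idea of a columnar layout on in-memory strings; it does not read or write the real RCFile binary format, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: regroup comma-separated rows by column and aggregate a single column.
// Illustrative only; this is not the RCFile on-disk format.
final class ColumnarLayoutSketch {

    // Transpose the rows into one list of values per column.
    static List<List<String>> toColumns(List<String> rows) {
        List<List<String>> columns = new ArrayList<>();
        for (String row : rows) {
            String[] fields = row.split(",");
            for (int i = 0; i < fields.length; i++) {
                if (columns.size() <= i) {
                    columns.add(new ArrayList<>());
                }
                columns.get(i).add(fields[i]);
            }
        }
        return columns;
    }

    // Sum one numeric column; the values of the other columns are never parsed.
    static long sumColumn(List<List<String>> columns, int columnIndex) {
        long sum = 0;
        for (String value : columns.get(columnIndex)) {
            sum += Long.parseLong(value);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1,Charry,30,Joe,878",
                "2,Roy,31,Joe,879",
                "3,Robert,32,Joe,880",
                "4,Barry,33,Joe,881");
        List<List<String>> columns = toColumns(rows);
        for (List<String> column : columns) {
            System.out.println(String.join(",", column)); // one output line per column
        }
        System.out.println("sum(salary) = " + sumColumn(columns, 4));
    }
}
```

The last column holds the salaries, so summing it touches only those four values, which is the effect the HDFS_BYTES_READ comparison makes visible at scale.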

To confirm this format, browse the file /user/hive/warehouse/user_rc/00000_0 in HDFS.


Thursday, 9 August 2012

How to verify the record size of HBase?

Following my previous blog "How to calculate the record size of HBase?", it's easy to calculate the record size of HBase and estimate the storage requirements. But how can the result be verified during the testing phase and in a production environment?

The HFile tool of HBase can be used to find the average key size, the average value size and the number of entries per store file in HDFS. It can also be used to see the actual records in the store file.

To use it, first browse HDFS with "hadoop fs -lsr /hbase/<table name>" and find the store file of the table, as follows for the 'employee' table:

 Prafulls-MacBook-Pro:~ prafkum$ hadoop fs -lsr /hbase/employee  
 -rw-r--r--  1 prafkum supergroup    521 2012-08-10 06:22 /hbase/employee/.tableinfo.0000000001  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:22 /hbase/employee/.tmp  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.oldlogs  
 -rw-r--r--  1 prafkum supergroup    124 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.oldlogs/hlog.1344559953739  
 -rw-r--r--  1 prafkum supergroup    231 2012-08-10 06:22 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.regioninfo  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/.tmp  
 drwxr-xr-x  - prafkum supergroup     0 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data  
 -rw-r--r--  1 prafkum supergroup    722 2012-08-10 06:24 /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  

Look for the files in the "data" directory and choose any one of them, e.g. /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f in the above output.

Now run the HFile tool on the store file as follows. The -m option prints the metadata; other options such as -p print the actual content of the file, and -s prints statistics. (In production, use only the -m option, as the data in a store file might be huge.)
 Prafulls-MacBook-Pro:bin prafkum$ hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -s -m -f /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  
 Scanning -> /hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f  
 12/08/10 06:32:42 INFO hfile.CacheConfig: Allocating LruBlockCache with maximum size 246.9m  
 K: row1/data:employeeId/1344560028049/Put/vlen=3 V: 123  
 K: row1/data:firstName/1344560042111/Put/vlen=3 V: Joe  
 K: row1/data:lastName/1344560058448/Put/vlen=6 V: Robert  
 Block index size as per heapsize: 416  
 reader=/hbase/employee/9c21ea1c93d5c8f96f84c888bbcf23e7/data/770ababfda4643f59956b90da2dc3d3f,  
   compression=none,  
   cacheConf=CacheConfig:enabled [cacheDataOnRead=true] [cacheDataOnWrite=false] [cacheIndexesOnWrite=false] [cacheBloomsOnWrite=false] [cacheEvictOnClose=false] [cacheCompressed=false],  
   firstKey=row1/data:employeeId/1344560028049/Put,  
   lastKey=row1/data:lastName/1344560058448/Put,  
   avgKeyLen=29,  
   avgValueLen=4,  
   entries=3,  
   length=722  
 Trailer:  
   fileinfoOffset=241,  
   loadOnOpenDataOffset=150,  
   dataIndexCount=1,  
   metaIndexCount=0,  
   totalUncomressedBytes=655,  
   entryCount=3,  
   compressionCodec=NONE,  
   uncompressedDataIndexSize=43,  
   numDataIndexLevels=1,  
   firstDataBlockOffset=0,  
   lastDataBlockOffset=0,  
   comparatorClassName=org.apache.hadoop.hbase.KeyValue$KeyComparator,  
   version=2  
 Fileinfo:  
   KEY_VALUE_VERSION = \x00\x00\x00\x01  
   MAJOR_COMPACTION_KEY = \x00  
   MAX_MEMSTORE_TS_KEY = \x00\x00\x00\x00\x00\x00\x00\x00  
   MAX_SEQ_ID_KEY = 10  
   TIMERANGE = 1344560028049....1344560058448  
   hfile.AVG_KEY_LEN = 29  
   hfile.AVG_VALUE_LEN = 4  
   hfile.LASTKEY = \x00\x04row1\x04datalastName\x00\x00\x019\x0E\x06PP\x04  
 Mid-key: \x00\x04row1\x04dataemployeeId\x00\x00\x019\x0E\x05\xD9\x91\x04  
 Bloom filter:  
   Not present  
 Stats:  
 Key length: count: 3     min: 28     max: 30     mean: 29.0  
 Val length: count: 3     min: 3     max: 6     mean: 4.0  
 Row size (bytes): count: 1     min: 123     max: 123     mean: 123.0  
 Row size (columns): count: 1     min: 3     max: 3     mean: 3.0  
 Key of biggest row: row1  
 Scanned kv count -> 3  

The above output can be used to validate the record size using the formula
(8+avgKeyLen+avgValueLen)*columns per record

So for the above output
(8+29+4)*3=123 

which matches the "Row size (bytes)" value of 123 in the statistics above and is equal to our calculation in the previous blog.

Please note that the result is approximate, as it's calculated from average values. Also, the constant 8 is the sum of the "Key Length" and "Value Length" fields, 4 bytes each, as described in the last blog.
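
The same check can be written as a small Java sketch. The class and method names are hypothetical; the inputs are the avgKeyLen, avgValueLen and column count from the HFile tool output above.

```java
// Sketch: approximate record size from the HFile tool metadata.
// The class and method names are illustrative, not an HBase API.
final class RecordSizeCheck {

    // 4-byte "Key Length" field + 4-byte "Value Length" field per entry.
    static final int LENGTH_FIELDS_BYTES = 8;

    static long recordSizeBytes(int avgKeyLen, int avgValueLen, int columnsPerRecord) {
        return (long) (LENGTH_FIELDS_BYTES + avgKeyLen + avgValueLen) * columnsPerRecord;
    }

    public static void main(String[] args) {
        // avgKeyLen=29, avgValueLen=4, three columns in the single row.
        System.out.println(recordSizeBytes(29, 4, 3)); // prints 123
    }
}
```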