Thursday, March 21, 2013

Information is the Oil of the 21st Century


Friday, August 31, 2012

Extending Hive : Custom Mapper & Reducer

Hive : A data warehouse system on top of Hadoop for ad-hoc queries.

  • Queries get converted into map and reduce tasks that run in parallel on a number of nodes, bringing results back quickly from any kind of massive data. 
  • Hive also allows us to extend it and write our own implementations for the map and reduce jobs. These can be written as scripts in any programming language and then used as Hive functions.
  • Extending Mapper : By default Hive supports a few input formats where every line is separated by '\n' and fields are separated by ',' or '|', etc. When data is not in a format Hive can understand, we can write the parsing logic in an extended mapper class and then emit key-value pairs. Hive lets us extend GenericUDTF (User Defined Table-Generating Function) for this purpose (see the sketch below). 
  • Extending Reducer : In the same way, we can derive further results in a custom reducer by extending GenericUDAF (User Defined Aggregate Function). Results can be filtered and passed on to multiple/different tables.
These are very useful when you need to analyse or query a set of data stored in a very different or unstructured format. Extending the mapper helps bring that data into a common key-value form, and extending the reducer can again split the map-task results or produce different sets of data from them.
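
To make the mapper-side extension concrete, here is a minimal sketch of a custom GenericUDTF. The class name, the package-less layout, and the assumption that fields are '|'-separated with the first field acting as the key are made up for this example:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical UDTF : takes one raw '|'-separated line and emits (key, value) rows.
public class ParseRawLineUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("parse_raw_line() takes exactly one argument");
        }
        // Declare two output columns, key and value, both strings.
        List<String> fieldNames = new ArrayList<String>();
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("key");
        fieldNames.add("value");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        if (args[0] == null) {
            return;
        }
        // Parse the raw line : the first field is the key, every other field
        // becomes one emitted (key, value) row.
        String[] parts = args[0].toString().split("\\|");
        for (int i = 1; i < parts.length; i++) {
            forward(new Object[] { parts[0], parts[i] });
        }
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }
}

Packaged into a jar, it would be registered with ADD JAR and CREATE TEMPORARY FUNCTION and then used in a SELECT like any other table-generating function.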

Thank You

What is Apache Sqoop?

Apache Sqoop is a tool to bulk import and export data between relational databases and the Hadoop ecosystem (HDFS, HBase or Hive).

  • works with a number of databases and commercial data warehouses.
  • available as a command-line tool; it can also be used from Java by passing the appropriate arguments.
  • graduated from the Incubator and became a top-level project at the ASF.
The current version of Sqoop runs a map-only job, with all the transformation happening in the map tasks. (Sqoop 2 will possibly have reduce tasks as well.)
Fig : Sqoop 2 Architecture diagram taken from Cloudera.com
Example : 
alok@ubuntu:~/apache/sqoop-1.4.2$ bin/sqoop import --connect jdbc:mysql://<hostname>:3306/<dbname> --username <user> -P --driver com.mysql.jdbc.Driver --table <tablename> --hbase-table <hbase-tablename> --column-family <hbase-columnFamily> --hbase-create-table

or use it like this in your Java programs:
import java.util.ArrayList;

import org.apache.sqoop.Sqoop;

ArrayList<String> args = new ArrayList<String>();
args.add("import");                    // name of the Sqoop tool to run
args.add("--connect");
args.add("jdbc:mysql://<hostname>:3306/<dbname>");
args.add("--username");
args.add("<user>");
args.add("--driver");
args.add("com.mysql.jdbc.Driver");
args.add("--table");
args.add("<tablename>");
args.add("--hbase-table");
args.add("<hbase-tablename>");
args.add("--column-family");
args.add("<hbase-colFamilyName>");
args.add("--hbase-create-table");
args.add("--num-mappers");
args.add("2");

// runTool() expects the tool name followed by its arguments and returns 0 on success.
int ret = Sqoop.runTool(args.toArray(new String[args.size()]));
  • Sqoop can write data directly to HDFS, HBase or Hive.
  • It can also export data back to RDBMS tables from Hadoop (a sketch follows this list).
  • Sqoop integrates with Oozie, allowing you to schedule and automate import and export tasks.
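
As a rough sketch of the export direction, reusing the programmatic style from the import example above (the table name and HDFS directory are placeholders):

ArrayList<String> exportArgs = new ArrayList<String>();
exportArgs.add("export");                 // name of the Sqoop tool to run
exportArgs.add("--connect");
exportArgs.add("jdbc:mysql://<hostname>:3306/<dbname>");
exportArgs.add("--username");
exportArgs.add("<user>");
exportArgs.add("--table");
exportArgs.add("<tablename>");
exportArgs.add("--export-dir");
exportArgs.add("<hdfs-dir-containing-the-data>");

int exportRet = Sqoop.runTool(exportArgs.toArray(new String[exportArgs.size()]));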


Monday, July 23, 2012

How to : Working with HBase Coprocessors

HBase Coprocessor : it allows user code to be executed at each region (of a table) on the region server, while clients only receive the final responses from every region. HBase provides an AggregateProtocol to support common aggregation functionality (sum, avg, min, max, std).

The coprocessor framework is divided into two parts: Endpoints and Observers.

Endpoint : allows you to write your own pluggable class which extends BaseEndpointCoprocessor and exposes any number of methods that you want executed at the table's region servers. These methods execute much faster at the region server and minimize the network load, since only the results get transmitted back to the client. The client then needs to do the final reduction on the results returned by each region server.

Example : the snippet below illustrates just the client-side call to the HBase coprocessor. A separate 'GroupByAggregationProtocol' interface extending 'CoprocessorProtocol' with the required methods, and an actual implementing class which implements 'GroupByAggregationProtocol' and extends 'BaseEndpointCoprocessor', must be created and deployed on each region server (a sketch of the interface follows after the snippet).
Map<byte[], Map<String, List<Long>>> resultFromCoprocessor = table.coprocessorExec(
        GroupByAggregationProtocol.class,
        <start-RowKey>,   // byte array, or null to start at the first region
        <end-Rowkey>,     // byte array, or null to end at the last region
        new Batch.Call<GroupByAggregationProtocol, Map<String, List<Long>>>() {
            @Override
            public Map<String, List<Long>> call(GroupByAggregationProtocol aggregation) throws IOException {
                return aggregation.getGroupBySum(filterList, scan);
            }
        });

// Iterate through the results returned by each regionserver and do the
// final reduction on the client side.
for (Map.Entry<byte[], Map<String, List<Long>>> entry : resultFromCoprocessor.entrySet()) {
    Map<String, List<Long>> en = entry.getValue();
    // ...
}
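
For completeness, the protocol interface used above might look roughly like this; the parameter types for filterList and scan are assumptions inferred from the call, and the 0.92/0.94-era CoprocessorProtocol API is assumed:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

// Hypothetical protocol : the server-side class would implement this interface,
// extend BaseEndpointCoprocessor and be deployed on every regionserver.
public interface GroupByAggregationProtocol extends CoprocessorProtocol {

    // For each group key, return the list of summed values for the rows
    // selected by the given filters and scan.
    Map<String, List<Long>> getGroupBySum(FilterList filterList, Scan scan) throws IOException;
}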
Endpoint Coprocessors can be thought of as the stored procedures of an RDBMS.
Observers : they provide hooks to override a few of HBase's default operations when an event occurs.
They come at three sub-levels:
a) RegionObserver : handles/overrides Get, Put, Delete, Scan, and so on. Hooks can be of type pre or post (e.g. preGet, postDelete); see the sketch below.
b) MasterObserver : handles table creation, deletion and alter events, e.g. preCreateTable or postCreateTable.
c) WALObserver : handles write-ahead log events, e.g. preWALWrite or postWALWrite.

Observer Coprocessors can be thought of as the triggers of an RDBMS.
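
To make the RegionObserver idea concrete, here is a minimal sketch against the same 0.92/0.94-era API used above; the class and the 'blocked-row' key are purely illustrative:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical observer : serves an empty result for one "blocked" row key
// instead of letting the regionserver process the Get.
public class BlockedRowObserver extends BaseRegionObserver {

    private static final byte[] BLOCKED_ROW = Bytes.toBytes("blocked-row");

    @Override
    public void preGet(ObserverContext<RegionCoprocessorEnvironment> ctx,
                       Get get, List<KeyValue> results) throws IOException {
        if (Bytes.equals(get.getRow(), BLOCKED_ROW)) {
            // Skip the default Get processing; the (still empty) results
            // list is what the client receives for this row.
            ctx.bypass();
        }
    }
}

Such a class is loaded per table through the table descriptor, or cluster-wide through the hbase.coprocessor.region.classes property in hbase-site.xml.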