Working with HBase and Hadoop

HBase is a NoSQL database modeled on Bigtable, Google's distributed storage system. As the Google research paper describes it: "Bigtable is a sparse, distributed, persistent multidimensional sorted map. The map is indexed by a row key, column key, and a timestamp; each value in the map is an uninterpreted array of bytes." If you want a detailed explanation of what each word in this scary definition means, I suggest checking out this post.
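
If it helps to picture that definition, here is a rough sketch in plain Java collections (my own illustration, not from the paper): a sorted map indexed by row key, column key, and timestamp, with uninterpreted byte arrays as values.

import java.util.TreeMap;

public class BigtableModel {
    public static void main(String[] args) {
        // rowKey -> columnKey -> timestamp -> uninterpreted bytes
        TreeMap<String, TreeMap<String, TreeMap<Long, byte[]>>> table =
                new TreeMap<String, TreeMap<String, TreeMap<Long, byte[]>>>();

        TreeMap<Long, byte[]> versions = new TreeMap<Long, byte[]>();
        versions.put(1341590928097L, "25.25".getBytes());

        TreeMap<String, TreeMap<Long, byte[]>> columns =
                new TreeMap<String, TreeMap<Long, byte[]>>();
        columns.put("marketdata:daily", versions);

        table.put("AAPL-1984-10-25", columns);
        System.out.println(new String(table.get("AAPL-1984-10-25")
                .get("marketdata:daily").firstEntry().getValue()));
    }
}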

HBase supports scaling well beyond what a traditional RDBMS can handle, provides automatic sharding, and integrates with MapReduce for extensive parallel processing. HBase is built on top of HDFS and provides fast lookups over large datasets. You can read more about the HBase architecture here.

HBase can be used as a data source as well as a data sink for MapReduce jobs. The example in this post will use HBase as a data sink. If you are interested in other examples, take a look at the Hadoop wiki page on HBase as a MapReduce source and sink.
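
For the source direction, which this post does not cover, the client library provides TableMapReduceUtil.initTableMapperJob. A minimal sketch, assuming a table called some_table (the class and table names here are hypothetical):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical mapper that reads rows from an HBase table
public class TableSourceSketch extends TableMapper<Text, Text> {

    @Override
    public void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException, InterruptedException {
        // process one HBase row here
    }

    public static void configureJob(Job job) throws IOException {
        // Wires up TableInputFormat so the mapper reads from "some_table"
        TableMapReduceUtil.initTableMapperJob("some_table", new Scan(),
                TableSourceSketch.class, Text.class, Text.class, job);
    }
}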

HBase as a distributed store for stock price data

The example will process Apple stock prices downloaded from the Yahoo Finance site. It is the same Apple stock price dataset that we used earlier to demonstrate the capabilities of Hive on Amazon Elastic MapReduce. It is stored in an AWS S3 bucket called stockprice. The MapReduce job will fetch the file from there using an s3n://AWSAccessKeyId:AWSSecretAccessKey@bucket/object URL and will store the output in an HBase table named aapl_marketdata. The test environment was based on Hadoop-0.20.2 and HBase-0.90.6.

Before running the MapReduce job, we need to create the table:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.90.6, r1295128, Wed Feb 29 14:29:21 UTC 2012

hbase(main):005:0> create 'aapl_marketdata', 'marketdata'
0 row(s) in 1.4290 seconds

hbase(main):001:0> list
TABLE
aapl_marketdata
1 row(s) in 0.3950 seconds
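
The same table can also be created programmatically. A minimal sketch, assuming the HBase 0.90 client jars and a valid hbase-site.xml are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateMarketDataTable {
    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml from the classpath
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // One table with a single column family, just like the shell command
        HTableDescriptor table = new HTableDescriptor("aapl_marketdata");
        table.addFamily(new HColumnDescriptor("marketdata"));
        admin.createTable(table);
    }
}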

Now we are ready to run the MapReduce job. It is advisable to have a driver script that runs the job and sets all the required arguments for easier configuration, but in essence the job is just plain old Java code.

My script looks like this:

$ cat hb.sh
java -classpath /home/ec2-user/hadoop/hadoop-0.20.2-ant.jar:/home/ec2-user/hadoop/hadoop-0.20.2-core.jar:/home/ec2-user/hadoop/hadoop-0.20.2-tools.jar:/home/ec2-user/hadoop/lib/jets3t-0.6.1.jar:/home/ec2-user/aws-java-sdk-1.3.11/aws-java-sdk-1.3.11.jar:/home/ec2-user/hbase/hbase-0.90.6.jar:/home/ec2-user/hbase/lib/commons-codec-1.4.jar:/home/ec2-user/hbase/lib/commons-httpclient-3.1.jar:/home/ec2-user/hbase/lib/commons-cli-1.2.jar:/home/ec2-user/hbase/lib/commons-logging-1.1.1.jar:/home/ec2-user/hbase/lib/zookeeper-3.3.2.jar:/home/ec2-user/hbase/lib/log4j-1.2.16.jar:json_io_1.0.4.jar:awsdemo-hbase.jar:/home/ec2-user/core-site.xml org.awsdemo.hbase.MarketDataApplication s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/input/APPL_StockPrices.csv s3n://AWSAccessKeyId:AWSSecretAccessKey@stockprice/apple/output/

Once the MapReduce job has finished successfully, we can check the result in the HBase table using bin/hbase shell.

hbase(main):001:0> get 'table_name', 'rowkey'

For example:

hbase(main):001:0> get 'aapl_marketdata', 'AAPL-1984-10-25'
COLUMN                                  CELL
 marketdata:daily                       timestamp=1341590928097, value={"@type":"org.apache.hadoop.io.MapWritable","@keys":[{"@type":"org.apache.hadoop.io
                                        .Text","bytes":[115,116,111,99,107,83,121,109,98,111,108],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes
                                        ":[115,116,111,99,107,80,114,105,99,101,76,111,119],"length":13},{"@type":"org.apache.hadoop.io.Text","bytes":[115
                                        ,116,111,99,107,80,114,105,99,101,79,112,101,110],"length":14},{"@type":"org.apache.hadoop.io.Text","bytes":[100,9
                                        7,116,101],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,67,108,1
                                        11,115,101],"length":15},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,65,100
                                        ,106,67,108,111,115,101],"length":18},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,86,111,108,
                                        117,109,101],"length":11},{"@type":"org.apache.hadoop.io.Text","bytes":[115,116,111,99,107,80,114,105,99,101,72,10
                                        5,103,104],"length":14}],"@items":[{"@type":"org.apache.hadoop.io.Text","bytes":[65,65,80,76],"length":4},{"@type"
                                        :"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[50
                                        ,54,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","bytes":[49,57,56,52,45,49,48,45,50,53],"length":10
                                        },{"@type":"org.apache.hadoop.io.Text","bytes":[50,53,46,50,53],"length":5},{"@type":"org.apache.hadoop.io.Text","
                                        bytes":[50,46,56,56],"length":4},{"@type":"org.apache.hadoop.io.Text","bytes":[53,54,55,54,48,48,48],"length":7},{
                                        "@type":"org.apache.hadoop.io.Text","bytes":[50,54,46,50,53],"length":5}]}
1 row(s) in 0.4140 seconds

The output was generated by JsonWriter and then serialized and stored in HBase, so it requires some ASCII skills to decode the values. E.g. "115,116,111,99,107,83,121,109,98,111,108" means stockSymbol, "115,116,111,99,107,80,114,105,99,101,76,111,119" means stockPriceLow, "115,116,111,99,107,80,114,105,99,101,79,112,101,110" means stockPriceOpen, etc. "65,65,80,76" means AAPL, "50,53,46,50,53" means 25.25, you know the rest.
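
If you would rather not do the conversion in your head, here is a tiny sketch (the DecodeBytes class is my own, hypothetical helper) that turns one of those arrays back into a string:

public class DecodeBytes {
    public static void main(String[] args) {
        // The ASCII codes printed by the HBase shell for one of the keys
        int[] codes = {115, 116, 111, 99, 107, 83, 121, 109, 98, 111, 108};
        StringBuilder sb = new StringBuilder();
        for (int code : codes) {
            sb.append((char) code);
        }
        System.out.println(sb); // prints "stockSymbol"
    }
}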

You can also scan the entire table from the HBase shell using the following command (a programmatic equivalent is sketched below):

hbase(main):001:0> scan 'aapl_marketdata'
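
A minimal sketch of the same scan through the HBase 0.90 Java client (again my own illustration, assuming the client configuration is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanMarketData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "aapl_marketdata");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result row : scanner) {
                byte[] json = row.getValue(Bytes.toBytes("marketdata"),
                        Bytes.toBytes("daily"));
                System.out.println(Bytes.toString(row.getRow()) + " -> "
                        + Bytes.toString(json));
            }
        } finally {
            scanner.close();
            table.close();
        }
    }
}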

If you are done and want to get rid of the data, you first need to disable the table; then you can drop it.

hbase(main):002:0> disable 'aapl_marketdata'
0 row(s) in 2.1490 seconds

hbase(main):004:0> drop 'aapl_marketdata'
0 row(s) in 1.1790 seconds

The MapReduce code

The code consists of four files.

MarketDataApplication.java:

package org.awsdemo.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MarketDataApplication {
    public static void main(String[] args) throws Exception {
        System.out.println("MarketDataApplication invoked");
        int m_rc = ToolRunner.run(new Configuration(), new MarketDataDriver(), args);
        System.exit(m_rc);
    }
}

MarketDataDriver.java:

package org.awsdemo.hbase;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;

public class MarketDataDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {

        // Use the configuration passed in by ToolRunner instead of creating a new one
        Configuration conf = getConf();
        Job job = new Job(conf, "AAPL Market Data Application");
        job.setJarByClass(MarketDataApplication.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MarketDataMapper.class);
        job.setReducerClass(MarketDataReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wires up TableOutputFormat so the reducer writes into the HBase table
        TableMapReduceUtil.initTableReducerJob("aapl_marketdata",
                MarketDataReducer.class, job);

        return job.waitForCompletion(true) ? 0 : -1;
    }
}

MarketDataMapper.java:

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MarketDataMapper extends
        Mapper<LongWritable, Text, Text, MapWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        final String AAPL_STOCK_SYMBOL = "AAPL";

        final Text STOCK_SYMBOL = new Text("stockSymbol");
        final Text DATE = new Text("date");
        final Text STOCK_PRICE_OPEN = new Text("stockPriceOpen");
        final Text STOCK_PRICE_HIGH = new Text("stockPriceHigh");
        final Text STOCK_PRICE_LOW = new Text("stockPriceLow");
        final Text STOCK_PRICE_CLOSE = new Text("stockPriceClose");
        final Text STOCK_VOLUME = new Text("stockVolume");
        final Text STOCK_PRICE_ADJ_CLOSE = new Text("stockPriceAdjClose");

        // Each input line is a CSV record: date,open,high,low,close,volume,adjClose
        String strLine = value.toString();
        String[] data_values = strLine.split(",");
        MapWritable marketData = new MapWritable();
        marketData.put(STOCK_SYMBOL, new Text(AAPL_STOCK_SYMBOL));
        marketData.put(DATE, new Text(data_values[0]));
        marketData.put(STOCK_PRICE_OPEN, new Text(data_values[1]));
        marketData.put(STOCK_PRICE_HIGH, new Text(data_values[2]));
        marketData.put(STOCK_PRICE_LOW, new Text(data_values[3]));
        marketData.put(STOCK_PRICE_CLOSE, new Text(data_values[4]));
        marketData.put(STOCK_VOLUME, new Text(data_values[5]));
        marketData.put(STOCK_PRICE_ADJ_CLOSE, new Text(data_values[6]));

        // Row key: symbol plus date, e.g. "AAPL-1984-10-25"
        context.write(new Text(String.format("%s-%s", AAPL_STOCK_SYMBOL, data_values[0])), marketData);
    }
}

MarketDataReducer.java:

package org.awsdemo.hbase;

import java.io.IOException;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.cedarsoftware.util.io.JsonWriter;

public class MarketDataReducer extends
        TableReducer<Text, MapWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<MapWritable> values, Context context)
            throws IOException, InterruptedException {
        // Since the composite key made up of stock symbol and date is unique,
        // only one value arrives per key.
        MapWritable marketData = null;
        for (MapWritable value : values) {
            marketData = value;
            break;
        }

        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(key.toString()));
        Put put = new Put(Bytes.toBytes(key.toString()));

        // Serialize the whole record as JSON into the marketdata:daily cell
        put.add(Bytes.toBytes("marketdata"), Bytes.toBytes("daily"),
                Bytes.toBytes(JsonWriter.toJson(marketData)));
        context.write(rowKey, put);
    }
}

Amazon EMR HBase

Amazon Web Services recently launched HBase on its Elastic MapReduce. It runs on the Amazon distribution of Hadoop 0.20.205 (as of writing this post, it is not yet available on the MapR M3 or M5 distributions).

You can configure it using the Create a New Job Flow menu:

Then select the EC2 instance type (it needs to be Large or bigger). If you like, you can also add Hive or Pig:

Then you can define EC2 keys (if you want to log in to the instances using ssh, you need to add your key):

Check the summary page and then launch HBase by clicking on Create Job Flow:

The instance will appear with WAITING status in the AWS EMR console:

Now you can log in using ssh (and your ssh key) and start the hbase shell, just as we discussed before:

$ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell

You can also check the running HBase instances in the AWS EC2 console.