hbase shell命令扩展

也许你还不清楚我在说什么,看一下下面的截图你就明白了:

好吧,如果您感兴趣,可以继续看下去了。

hbase是以字节数组的形式存储数据的,当你直接用API或通过hbase 自带的shell端去查询数据时,实际显示的是二进制数据的byteString的形式,就像这样:\xE5\x94\xAE\xE5\x90\x8E,当然,这肯定不是你想看到的结果。不过没办法,因为rowkey和value是用户自己定义的,,包括类型,长度等,又因为存的是二进制数据,所以hbase自身api不可能解析出真实的数据,因为它不知道存取的规则。

我们要做的,就是要制定这种数据存储的规则,比如:rowkey由几个字段构成,每一个的类型,value的类型。值得一提的是几种数值型一旦类型一定,长度就定了,但string类型比较特殊,长度不是固定的。这里先使用一个字节存储长度,再紧跟着具体string串,当然这只是一种方式。value使用一个columfamily,因为官方也不建议使用太多。

以上的规则作为元数据放在xml配置文件里再好不过了,以后增加了hbase表,只需修改配置文件。

根据上面的规则写几个包含处理逻辑的类是必须的,包括读元数据,类型之间的转换方法等等。有了这些方法,就能在任何地方输出想要的数据,比如web前端。下面是以scan为例的hbase shell客户端具体实现思路:

1. 首先要在ruby/shell/commands下增加一个命令,姑且叫做superscan吧,直接对应一个JRuby脚本,定义一个Superscan类继承自Command,接收结果集并格式化输出。

内容如下:

module Shell module Commandsclass Superscan < Commanddef helpreturn <<-EOF此处省略若干打印的帮助信息EOFenddef command(table, args = {})now = Time.nowformatter.header(["READABLE_ROW", "READABLE(COLUMN+CELL)"])count = table(table).superscan(args) do |row, cells|formatter.row([ row, cells ])endformatter.footer(now, count)endend endend

2.shell.rb脚本中的dml命令组里要添加上面定义的命令。

3.下面主要是对ruby/hbase/table.rb脚本的修改,添加相应的方法:

因为我的api里用到了表的名字,所以要在初始化方法里加个tablename变量:

def initialize(configuration, table_name, formatter)@table = org.apache.hadoop.hbase.client.HTable.new(configuration, table_name)@tableName = table_nameend

superscan方法:

#———————————————————————————————-# superScans whole table or a range of keys and returns rows matching specific criteriasdef superscan(args = {})unless args.kind_of?(Hash)raise ArgumentError, "Arguments should be a hash. Failed to parse #{args.inspect}, #{args.class}"endlimit = args.delete("LIMIT") || -1maxlength = args.delete("MAXLENGTH") || -1if args.any?filter = args["FILTER"]startrow = args["STARTROW"] || ”stoprow = args["STOPROW"]timestamp = args["TIMESTAMP"]columns = args["COLUMNS"] || args["COLUMN"] || get_all_columnscache = args["CACHE_BLOCKS"] || trueversions = args["VERSIONS"] || 1timerange = args[TIMERANGE]# Normalize column namescolumns = [columns] if columns.class == Stringunless columns.kind_of?(Array)raise ArgumentError.new("COLUMNS must be specified as a String or an Array")endscan = if stoproworg.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes)elseorg.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes)endcolumns.each do |c|family, qualifier = parse_column_name(c.to_s)if qualifierscan.addColumn(family, qualifier)elsescan.addFamily(family)endendunless filter.class == Stringscan.setFilter(filter)elsescan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter))endscan.setTimeStamp(timestamp) if timestampscan.setCacheBlocks(cache)scan.setMaxVersions(versions) if versions > 1scan.setTimeRange(timerange[0], timerange[1]) if timerangeelsescan = org.apache.hadoop.hbase.client.Scan.newend# Start the scannerscanner = @table.getScanner(scan)count = 0res = {}iter = scanner.iterator# Iterate resultswhile iter.hasNextif limit > 0 && count >= limitbreakendrow = iter.nextkey = org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow) #\00\x01这种形式byteKey = row.getRow#addrowType = com.cuirong.bi.data.hbase.reader.MetaConfig::getRowKeyType(@tableName)stringKey = com.cuirong.bi.data.hbase.inf.CommonUtil::bytes2String(byteKey,rowType)#endaddrow.list.each do |kv|family = String.from_java_bytes(kv.getFamily) #字节数组转stringqualifier = org.apache.hadoop.hbase.util.Bytes::toStringBinary(kv.getQualifier)column = "#{family}:#{qualifier}"cell = to_strings(column, kv, byteKey, maxlength)if block_given?yield(stringKey, "column=#{column}, #{cell}")elseres[stringKey] ||= {}res[stringKey][column] = cellendend# One more row processedcount += 1endreturn ((block_given?) ? count : res)end上面的方法用到了to_strings方法:天不负;卧薪尝胆,三千越甲可吞吴。

hbase shell命令扩展

相关文章:

你感兴趣的文章:

标签云: