[Flume] Implementing a Flume sink that writes to HBase

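Flume 1.5.2

HBase 0.98.9

Hadoop 2.6

ZooKeeper 3.4.6

These are the required software components and their versions. Make sure all of them are installed and working before you continue.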

1. Add the required jars

Copy the required HBase jars from HBase's lib directory into Flume's lib directory.

2. Configure Flume
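The original configuration listing is not reproduced here, so the following is only a minimal sketch of what an agent for this setup might look like. The table name (flume) and column family (chiwei) are taken from the test step; the agent name a1, the component names r1/c1/k1, the exec source with tail -F, the file path, and the channel sizes are all assumptions for illustration.

```properties
# Hypothetical agent "a1"; r1/c1/k1 and the log path are placeholders, not from the original post.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Assumed source: follow a log file with tail -F.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /path/to/monitored.log
a1.sources.r1.channels = c1

# Assumed in-memory channel with illustrative sizes.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HBase sink: the table and column family must already exist in HBase.
a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume
a1.sinks.k1.columnFamily = chiwei
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
```

With no further serializer options, RegexHbaseEventSerializer should store the whole event body in a single column, which is consistent with the chiwei:payload cells seen in the scan output of the test step.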

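Note the serializer setting in the sink configuration: it uses the official RegexHbaseEventSerializer.

There is also a SimpleHbaseEventSerializer. If you use SimpleHbaseEventSerializer with these versions, you get the following error: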

[2015-03-04 09:35:41,244] [SinkRunner-PollingRunner-DefaultSinkProcessor:5672] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
    at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
[2015-03-04 09:35:41,249] [SinkRunner-PollingRunner-DefaultSinkProcessor:5677] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
    at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
    at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
    at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
    at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

The NoSuchMethodError shows that the sink calls Increment.setWriteToWAL(boolean) expecting it to return an Increment, and the HBase client jars on the classpath do not provide a method with that signature. So do not use this serializer with these versions.

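In practice you should develop your own serializer anyway, because the column family, the column qualifiers, and so on should be defined by you.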
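Short of writing a custom class, RegexHbaseEventSerializer itself accepts a regex and a list of column names, which may be enough when each event only needs to be split into a few columns. The fragment below is a sketch under assumptions: the agent name a1 and sink name k1 are hypothetical, and the column names greeting and target are invented for illustration.

```properties
# Hypothetical agent "a1" and sink "k1"; the column names below are illustrative only.
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# One capture group per column. The pattern avoids backslashes so that
# no properties-file escaping is needed.
a1.sinks.k1.serializer.regex = ([^ ]+) (.+)
# Column qualifiers in capture-group order, written under the sink's columnFamily.
a1.sinks.k1.serializer.colNames = greeting,target
```

For a line such as "Hello Flume", the first group should land in the greeting column and the second in the target column. The column family is still fixed per sink, so anything beyond this, such as choosing the family per event, would call for a serializer of your own.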

3. Test

Note that the table must be created in HBase beforehand:

hbase(main):004:0> create 'flume','chiwei'
0 row(s) in 1.0770 seconds

=> Hbase::Table - flume

Then start Flume.

The content of the monitored file is:

Hello Flume
Hello HBase
Hello Hadoop
Hello Zk
Hello chiwei

Now look at the data in the HBase table:

hbase(main):006:0> scan 'flume'
ROW                          COLUMN+CELL
 1425437884264-rd3B0VE5Uz-0  column=chiwei:payload, timestamp=1425437884520, value=Hello Flume
 1425437884275-rd3B0VE5Uz-1  column=chiwei:payload, timestamp=1425437884520, value=Hello HBase
 1425437884276-rd3B0VE5Uz-2  column=chiwei:payload, timestamp=1425437884520, value=Hello Hadoop
 1425437884277-rd3B0VE5Uz-3  column=chiwei:payload, timestamp=1425437884520, value=Hello Zk
 1425437884278-rd3B0VE5Uz-4  column=chiwei:payload, timestamp=1425437884520, value=Hello chiwei
 1425437884624-rd3B0VE5Uz-5  column=chiwei:payload, timestamp=1425437884639, value=Hello Flume
 1425437884626-rd3B0VE5Uz-6  column=chiwei:payload, timestamp=1425437884639, value=Hello HBase
 1425437884628-rd3B0VE5Uz-7  column=chiwei:payload, timestamp=1425437884639, value=Hello Hadoop
 1425437884629-rd3B0VE5Uz-8  column=chiwei:payload, timestamp=1425437884639, value=Hello Zk
 1425437884630-rd3B0VE5Uz-9  column=chiwei:payload, timestamp=1425437884639, value=Hello chiwei
10 row(s) in 0.4020 seconds

hbase(main):007:0>

The test succeeded!

