flume 自定义 hbase sink 类

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.flume.sink.hbase;import java.util.ArrayList;import java.util.List;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import org.apache.flume.conf.ComponentConfiguration;import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;import com.google.common.base.Charsets;public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] cf; private byte[][] payload; private byte[][] payloadColumn; private final String payloadColumnSplit = "\\^A"; private byte[] incrementColumn; private String rowSuffix; private String rowSuffixCol; private byte[] incrementRow; private KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) {this.table = table;this.cf = cf; } @Override public List<PutRequest> getActions() {List<PutRequest> actions = new ArrayList<PutRequest>();if(payloadColumn != null){byte[] rowKey;try {switch (keyType) {case TS:rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);break;case TSNANO:rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);break;case RANDOM:rowKey = 
SimpleRowKeyGenerator.getRandomKey(rowSuffix);break;default:rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);break;}// for 循环,提交所有列和对于数据的put请求。for (int i = 0; i < this.payload.length; i++){PutRequest putRequest = new PutRequest(table, rowKey, cf,payloadColumn[i], payload[i]);actions.add(putRequest);}} catch (Exception e){throw new FlumeException("Could not get row key!", e);}}return actions; } public List<AtomicIncrementRequest> getIncrements(){List<AtomicIncrementRequest> actions = newArrayList<AtomicIncrementRequest>();if(incrementColumn != null) {AtomicIncrementRequest inc = new AtomicIncrementRequest(table,incrementRow, cf, incrementColumn);actions.add(inc);}return actions; } @Override public void cleanUp() {// TODO Auto-generated method stub } @Override public void configure(Context context) {String pCol = context.getString("payloadColumn", "pCol");String iCol = context.getString("incrementColumn", "iCol");rowSuffixCol = context.getString("rowPrefixCol", "mac");String suffix = context.getString("suffix", "uuid");if(pCol != null && !pCol.isEmpty()) {if(suffix.equals("timestamp")){keyType = KeyType.TS;} else if (suffix.equals("random")) {keyType = KeyType.RANDOM;} else if(suffix.equals("nano")){keyType = KeyType.TSNANO;} else {keyType = KeyType.UUID;}// 从配置文件中读出column。String[] pCols = pCol.replace(" ", "").split(",");payloadColumn = new byte[pCols.length][];for (int i = 0; i < pCols.length; i++){// 列名转为小写payloadColumn[i] = pCols[i].toLowerCase().getBytes(Charsets.UTF_8);}}if(iCol != null && !iCol.isEmpty()) {incrementColumn = iCol.getBytes(Charsets.UTF_8);}incrementRow =context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void setEvent(Event event) {String strBody = new String(event.getBody());String[] subBody = strBody.split(this.payloadColumnSplit);if (subBody.length == this.payloadColumn.length){this.payload = new byte[subBody.length][];for (int i = 0; i < subBody.length; i++){this.payload[i] = 
subBody[i].getBytes(Charsets.UTF_8);if ((new String(this.payloadColumn[i]).equals(this.rowSuffixCol))){// rowkey 前缀是某一列的值, 默认情况是mac地址this.rowSuffix = subBody[i];}}} } @Override public void configure(ComponentConfiguration conf) {// TODO Auto-generated method stub }}

重点可以查看 setEvent、configure 和 getActions 这几个函数。

源码编译和执行

编写好自定义的 BaimiAsyncHbaseEventSerializer 类后，接下来需要编译源码，生成 flume-ng-hbase-sink.*.jar 包，并用它替换 Flume 中原有的 flume-ng-hbase-sink.*.jar 包。

只有经历过地狱般的折磨,才有征服天堂的力量。

flume 自定义 hbase sink 类

相关文章:

你感兴趣的文章:

标签云: