lskyne的专栏

转自:?mod=viewthread&tid=8295&highlight=flume

参考:?mod=viewthread&tid=8315&highlight=flume

首先第一步你有这个类:开发完毕,然后打包jar包,将jar包加入到flume中,有两种方法:方法一:进入vi /etc/profile 在文件中加入 export FLUME_CLASSPATH = XXX/XX/XXX.jar在CALSSPATH 的尾部加入:$FLUME_CLASSPATH , 然后保存退出后,在命令行执行sh /etc/profile ,通过在命令行输入 flume classpath 查看插件jar包是否在其中,在其中表示配置完成(注:开发插件时引入的jar包必须都能够在flume classpath下找到,如果找不到的话像加入插件包的方式加入)方法二:简单的方法是将开发的jar包和需要引入的jar包直接复制到/usr/lib/flume/lib修改flume-site.xml文件 如下所示:

<property><name>flume.plugin.classes</name> <value>插件类得全路径名(如:hello.HelloWorldSink) </value> <description>Comma separated list of plugin classes</description></property>

复制代码

将flume-site.xml文件放到master 和collector的/etc/flume/conf 下启动集群配置即可

flume插件开发一般分如下步骤:1 编写插件类(Sink/Source/Decorator)相应的继承flume的(EventSink.Base/ EventSource.Base/EventSinkDecorator<S>)Source必须实现四个方法:

void open() throws IOException Event next() throws IOExceptionvoid close() throws IOExceptionReportEvent getReport()

复制代码

Sink和Decorator必须实现四个方法:

void open() throws IOException void append(Event e) throws IOException void close() throws IOException ReportEvent getReport()

复制代码

主要介绍sink插件的开发append(Event e)方法中的event参数是flume框架自带的类,日志文件的一行记录会在flume框架中封装成一个event,在append方法中对event的处理就可以当做是处理日志文件的每一行,Event包含的六个属性是:

Unix timestamp时间戳 Nanosecond timestamp Priority Source host 主机地址(这个地址是产生日志的文件地址) Body 日志文件的一行 Metadata table with an arbitrary number of attribute value pairs.

在sink的插件类中一定要是加入下面的代码:

public static List<Pair<String, SinkBuilder>> getSinkBuilders() { List<Pair<String, SinkBuilder>> builders = new ArrayList<Pair<String, SinkBuilder>>(); builders.add(new Pair<String, SinkBuilder>("HBaseEventSinkV1", builder())); return builders; }

因为flume的SinkFactoryImpl中需要通过这份方法将写的插件类注册到flume,如果不写会报错,"HBaseEventSinkV1" 就是注册名以开发collector的插件类为例,完成插件类的开发之后,将插件类打包分别放到master和collector主机上(注:多个master和多个collector的情况下要都放置,最好目录都一样,便于配置flume classpath)。

根据上面的方法,,首先你可以自定义个sink,然后把这个类放到flume中,放到flume中后,你在配置文件中,配置自己自定义的type你可以自定义到Kafka

此刻睡觉的口水将变成明天流下的泪水。

lskyne的专栏

相关文章:

你感兴趣的文章:

标签云: