Flume 开发者指南V1.5.2

介绍

概述

Apache Flume是一个用来从很多不同的源有效地收集,聚集和移动大量的日志数据到一个中心数据仓库的分布式的,可靠的和可用的系统。

Apache Flume是Apache软件基金会的顶级项目。目前有两个可获得的发布代码路线,0.9.x版本和1.x版本。本文档适用于1.x代码线。对于0.9.x代码线,请看Flume 0.9.x开发指南。

结构

数据流模型

一个Event是在Flume代理之间流动的数据单元。Event从Source流动到Channel再到Sink,并由一个Event接口的实现表示。一个Event携带着一个有效负载(字节数组)和一个可选的头部(字符串属性)集合。一个Flume代理是一个进程(JVM),它能控制组件允许Events从一个外部源流向一个外部目的地。

一个Source消耗有特殊格式的Events,并且那些Events通过像Web服务器之类的外部源被传送到Source。例如,一个AvroSource可以用来从客户端或从流中的其他的Flume代理接收Avro Events。当一个源收到了一个Event,它将它存入到一个或多个Channel中。Channel采用被动存储的形式,Channel会缓存该Event直到它被一个Sink处理。在Flume中,有一种Channel类型是FileChannel,它使用本地文件系统作为它的备份仓库。一个Sink负责将Event从Channel中移除,并将它放到外部仓库中,例如HDFS(这种情况下使用的是HDFSEventSink),或者将它放置到流中下一跳的Source中。在给定的代理中,Source和Sink是异步运行的,因为Events会缓存在Channel中。

可靠性

一个Event被缓存在Flume代理的Channel中。然后就是Sink的任务来将Event传送到流中的下一个代理或者目标仓库(例如HDFS)。Sink只有在Event存储到下一个代理的Channel或者目标仓库中,才会将Event从Channel中移除。这就是单跳消息传递语义如何在Flume中提供端到端的流的可靠性。Flume使用一个事务处理方法保证Events传输的可靠性。Sources和Sinks在由Channel提供的事务中封装了Events的存储和检索。这保证了Events集合可靠地在流中点到点传输。在多跳流的例子中,前一跳的Sink和后一跳的Source都有各自的事务运行来保证数据被安全地存储在下一跳的Channel中。

构建Flume

获取源代码

使用Git检出代码。获取git仓库根目录点击此处https://git-wip-us.apache.org/repos/asf/flume.git。

Flume 1.x的开发在“trunk”分支之下进行,所以可以使用下面的命令行:

git clone https://git-wip-us.apache.org/repos/asf/flume.git

编译/测试Flume

Flume是以Maven方式构建的。你可以使用标准的Maven命令编译Flume:

1.只编译:mvn clean compile

2.编译并运行单元测试:mvn clean test

3.运行独立测试:mvn clean test –Dtest=<Test1><Test2>,…-DfailIfNoTests=false

4.创建tarball包:mvn clean install

5.创建tarball包(跳过单元测试):mvn clean install –DskipTests

请注意,Flume的构建需要GoogleProtocol Buffers编译器在路径中。你可以通过下面的介绍下载并安装它https://developers.google.com/protocol-buffers/,。

开发自定义组件

客户端

Client在Event的起始点进行操作,并将他们传送到一个Flume代理上。Client通常在它们处理的数据来自于的程序的进程空间内操作。Flume目前支持Avro,log4j,syslog和Http POST(使用一个JSON)等方式从一个外部源传输数据。除此之外,有一个ExecSource可以处理本地进程的输出作为给Flume的输入。

很有可能有一个用例使得当前存在的选项都没有效。在这种情况下,你可以建立一个自定义的机制发送数据给Flume。要实现这个有两个方法。第一个方法是创建一个自定义的Client来跟Flume已经存在的Source像AvroSource或者SyslogTcpSource通信。这里Client应该把它的数据转换成这些Flume Source可以理解的数据。另一个选择是写一个自定义的Flume Source,使用IPC或者RPC协议,直接和你已有的客户端程序通信,并把客户端的数据转换成Flume Events进行发送。注意所有存储在一个Flume代理的Channel中events必须以Flume Events的形式存在。

客户端SDK

尽管Flume包含了多个内建的机制(例如Sources)来接收数据,但是人们经常想要能够从一个自定义的程序直接与Flume交互。Flume Client SDK就是一个可以让应用程序使用RPC协议连接到Flume并给Flume的数据流发送数据的库。

RPC客户端接口

Flume RpcClient接口的实现封装着Flume支持的RPC机制。用户的程序可以简单地调用Flume Client SDK中的append(Event)或者appendBatch(List<Event>)来发送数据而不必担心底层信息交换的细节。直接实现Event接口,同一个方便的实现SimpleEvent类,或者通过使用EventBuilder的重载的静态辅助方法wintBody(),用户可以提供需要的Event参数。

RPC客户端——Avro和Thrift

在Flume1.4.0中,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。客户端需要目标Flume代理的主机地址和端口号来创建这个对象,然后就可以使用RpcClient将数据发送给代理。下面的例子展示了在一个用户的数据生成程序中如何使用Flume Client SDK API:

import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.api.RpcClient;import org.apache.flume.api.RpcClientFactory;The remote Flume agent needs to have an AvroSource(or aThriftSourceif you are using aThrift client) listening on some port. Below is an example Flumeagent configuration that’simport org.apache.flume.event.EventBuilder;import java.nio.charset.Charset;public class MyApp {public static void main(String[] args) {MyRpcClientFacade client = new MyRpcClientFacade();// Initialize client with the remote Flume agent's host and portclient.init("host.example.org", 41414);// Send 10 events to the remote Flume agent. That agent should be// configured to listen with an AvroSource.String sampleData = "Hello Flume!";for (int i = 0; i < 10; i++) {client.sendDataToFlume(sampleData);}client.cleanUp();}}class MyRpcClientFacade {private RpcClient client;private String hostname;private int port;public void init(String hostname, int port) {// Setup the RPC connectionthis.hostname = hostname;this.port = port;this.client = RpcClientFactory.getDefaultInstance(hostname, port);// Use the following method to create a thrift client (instead ofthe above line):// this.client = RpcClientFactory.getThriftInstance(hostname, port);}public void sendDataToFlume(String data) {// Create a Flume Event object that encapsulates the sample dataEvent event = EventBuilder.withBody(data,Charset.forName("UTF-8"));// Send the eventtry {client.append(event);} catch (EventDeliveryException e) {// clean up and recreate the clientclient.close();client = null;client = RpcClientFactory.getDefaultInstance(hostname, port);// Use the following method to create a thrift client (instead ofthe above line):// this.client = RpcClientFactory.getThriftInstance(hostname, port);}}public void cleanUp() {// Close the RPC connectionclient.close();}}

远程Flume代理需要有一个AvroSource(或者如果你用的是Thrift客户端的话那就是ThriftSource)监听某个端口。下面是一个Flume代理的配置在等带来自与MyApp的连接:

而不去欣赏今天就开在我们窗口的玫瑰。

Flume 开发者指南V1.5.2

相关文章:

你感兴趣的文章:

标签云: