SODBASE CEP学习进阶篇(二):日志采集

在IT系统运维和性能监控中,常常需要对日志进行分析,得到系统的故障点或性能瓶颈。采用现成的日志分析软件,通常着重于监测节点和网络状态,几乎都难以满足大型应用系统对故障点或性能瓶颈分析的要求。

举几个例子:

(1)找出故障的上下层调用的关系,定位应用层故障对应的底层接口

(2)分析父子调用的时间差,找出性能瓶颈

(3)分析指定系统调用和服务的响应时间、是否超时

SODBASE CEP可以处理各类复杂的日志实时分析和图表显示功能。用户可以自己定义日志服务接口,采集日志数据,也可以用一些日志采集客户端,如flume、splunk等,完成采集功能。

本文用flume作为日志采集客户端,将日志信息发送到SODBASE CEP引擎。

1.操作步骤

(1)Windows环境(Linux环境更简单),,安装JDK1.6+

(2)下载SODBASE Studio软件

下载loganalysis.sod

下载apache-flume-1.5.2-bin.zip,解压到E:\software

(3)解压点击cepstudio.exe打开,菜单“文件”->“导入" 选择loganalysis.sod

在工作区面板点击邮件,测试运行

(4)用记事本代开flume的bin/flume-win.bat,设置自己的JAVA_HOME参数,保存运行。模拟日志采集。

(5)Studio中接收到的日志结果

示例操作完成,如果想了解工作原理,请看下文。

2.工作原理

flume比较其它日志采集客户端的优点是Java编写跨平台,轻量级,不需要安装额外的如elasticsearch等检索软件。本文用flume作为日志采集客户端,将日志信息发送到SODBASE CEP引擎。示例中,我们实现一个将数据通过socket传输到CEP引擎的Sink。在CEP引擎中通过socket输入适配器负责接收数据。

需要用到的类库有sodbase-cep-engine.jar,sodbase-dataadaptor-socket.jar,运行时放到flume的lib目录下即可。注:sodbase-cep-engine.jar在flume中用的版本需要和CEPserver中的版本一致,保证对象能够正常解序列化。

package com.sodbase.dataadaptor.flume;import java.io.IOException;import java.util.Date;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.EventDeliveryException;import org.apache.flume.Transaction;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;import com.sodbase.outputadaptor.socket.SocketUtil;import zstreamplus.eventbuffer.PrimitiveEvent;import zstreamplus.eventbuffer.ValueType;public class CEPServerSink extends AbstractSink implements Configurable{private String CEPServerSocketIpPort;private String retrynum;private static final String DEFAULT_ENCODING = "UTF-8";private SocketUtil socketUtil=new SocketUtil();@Overridepublic void configure(Context context){/** * 在flume-conf.properties中配置 */CEPServerSocketIpPort = context.getString("CEPServerSocketIpPort","localhost:12345");retrynum = context.getString("CEPServerSocketRetryNum", "30");}@Overridepublic void start(){socketUtil.setRunning(true);}@Overridepublic void stop(){socketUtil.setRunning(false);if (socketUtil.getClient() != null)try{socketUtil.getClient().close();} catch (IOException e){e.printStackTrace();}}@SuppressWarnings("unchecked")@Overridepublic Status process() throws EventDeliveryException{Status status = null;// Start transactionChannel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try{Event event = ch.take();//prepare the log dataString eventData = new String(event.getBody(), DEFAULT_ENCODING);PrimitiveEvent primitiveEvent = new PrimitiveEvent();ValueType valueType = new ValueType(eventData, "string");primitiveEvent.getAttributeMap().put("flumeeventdata", valueType);Date date = new Date();long time = date.getTime();primitiveEvent.setStart_ts(time);primitiveEvent.setEnd_ts(time);//transfer data to cep serverString[] address = CEPServerSocketIpPort.split(":");socketUtil.setIp(address[0]);socketUtil.setPort(address[1]);socketUtil.setRetrynum(retrynum);socketUtil.outputPrimitiveEvent(primitiveEvent);txn.commit();status = Status.READY;} catch (Throwable t){txn.rollback();status = Status.BACKOFF;if (t instanceof Error){throw (Error) t;}} finally{txn.close();}return status;}}

linux的flume启动调用sh脚本即可,windows中可参考下面脚本

set FLUME_HOME=E:\software\apache-flume-1.5.2-bin\apache-flume-1.5.2-binset JAVA_HOME=D:\Program Files\Java\jdk1.7.0_51set JAVA="%JAVA_HOME%\bin\java.exe"set JAVA_OPTS=-Xmx256mset CONF=%FLUME_HOME%\conf\flume-cep-conf.propertiesset AGENT=agent%JAVA% %JAVA_OPTS% -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 -Dlog4j.configuration=file:\\\%FLUME_HOME%\conf\log4j.properties -cp "%FLUME_HOME%\lib\*" org.apache.flume.node.Application -f %FLUME_HOME%\conf\flume-cep-conf.properties -n %AGENT%天才是百分之一的灵感加上百分之久十久的努力

SODBASE CEP学习进阶篇(二):日志采集

相关文章:

你感兴趣的文章:

标签云: