A Flume BucketPath bug: a worked example

I ran into this problem today while setting up Flume with Kerberos to write to HDFS. The test configuration file:

agent-server1.sources = testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels = hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel

After starting the service, I tested it with telnet and got the following error:

14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:375)
    at java.lang.Long.valueOf(Long.java:525)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
    ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
    ... 3 more
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:375)
    at java.lang.Long.valueOf(Long.java:525)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
    ... 5 more

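The call stack shows that the error is raised in the replaceShorthand method of the org.apache.flume.formatter.output.BucketPath class. In org.apache.flume.sink.hdfs.HDFSEventSink, the process method builds the HDFS URL. It does so mainly by calling BucketPath.escapeString to expand the escape sequences, which in turn ends up calling replaceShorthand. The relevant part of replaceShorthand is: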

public static String replaceShorthand(char c, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
  String timestampHeader = headers.get("timestamp");
  long ts;
  try {
    ts = Long.valueOf(timestampHeader);
  } catch (NumberFormatException e) {
    throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
  }
  if (needRounding) {
    ts = roundDown(roundDown, unit, ts);
  }
  ...
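To make the failure easier to see in isolation, here is a minimal sketch that does not depend on Flume. It is not Flume's implementation: the class name BucketPathSketch and the expand method are hypothetical, only the %Y, %m and %d escapes are handled, and UTC is assumed as the time zone. What it keeps from the code above is the one step that matters, reading the "timestamp" header and parsing it with Long.valueOf.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

class BucketPathSketch {
    // Simplified stand-in for path escaping: handles only %Y, %m and %d (assumption).
    static String expand(String path, Map<String, String> headers) {
        // Same step as in replaceShorthand: a missing header means
        // Long.valueOf(null), which throws NumberFormatException.
        long ts = Long.valueOf(headers.get("timestamp"));
        ZonedDateTime t = Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC);
        return path
                .replace("%Y", DateTimeFormatter.ofPattern("yyyy").format(t))
                .replace("%m", DateTimeFormatter.ofPattern("MM").format(t))
                .replace("%d", DateTimeFormatter.ofPattern("dd").format(t));
    }

    public static void main(String[] args) {
        String path = "hdfs://bipcluster/tmp/flume/%Y%m%d";

        // Event that carries a timestamp header (epoch milliseconds).
        Map<String, String> withHeader = new HashMap<>();
        withHeader.put("timestamp", "1395684187000"); // 2014-03-24 18:03:07 UTC
        System.out.println(expand(path, withHeader));

        // Event with no timestamp header, i.e. nothing has populated it.
        try {
            expand(path, new HashMap<>());
        } catch (NumberFormatException e) {
            System.out.println("failed: " + e);
        }
    }
}
```

With the header present the path expands to hdfs://bipcluster/tmp/flume/20140324. With an empty header map the call fails with a NumberFormatException, which matches the innermost "Caused by" in the log.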

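As the code shows, if the timestamp header cannot be read, timestampHeader is null and the assignment to ts throws. This is actually a Flume bug, tracked as https://issues.apache.org/jira/browse/FLUME-1419

There are three ways to solve it:

1. Change the configuration and update the format of the HDFS file path: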

agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume
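The exception text itself hints at another direction: make sure every event carries a timestamp header before it reaches the sink. In a real agent that is the job of Flume's timestamp interceptor on the source (typically configured with interceptors.<name>.type = timestamp), not of hand-written code. The sketch below only illustrates what populating the header amounts to. The class TimestampHeaderSketch and the helper stampIfMissing are hypothetical names and are not part of Flume.

```java
import java.util.HashMap;
import java.util.Map;

class TimestampHeaderSketch {
    // Hypothetical helper: returns a copy of the headers with an epoch-millis
    // "timestamp" entry added, unless one is already present.
    static Map<String, String> stampIfMissing(Map<String, String> headers, long nowMillis) {
        Map<String, String> out = new HashMap<>(headers);
        out.putIfAbsent("timestamp", Long.toString(nowMillis));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> stamped =
                stampIfMissing(new HashMap<>(), System.currentTimeMillis());
        // The header now parses, so the Long.valueOf call no longer throws.
        long ts = Long.valueOf(stamped.get("timestamp"));
        System.out.println(ts);
    }
}
```

An existing header is left untouched, so events that already carry their own timestamp keep it.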
