【Flume】 A Detailed Walkthrough of the ExecSource Source Code in Flume

Let's go straight to this Source's start method:

    public void start() {
      logger.info("Exec source starting with command:{}", command);

      executor = Executors.newSingleThreadExecutor();

      runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
          restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

      // FIXME: Use a callback-like executor / future to signal us upon failure.
      runnerFuture = executor.submit(runner);

      /*
       * NB: This comes at the end rather than the beginning of the method because
       * it sets our state to running. We want to make sure the executor is alive
       * and well first.
       */
      sourceCounter.start();
      super.start();

      logger.debug("Exec source started");
    }

start() creates a single-thread executor and submits an ExecRunnable to it, so the command runs on its own background thread. The source counter is started and the source marked as running only after the runner has been submitted. What that background thread actually does is defined in runner, i.e. ExecRunnable.
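The pattern in start() can be tried outside Flume. The class below is a hypothetical sketch (MiniExecSource, awaitWorker and the 5-second shutdown wait are invented for illustration, not Flume APIs): it runs a worker on a single-thread executor, keeps the returned Future, and only then marks itself as running. Keeping the Future is what the FIXME comment hints at: Future.get() rethrows the worker's failure wrapped in an ExecutionException, so whoever waits on it can learn that the worker died.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, stripped-down illustration of ExecSource.start()/stop(); not Flume code.
class MiniExecSource {
  private ExecutorService executor;
  private Future<?> runnerFuture;
  private volatile boolean running;

  void start(Runnable runner) {
    executor = Executors.newSingleThreadExecutor();
    runnerFuture = executor.submit(runner);
    // As in ExecSource, flip to "running" only after the worker was submitted.
    running = true;
  }

  boolean isRunning() {
    return running;
  }

  /** Waits for the worker; returns false if it failed, was cancelled or timed out. */
  boolean awaitWorker(long timeoutMs) {
    try {
      runnerFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException | TimeoutException | CancellationException e) {
      return false; // ExecutionException wraps whatever the worker threw
    }
  }

  void stop() {
    running = false;
    runnerFuture.cancel(true); // interrupts the worker if it is still running
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    MiniExecSource ok = new MiniExecSource();
    ok.start(() -> System.out.println("worker running"));
    System.out.println("finished normally: " + ok.awaitWorker(5_000));
    ok.stop();

    MiniExecSource failing = new MiniExecSource();
    failing.start(() -> { throw new IllegalStateException("command died"); });
    System.out.println("finished normally: " + failing.awaitWorker(5_000));
    failing.stop();
  }
}
```

In the second run the worker's exception never reaches the main thread on its own; it only surfaces because the Future was kept and queried.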

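ExecRunnable is not a Thread subclass; it implements the Runnable interface, so the logic to look at is its overridden run method. Let's go through it piece by piece: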

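    if (shell != null) {
      String[] commandArgs = formulateShellCommand(shell, command);
      process = Runtime.getRuntime().exec(commandArgs);
    } else {
      String[] commandArgs = command.split("\\s+");
      process = new ProcessBuilder(commandArgs).start();
    }
    reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), charset));

This launches the command and wraps its standard output in reader. If a shell is configured, formulateShellCommand builds the argument list from the shell plus the whole command, so the shell interprets the command (pipes and the like work); otherwise the command is split on whitespace and started directly with ProcessBuilder. InputStreamReader is the bridge from byte streams to character streams: it reads bytes and decodes them into characters using the specified charset, and each call to one of its read methods may cause one or more bytes to be read from the underlying byte stream.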
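To see why the two branches differ, here is a small sketch. buildArgs and readLines are hypothetical helpers, and buildArgs assumes that formulateShellCommand splits the configured shell string on whitespace and appends the entire command as the last argument. With a shell such as "/bin/sh -c", a pipeline stays a single argument for the shell to interpret; without one, "|" is passed to the first program as a literal argument. readLines decodes a byte stream with an explicit charset, the same way reader wraps process.getInputStream() above; a ByteArrayInputStream stands in for the process output.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers mirroring the two branches of run(); not Flume's code.
class CommandLauncherSketch {
  /**
   * With a shell (e.g. "/bin/sh -c"), the shell words come first and the whole
   * command is appended as ONE argument (assumed formulateShellCommand behavior).
   * Without a shell, the command is split on whitespace, as in the else-branch.
   */
  static String[] buildArgs(String shell, String command) {
    if (shell != null) {
      String[] shellArgs = shell.split("\\s+");
      String[] result = Arrays.copyOf(shellArgs, shellArgs.length + 1);
      result[shellArgs.length] = command;
      return result;
    }
    return command.split("\\s+");
  }

  /** Decodes a byte stream line by line with an explicit charset. */
  static List<String> readLines(InputStream in, Charset charset) {
    List<String> lines = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
      String line;
      while ((line = reader.readLine()) != null) {
        lines.add(line);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return lines;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(buildArgs("/bin/sh -c", "tail -F app.log | grep ERROR")));
    // [/bin/sh, -c, tail -F app.log | grep ERROR]
    System.out.println(Arrays.toString(buildArgs(null, "tail -F app.log | grep ERROR")));
    // [tail, -F, app.log, |, grep, ERROR]  ("|" is just an argument to tail here)

    // Stand-in for process.getInputStream(); a real launch would be
    // new ProcessBuilder(buildArgs(...)).start().getInputStream()
    byte[] stdout = "caf\u00e9\nok\n".getBytes(StandardCharsets.UTF_8);
    System.out.println(readLines(new ByteArrayInputStream(stdout), StandardCharsets.UTF_8).size());
  }
}
```

Decoding the same UTF-8 bytes with ISO_8859_1 instead turns "café" into "cafÃ©", which is why the charset setting matters when the command's output is not plain ASCII.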

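    while ((line = reader.readLine()) != null) {
      synchronized (eventList) {
        sourceCounter.incrementEventReceivedCount();
        eventList.add(EventBuilder.withBody(line.getBytes(charset)));
        if (eventList.size() >= bufferCount || timeout()) {
          flushEventBatch(eventList);
        }
      }
    }

readLine() returns null only at end of stream, i.e. once the command's standard output has been closed, so the loop keeps going for as long as the command produces output (an empty line is not null and still becomes an event). For each line, the loop locks eventList, increments the received-event counter, and wraps the line's bytes, encoded with charset, into an Event. As soon as eventList holds bufferCount events, or timeout() reports that batchTimeout milliseconds have passed since the last push to the channel, the batch is flushed.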
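The read-and-batch loop can be modeled with a small, hypothetical LineBatcher that buffers strings instead of Flume events and takes an injectable clock so the timeout can be tested. It flushes when the buffer reaches bufferCount lines or when batchTimeoutMs has passed since the last flush; flushRemaining(), which drains leftovers once readLine() returns null, is an assumption of this sketch. One property of the loop above carries over unchanged: the timeout is only checked when a new line arrives, so if the command goes quiet, buffered lines wait for the next line or the end of the stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical model of the read loop plus flushEventBatch; not Flume code.
class LineBatcher {
  private final List<String> buffer = new ArrayList<>();
  private final int bufferCount;             // like ExecSource's bufferCount
  private final long batchTimeoutMs;         // like ExecSource's batchTimeout
  private final LongSupplier clock;          // injectable clock (assumption, for testing)
  private final Consumer<List<String>> sink; // stands in for processEventBatch
  private long lastPush;                     // like lastPushToChannel

  LineBatcher(int bufferCount, long batchTimeoutMs,
              LongSupplier clock, Consumer<List<String>> sink) {
    this.bufferCount = bufferCount;
    this.batchTimeoutMs = batchTimeoutMs;
    this.clock = clock;
    this.sink = sink;
    this.lastPush = clock.getAsLong();
  }

  private boolean timeout() {
    return clock.getAsLong() - lastPush >= batchTimeoutMs;
  }

  /** Called once per line read; flushes on size or age, like the loop body above. */
  void add(String line) {
    synchronized (buffer) {
      buffer.add(line);
      if (buffer.size() >= bufferCount || timeout()) {
        flush();
      }
    }
  }

  /** Drains whatever is left once the stream has ended. */
  void flushRemaining() {
    synchronized (buffer) {
      if (!buffer.isEmpty()) {
        flush();
      }
    }
  }

  // Caller holds the lock. The sink gets a copy because the buffer is cleared next.
  private void flush() {
    sink.accept(new ArrayList<>(buffer));
    buffer.clear();
    lastPush = clock.getAsLong();
  }

  /** Reads until readLine() returns null, then drains the remainder. */
  static void pump(BufferedReader reader, LineBatcher batcher) {
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        batcher.add(line);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    batcher.flushRemaining();
  }

  public static void main(String[] args) {
    List<List<String>> batches = new ArrayList<>();
    LineBatcher batcher = new LineBatcher(2, 60_000L, System::currentTimeMillis, batches::add);
    pump(new BufferedReader(new StringReader("a\nb\nc\n")), batcher);
    System.out.println(batches); // [[a, b], [c]]
  }
}
```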

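    private void flushEventBatch(List<Event> eventList) {
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

Flushing first hands all accumulated events to the channel processor as one batch, then adds the batch size to the accepted-event counter, clears the list, and records the current time in lastPushToChannel, which is the reference point timeout() measures against.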

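1. Put the events into the configured channels. Inside ChannelProcessor.processEventBatch, the batch is first grouped by destination channel:

    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);
      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);
      for (Channel ch : optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }
    }

This loop does not write to any channel yet; it builds one event list per channel, and each list is then put into its channel inside that channel's own transaction. Note that the selector is asked twice for every event. The reason is not the two selector types but the two kinds of channel: required channels (getRequiredChannels), where a failed write makes processEventBatch throw a ChannelException back to the source, and optional channels (getOptionalChannels), where write failures are ignored. Both selector types answer both questions: the replicating selector sends every event to all of its channels, while the multiplexing selector chooses channels based on the value of an event header.

Once the output loop ends, the rest of run() decides whether to start over:

    do {
      // ... launch the command, read its output and flush batches (the code above) ...
      if (restart) {
        logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
        try {
          Thread.sleep(restartThrottle);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      } else {
        logger.info("Command [" + command + "] exited with " + exitCode);
      }
    } while (restart);

The restart parameter controls whether the command is started again after its process exits, for example because it died while running; it defaults to false. restartThrottle is the number of milliseconds to wait before restarting.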
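A minimal sketch of the grouping step, with hypothetical stand-ins for Flume's Event and ChannelSelector (channels are plain names here, and none of these types are Flume's real classes). replicating() returns the same required and optional channel lists for every event; multiplexing() picks the required channels from one header's value and, as a simplification, has no optional channels. group() mirrors the loop above, building one list per channel, with computeIfAbsent replacing the explicit null check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Flume's Event and ChannelSelector; channels are plain names.
class ChannelGroupingSketch {
  static final class Event {
    final Map<String, String> headers;
    final String body;

    Event(Map<String, String> headers, String body) {
      this.headers = headers;
      this.body = body;
    }

    @Override public String toString() { return body; }
  }

  interface Selector {
    List<String> getRequiredChannels(Event event);
    List<String> getOptionalChannels(Event event);
  }

  /** Replicating: every event goes to all required and all optional channels. */
  static Selector replicating(List<String> required, List<String> optional) {
    return new Selector() {
      @Override public List<String> getRequiredChannels(Event event) { return required; }
      @Override public List<String> getOptionalChannels(Event event) { return optional; }
    };
  }

  /** Multiplexing (simplified): one header's value chooses the required channels. */
  static Selector multiplexing(String header, Map<String, List<String>> mapping,
                               List<String> defaultChannels) {
    return new Selector() {
      @Override public List<String> getRequiredChannels(Event event) {
        return mapping.getOrDefault(event.headers.get(header), defaultChannels);
      }
      @Override public List<String> getOptionalChannels(Event event) {
        return Collections.emptyList();
      }
    };
  }

  /** Mirrors the loop in processEventBatch: builds one event list per channel. */
  static Map<String, List<Event>> group(List<Event> events, Selector selector, boolean required) {
    Map<String, List<Event>> queues = new LinkedHashMap<>();
    for (Event event : events) {
      List<String> channels = required
          ? selector.getRequiredChannels(event)
          : selector.getOptionalChannels(event);
      for (String ch : channels) {
        queues.computeIfAbsent(ch, k -> new ArrayList<>()).add(event);
      }
    }
    return queues;
  }

  public static void main(String[] args) {
    Event err = new Event(Collections.singletonMap("type", "error"), "e1");
    Event info = new Event(Collections.singletonMap("type", "info"), "e2");
    List<Event> batch = Arrays.asList(err, info);
    Map<String, List<String>> mapping = new HashMap<>();
    mapping.put("error", Collections.singletonList("errors"));
    Selector mux = multiplexing("type", mapping, Collections.singletonList("default"));
    System.out.println(group(batch, mux, true)); // {errors=[e1], default=[e2]}
  }
}
```

Running group() once with required=true and once with required=false gives the two maps that correspond to reqChannelQueue and optChannelQueue.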

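If it is set to true, the whole sequence above (launch the command, read its output, flush batches) runs again after the restartThrottle pause, and keeps repeating for as long as restart stays true.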
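The restart behaviour reduces to a do/while around "run the command and get an exit code". In this hypothetical sketch the command is an IntSupplier returning an exit code, and maxRuns is an invented cap so the example terminates; ExecSource itself keeps looping while restart is true. Giving up on interrupt is also a choice of the sketch: the code above restores the interrupt flag and simply proceeds to the while check.

```java
import java.util.function.IntSupplier;

// Hypothetical model of ExecRunnable's restart loop; not Flume code.
class RestartLoopSketch {
  /**
   * Runs command (a stand-in for "launch process, read output, flush, wait for exit")
   * and, while restart is true, runs it again after restartThrottleMs.
   * Returns how many times the command ran.
   */
  static int runWithRestart(IntSupplier command, boolean restart,
                            long restartThrottleMs, int maxRuns) {
    int runs = 0;
    boolean again;
    do {
      int exitCode = command.getAsInt();
      runs++;
      again = restart && runs < maxRuns; // maxRuns: sketch-only safety cap
      if (again) {
        System.out.println("Restarting in " + restartThrottleMs + "ms, exit code " + exitCode);
        try {
          Thread.sleep(restartThrottleMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag, as ExecSource does
          again = false;                      // sketch-only: stop once interrupted
        }
      } else {
        System.out.println("Command exited with " + exitCode);
      }
    } while (again);
    return runs;
  }

  public static void main(String[] args) {
    int n = runWithRestart(() -> 1, true, 100L, 3);
    System.out.println("ran " + n + " times");
  }
}
```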

Summary:

1. How are events produced?

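Each line of command output becomes one event:

    eventList.add(EventBuilder.withBody(line.getBytes(charset)));

The one-argument withBody(body) delegates to the two-argument overload with null headers, which creates a SimpleEvent, substitutes an empty byte array for a null body, and copies any headers into a new HashMap:

    public static Event withBody(byte[] body, Map<String, String> headers) {
      Event event = new SimpleEvent();
      if (body == null) {
        body = new byte[0];
      }
      event.setBody(body);
      if (headers != null) {
        event.setHeaders(new HashMap<String, String>(headers));
      }
      return event;
    }

2. How are events put into the channel?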

Through flushEventBatch, which passes the whole batch to ChannelProcessor.processEventBatch:

    private void flushEventBatch(List<Event> eventList) {
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }
