hdfs RotationActions 接口用法

storm-hdfs 这个插件支持Rotation Actions 这个功能,官方文档解释是这样的,

### File Rotation Actions Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s. What `RotationAction`s do is provide a hook to allow you to perform some action right after a file is rotated. For example, moving a file to a different location or renaming it.

大概意思就是,写完hdfs文件之后会调用这个注册的方法,,可以做个善后工作,比如移动个文件夹啥的。官方给了个例子,MoveFileAction 也是当文件写完之后移动到另外一个文件夹中,可是坑爹的是这个类不好用。于是没办法只能自己手动写一个了,把这个公布出来大家一起分享。

package lanbo.storm.kafka.examples;import org.apache.commons.io.IOUtils;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.storm.hdfs.common.rotation.RotationAction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;/** * Created by lanbo on 15-2-13. */public class CopyFileAction implements RotationAction {private static final Logger LOG = LoggerFactory.getLogger(CopyFileAction.class);private String destination;public CopyFileAction toDestination(String destDir){destination = destDir;return this;}@Overridepublic void execute(FileSystem fileSystem, Path filePath) throws IOException {Path destPath = new Path(destination, filePath.getName());LOG.info("Moving file {} to {}", filePath, destPath);InputStream ins = fileSystem.open(filePath);OutputStream os = fileSystem.create(destPath);try{IOUtils.copy(ins, os);fileSystem.delete(filePath,true);}catch(Exception e){throw new IOException(e);}finally{ins.close();os.close();}return;}} 这个类简单的就能实现移动文件夹,以后会陆续更新很多更有用的工具

有理想在的地方,地狱就是天堂

hdfs RotationActions 接口用法

相关文章:

你感兴趣的文章:

标签云: