[Hadoop源码解读](一)MapReduce篇之InputFormat

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按照我们想要的格式被读取。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

其实,一个输入格式InputFormat,主要无非就是要解决如何将数据分割成分片[比如多少行为一个分片],以及如何读取分片中的数据[比如按行读取]。前者由getSplits()完成,后者由RecordReader完成。

不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的map task作为数据源。下面我们先看看这些输入分片(inputSplit)是什么样的。

InputSplit:

我们知道Mappers的输入是一个一个的输入分片,称InputSplit。InputSplit是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

public abstract class InputSplit { public abstract long getLength() throws IOException, InterruptedException; public abstractString[] getLocations() throws IOException, InterruptedException;} getLength()用来获取InputSplit的大小,以支持对InputSplits进行排序,而getLocations()则用来获取存储分片的位置列表。 我们来看一个简单InputSplit子类:FileSplit。public class FileSplit extends InputSplit implements Writable { private Path file; private long start; private long length; private String[] hosts; FileSplit() {} public FileSplit(Path file, long start, long length, String[] hosts) {this.file = file;this.start = start;this.length = length;this.hosts = hosts; } //序列化、反序列化方法,获得hosts等等……} 从上面的源码我们可以看到,一个FileSplit是由文件路径,分片开始位置,分片大小和存储分片数据的hosts列表组成,由这些信息我们就可以从输入文件中切分出提供给单个Mapper的输入数据。这些属性会在Constructor设置,我们在后面会看到这会在InputFormat的getSplits()中构造这些分片。

我们再看CombineFileSplit:

public class CombineFileSplit extends InputSplit implements Writable { private Path[] paths; private long[] startoffset; private long[] lengths; private String[] locations; private long totLength; public CombineFileSplit() {} public CombineFileSplit(Path[] files, long[] start,long[] lengths, String[] locations) {initSplit(files, start, lengths, locations); } public CombineFileSplit(Path[] files, long[] lengths) {long[] startoffset = new long[files.length];for (int i = 0; i < startoffset.length; i++) {startoffset[i] = 0;}String[] locations = new String[files.length];for (int i = 0; i < locations.length; i++) {locations[i] = "";}initSplit(files, startoffset, lengths, locations); }private void initSplit(Path[] files, long[] start,long[] lengths, String[] locations) {this.startoffset = start;this.lengths = lengths;this.paths = files;this.totLength = 0;this.locations = locations;for(long length : lengths) {totLength += length;} } //一些getter和setter方法,和序列化方法} 与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和存储分片数据的host列表,由于CombineFileSplit是针对小文件的,它把很多小文件包在一个InputSplit内,这样一个Mapper就可以处理很多小文件。要知道我们上面的FileSplit是对应一个输入文件的,也就是说如果用FileSplit对应的FileInputFormat来作为输入格式,那么即使文件特别小,也是单独计算成一个输入分片来处理的。当我们的输入是由大量小文件组成的,就会导致有同样大量的InputSplit,从而需要同样大量的Mapper来处理,这将很慢,想想有一堆map task要运行!!这是不符合Hadoop的设计理念的,Hadoop是为处理大文件优化的。

最后介绍TagInputSplit,这个类就是封装了一个InputSplit,然后加了一些tags在里面满足我们需要这些tags数据的情况,我们从下面就可以一目了然。

class TaggedInputSplit extends InputSplit implements Configurable, Writable { private Class<? extends InputSplit> inputSplitClass; private InputSplit inputSplit; @SuppressWarnings("unchecked") private Class<? extends InputFormat> inputFormatClass; @SuppressWarnings("unchecked") private Class<? extends Mapper> mapperClass; private Configuration conf; //getters and setters,序列化方法,getLocations()、getLength()等}

现在我们对InputSplit的概念有了一些了解,我们继续看它是怎么被使用和计算出来的。

InputFormat:

通过使用InputFormat,MapReduce框架可以做到:

1、验证作业的输入的正确性

2、将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mapper task

3、提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对供Mapper使用。

public abstract class InputFormat<K, V> { public abstractList<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;public abstractRecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;}

上面是InputFormat的源码,getSplits用来获取由输入文件计算出来的InputSplits,我们在后面会看到计算InputSplits的时候会考虑到输入文件是否可分割、文件存储时分块的大小和文件大小等因素;而createRecordReader()提供了前面第三点所说的RecordReader的实现,以将K-V对从InputSplit中正确读出来,比如LineRecordReader就以偏移值为key,一行的数据为value,这就使得所有其createRecordReader()返回了LineRecordReader的InputFormat都是以偏移值为key,一行数据为value的形式读取输入分片的。

FileInputFormat:愚者用肉体监视心灵,智者用心灵监视肉体

[Hadoop源码解读](一)MapReduce篇之InputFormat

相关文章:

你感兴趣的文章:

标签云: