Determining the Number of Map Tasks in a Hadoop MapReduce Job

Principle

We know that when creating a MapReduce job we have to specify an InputFormat type, so that is the natural place to start when figuring out how an MR job reads its data. First, look at the source of a concrete InputFormat subclass (Hadoop 2.6 source):

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

/** An {@link InputFormat} for plain text files. Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line. Keys are
 * the position in the file, and values are the line of text. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    // the record delimiter defaults to \n / \r\n unless overridden
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // a file is splittable unless it uses a non-splittable compression codec
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}
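For orientation, here is a minimal driver sketch showing where the InputFormat type is wired into a job; TextInputFormat is also the default when nothing is set explicitly. The class name InputFormatDemo, the identity mapper, and the argument paths are illustrative, not part of the original article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class InputFormatDemo {

  // A pass-through mapper: the key is the byte offset produced by
  // LineRecordReader, the value is the line of text.
  public static class IdentityMapper
      extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "inputformat-demo");
    job.setJarByClass(InputFormatDemo.class);

    // This is the InputFormat choice discussed above.
    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(IdentityMapper.class);
    job.setNumReduceTasks(0);              // map-only job for simplicity
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}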

Understanding LineRecordReader is therefore the key. First, look at the schematic:

In the diagram, a split is a logical slice of an HDFS file; it encapsulates the split size, the start offset, the file path, and so on. A block is a physical HDFS file block. The RecordReader is the reader introduced above: it reads the data described by the split information.
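To make that concrete, a small sketch like the following can print what each FileSplit carries once FileInputFormat has generated the splits. The SplitInspector class and the args[0] input path are hypothetical; normally the framework, not user code, calls getSplits.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.util.Arrays;
import java.util.List;

public class SplitInspector {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // Placeholder input path; replace with a real directory.
    FileInputFormat.addInputPath(job, new Path(args[0]));

    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    for (InputSplit s : splits) {
      FileSplit fs = (FileSplit) s;   // split metadata, not the data itself
      System.out.println("path   = " + fs.getPath());     // which file
      System.out.println("start  = " + fs.getStart());    // byte offset of the split
      System.out.println("length = " + fs.getLength());   // split size in bytes
      System.out.println("hosts  = " + Arrays.toString(fs.getLocations())); // block hosts
    }
  }
}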

So the analysis proceeds in two steps: generate the split information, then process the split information.

Generating split information

In the source above, TextInputFormat extends FileInputFormat, and FileInputFormat is where the split information is generated. The relevant source is as follows:

// Excerpt from org.apache.hadoop.mapreduce.lib.input.FileInputFormat (Hadoop 2.6).
package org.apache.hadoop.mapreduce.lib.input;

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    // The two key lines: derive the minimum and maximum split sizes.
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        // fetch the block locations of the file
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // cut the file into splits of splitSize bytes
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            // a split records the path, offset, length, and hosts
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  protected long getFormatMinSplitSize() {
    return 1;
  }

  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
}

Processing split information

LineRecordReader reads the records out of a single split:

// Excerpt from org.apache.hadoop.mapreduce.lib.input.LineRecordReader (Hadoop 2.6).
package org.apache.hadoop.mapreduce.lib.input;

public class LineRecordReader extends RecordReader<LongWritable, Text> {

  public LineRecordReader(byte[] recordDelimiter) {
    this.recordDelimiterBytes = recordDelimiter;
  }

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        // create a splittable compressed input stream
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
    // If this is not the first split, discard the first (partial) line;
    // the previous split reads one extra line past its own end.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    // initialize the read position
    this.pos = start;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // always read one extra line past the split's upper limit (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " +
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }
}

The following test, adapted from the Hadoop source tree (TestMRSequenceFileAsTextInputFormat), shows how the split information is actually consumed: getSplits() produces the splits, and each split is handed to its own RecordReader and MapContext:

package org.apache.hadoop.mapreduce.lib.input;

public class TestMRSequenceFileAsTextInputFormat extends TestCase {

  public void testFormat() throws Exception {
    Job job = Job.getInstance(conf);
    FileSystem fs = FileSystem.getLocal(conf);
    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
    Path file = new Path(dir, "test.seq");

    int seed = new Random().nextInt();
    Random random = new Random(seed);

    fs.delete(dir, true);

    FileInputFormat.setInputPaths(job, dir);

    // for a variety of lengths
    for (int length = 0; length < MAX_LENGTH;
         length += random.nextInt(MAX_LENGTH / 10) + 1) {

      // create a file with length entries
      SequenceFile.Writer writer =
        SequenceFile.createWriter(fs, conf, file,
                                  IntWritable.class, LongWritable.class);
      try {
        for (int i = 0; i < length; i++) {
          IntWritable key = new IntWritable(i);
          LongWritable value = new LongWritable(10 * i);
          writer.append(key, value);
        }
      } finally {
        writer.close();
      }

      TaskAttemptContext context = MapReduceTestUtil.
        createDummyMapTaskAttemptContext(job.getConfiguration());
      // try splitting the file in a variety of sizes
      InputFormat<Text, Text> format =
        new SequenceFileAsTextInputFormat();
      for (int i = 0; i < 3; i++) {
        // check each split
        BitSet bits = new BitSet(length);
        int numSplits =
          random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
        FileInputFormat.setMaxInputSplitSize(job,
          fs.getFileStatus(file).getLen() / numSplits);
        for (InputSplit split : format.getSplits(job)) {
          RecordReader<Text, Text> reader =
            format.createRecordReader(split, context);
          // each split is processed with its own RecordReader and MapContext
          MapContext<Text, Text, Text, Text> mcontext =
            new MapContextImpl<Text, Text, Text, Text>(job.getConfiguration(),
            context.getTaskAttemptID(), reader, null, null,
            MapReduceTestUtil.createDummyReporter(),
            split);
          reader.initialize(split, mcontext);
          Class<?> readerClass = reader.getClass();
          assertEquals("reader class is SequenceFileAsTextRecordReader.",
            SequenceFileAsTextRecordReader.class, readerClass);

          Text key;
          try {
            int count = 0;
            while (reader.nextKeyValue()) {
              key = reader.getCurrentKey();
              int keyInt = Integer.parseInt(key.toString());
              assertFalse("Key in multiple partitions.", bits.get(keyInt));
              bits.set(keyInt);
              count++;
            }
          } finally {
            reader.close();
          }
        }
        assertEquals("Some keys in no partition.", length, bits.cardinality());
      }
    }
  }
}
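As a worked example of the arithmetic above (the 128 MB block size and 300 MB file are assumed values, not taken from the article): with the defaults minSize = 1 and maxSize = Long.MAX_VALUE, computeSplitSize returns max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB. A 300 MB file is therefore cut into 128 MB, 128 MB, and 44 MB splits, that is, three map tasks; the loop stops once the remainder divided by splitSize drops below the SPLIT_SLOP factor of 1.1, and the leftover becomes the final, smaller split. The standalone sketch below mirrors that loop outside Hadoop:

// A standalone sketch that mirrors FileInputFormat's split arithmetic shown
// above; the sizes are made-up examples, not values read from a cluster.
public class SplitMath {

  private static final double SPLIT_SLOP = 1.1; // same slack factor as FileInputFormat

  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  static int countSplits(long fileLength, long splitSize) {
    int splits = 0;
    long bytesRemaining = fileLength;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits++;                  // the final, possibly smaller, split
    }
    return splits;
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // 128 MB HDFS block (assumed)
    long minSize = 1L;                     // default split.minsize
    long maxSize = Long.MAX_VALUE;         // default split.maxsize
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    long fileLength = 300L * 1024 * 1024;  // a 300 MB input file (assumed)
    System.out.println("splitSize = " + splitSize);                          // 134217728
    System.out.println("map tasks = " + countSplits(fileLength, splitSize)); // 3
  }
}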

Summary

In an MR job, the number of map tasks is determined by the number of input splits. In other words, even if you do not set the number of maps explicitly, you can steer it by adjusting the minimum and maximum split sizes, and the framework will derive the number of map tasks from the resulting splits.
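For instance, a sketch along these lines shows how the two knobs are typically set from driver code; the 64 MB and 256 MB figures are arbitrary illustrations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "split-size-tuning");

    // Cap each split at 64 MB: a file stored in 128 MB blocks is cut into
    // roughly twice as many splits, hence roughly twice as many map tasks.
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

    // Or raise the minimum to 256 MB for fewer, larger map tasks, since
    // splitSize = max(minSize, min(maxSize, blockSize)).
    // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

    // The equivalent configuration keys, settable on the command line with -D:
    //   mapreduce.input.fileinputformat.split.maxsize
    //   mapreduce.input.fileinputformat.split.minsize
  }
}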
