MapReduce Processing of Multiple Input Files (2): Custom FileInputFormat Classes

Handling file input for multiple custom file formats

MultipleInputs lets a MapReduce job accept more than one input format. If we have two file formats, we need two of each: a record class, a RecordReader, and an InputFormat.

InputFormat (extends FileInputFormat) -> RecordReader (extends RecordReader) -> record class (implements Writable)

MultipleInputs takes one InputFormat per format; each InputFormat uses one RecordReader to read its files and return values of one record type. That is the typical relationship among the three classes, and they are the tools and products of the three steps involved in producing the map input. The driver wiring is sketched just below.
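For orientation, here is a minimal sketch of how a driver could wire the two formats together with MultipleInputs. The driver name, the mapper classes, and the output types are assumptions (the article does not show a driver); the two InputFormat classes are developed in the steps below.

    package test.mr.multiinputs2;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultiInputsDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "multi-format inputs");
            job.setJarByClass(MultiInputsDriver.class);

            // One (path, InputFormat, Mapper) triple per file format.
            // FirstMapper and SecondMapper are hypothetical mapper classes.
            MultipleInputs.addInputPath(job, new Path(args[0]),
                    FirstInputFormat.class, FirstMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]),
                    SecondInputFormat.class, SecondMapper.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path(args[2]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

Reducer setup is omitted here; with MultipleInputs you do not call FileInputFormat.addInputPath, since each path is registered together with its own format and mapper.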

Data preparation

File a:

1\t80
2\t90
3\t100
4\t50
5\t73

File b:

1\tlili\t3
2\txiaoming\t3
3\tfeifei\t3
4\tzhangsan\t3
5\tlisi\t3

\t denotes the tab delimiter.

Design approach

The Text before the \t becomes the key that will be fed to map; whatever follows the \t becomes the value. The requirement is a custom InputFormat implementation that outputs data in (key, value) form, producing the map input. A tiny illustration of the split follows.
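For instance (an illustration added here, not code from the original):

    // One line from file b; \t is a real tab character in the data.
    String[] str = "2\txiaoming\t3".split("\t");
    // str[0] == "2"        -> map input key
    // str[1] == "xiaoming" -> value field: username
    // str[2] == "3"        -> value field: classNo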

!!! Three steps, one class file each !!!

InputFormat (extends FileInputFormat) -> RecordReader (extends RecordReader) -> record class (implements Writable)

This example operates on the two files above.

1. Two record classes (implementing the Writable interface)

package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * Pre-processes the value handed to map: a light wrapper
 * over one line of the raw data.
 */
/*
 * Record for the first table (file a).
 */
public class FirstClass implements Writable {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public FirstClass() {
        super();
    }

    public FirstClass(String value) {
        super();
        this.value = value;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readUTF();
    }

    @Override
    public String toString() {
        return "FirstClass\t" + value;
    }
}

package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * Pre-processes the value handed to map: a light wrapper
 * over one line of the raw data.
 */
/*
 * Record for the second table (file b).
 */
public class SecondClass implements Writable {
    private String username;
    private int classNo;

    public SecondClass() {
        super();
    }

    public SecondClass(String username, int classNo) {
        super();
        this.username = username;
        this.classNo = classNo;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getClassNo() {
        return classNo;
    }

    public void setClassNo(int classNo) {
        this.classNo = classNo;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(username);
        out.writeInt(classNo);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.username = in.readUTF();
        this.classNo = in.readInt();
    }

    @Override
    public String toString() {
        return "SecondClass\t" + username + "\t" + classNo;
    }
}
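As a quick sanity check (a hypothetical helper, not part of the article's three steps), a record can be round-tripped through its Writable methods; the field order in readFields must match write exactly:

    package test.mr.multiinputs2;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical test class, added for illustration only.
    public class SecondClassRoundTrip {
        public static void main(String[] args) throws IOException {
            SecondClass before = new SecondClass("lili", 3);

            // Serialize with write().
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            before.write(new DataOutputStream(bytes));

            // Deserialize with readFields().
            SecondClass after = new SecondClass();
            after.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(after); // prints "SecondClass\tlili\t3" (real tabs)
        }
    }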

2. Two custom RecordReader classes (extending RecordReader)

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FirstRecordReader extends RecordReader<Text, FirstClass> {
    // The reader that actually pulls lines out of the split.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private FirstClass value = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Nothing left to read.
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            value = null;
            return false;
        }
        Text val = lineRecordReader.getCurrentValue();
        String line = val.toString();
        String[] str = line.split("\t");
        key = new Text(str[0]);
        value = new FirstClass(str[1].trim()); // pre-split the raw line
        return true;
    }

    // Current key.
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // Current value.
    @Override
    public FirstClass getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        value = null;
    }
}

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SecondRecordReader extends RecordReader<Text, SecondClass> {
    // The reader that actually pulls lines out of the split.
    private LineRecordReader lineRecordReader = null;
    private Text key = null;
    private SecondClass value = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        close();
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            key = null;
            value = null;
            return false;
        }
        Text val = lineRecordReader.getCurrentValue();
        String line = val.toString();
        String[] str = line.split("\t");
        key = new Text(str[0]);
        value = new SecondClass(str[1], Integer.parseInt(str[2]));
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public SecondClass getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != lineRecordReader) {
            lineRecordReader.close();
            lineRecordReader = null;
        }
        key = null;
        value = null;
    }
}
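3. Two custom InputFormat classes (extending FileInputFormat)

The section above stops before showing this third step of the chain, so the following is a minimal sketch of what the missing classes could look like; the names FirstInputFormat and SecondInputFormat are assumptions. Each InputFormat simply hands its custom RecordReader from step 2 to the framework:

    package test.mr.multiinputs2;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class FirstInputFormat extends FileInputFormat<Text, FirstClass> {
        @Override
        public RecordReader<Text, FirstClass> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException {
            // Delegate record parsing to the reader from step 2.
            return new FirstRecordReader();
        }
    }

    package test.mr.multiinputs2;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SecondInputFormat extends FileInputFormat<Text, SecondClass> {
        @Override
        public RecordReader<Text, SecondClass> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException {
            return new SecondRecordReader();
        }
    }

These are the classes the driver sketch at the top registers via MultipleInputs.addInputPath, completing the InputFormat -> RecordReader -> record class chain.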
