琴弦上、漫步

一、MapReduce概述 MapReduce 是 Hadoop 的核心组成, 是专用于进行数据计算的,是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入输出信息。 map、reduce键值对格式

二、MapReduce体系结构及工作流程 1、JobTracker 负责接收用户提交的作业,负责启动、跟踪任务执行。 JobSubmissionProtocol是JobClient与JobTracker通信的接口。 InterTrackerProtocol是TaskTracker与JobTracker通信的接口。

2、TaskTracker 负责执行任务。

3、JobClient 是用户作业与JobTracker交互的主要接口。 负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

4、工作流程图

执行步骤:

1.map任务处理 1.1读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

三、统计单词源代码及注释

package mapreduce;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountApp {static final String INPUT_PATH = “hdfs://liguodong:9000/hello”;static final String OUT_PATH = “hdfs://liguodong:9000/out”;public static void main(String[] args) throws Exception {Configuration conf = new Configuration();final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);if(fileSystem.exists(new Path(OUT_PATH))){fileSystem.delete(new Path(OUT_PATH),true);}final Job job = new Job(conf, WordCountApp.class.getSimpleName());//1.1 输入目录在哪里FileInputFormat.setInputPaths(job, INPUT_PATH);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//2.3 指定输出的路径FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));job.waitForCompletion(true);}/*** KEYIN 即K1表示每一行的起始位置(偏移量offset)* VALUEIN 即v1表示每一行的文本内容* KEYOUT 即k2表示每一行中的每个单词* VALUEOUT 即v2表示每一行中的每个单词的出现次数,固定值1* @author liguodong* JavaHadoop* LongLongWritable* StringText*/static <void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws IOException, InterruptedException {final String[] splited = value.toString().split(” “);for (String word : splited) {context.write(new Text(word), new LongWritable(1));}}}/*** KEYIN 即k2表示每一行中的每个单词* VALUEIN 即v2表示每一行中每个单词出现次数,固定值1* KEYOUT 即k3表示整个文件中的不同单词* VALUEOUT 即v3表示整个文件中的不同单词的出现总次数* @author liguodong*/static <void reduce(Text k2, Iterable<LongWritable> v2s,Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws IOException, InterruptedException {long sum = 0L;for (LongWritable v2 : v2s) {sum += v2.get();}context.write(k2, new LongWritable(sum));}} }

四、最小的MapReduce驱动

Configuration configuration = new Configuration();Job job = new Job(configuration, “HelloWorld”);job.setInputFormat(TextInputFormat.class);job.setMapperClass(IdentityMapper.class);job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(Text.class);job.setPartitionerClass(HashPartitioner.class);job.setNumReduceTasks(1);job.setReducerClass(IdentityReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(Text.class);job.setOutputFormat(TextOutputFormat.class);job.waitForCompletion(true);

五、MapReduce驱动默认的设置

六、Hadoop序列化与基本类型

1、序列化概念 序列化(Serialization)是指把结构化对象转化为字节流。 反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。 Java序列化(java.io.Serializable)

2、Hadoop序列化格式特点 紧凑:高效使用存储空间。 快速:读写数据的额外开销小 可扩展:可透明地读取老格式的数据 互操作:支持多语言的交互

Hadoop的序列化格式:Writable

3、Hadoop序列化的作用

序列化在分布式环境的两大作用:进程间通信,永久存储。 Hadoop节点间通信。

4、基本数据类型 Hadoop的数据类型要求必须实现Writable接口。

Writable接口是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.

等待故人的归来。山上的树,大多数是松树比较突出。

琴弦上、漫步

相关文章:

你感兴趣的文章:

标签云: