基于Hadoop的Mapreduce编程(一)

翻译的一篇国外的关于hadoop mapreduce的文章,文章比较长,先翻译第一部分吧

翻译者:pconlin900

博客:http://pconline900.javaeye.com

Hadoop是apache的一个开源的map-reduce框架,MapReduce是一个并行计算模型,用来处理海量数据。模型思想来源于Google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()两个主要的功能。

这是一个很简单的类似于Hadoop的MapReduce应用例子,应用了mapreduce的基本思想,可以帮助理解hadoop的处理思想和技术,但注意,它没有使用hadoop框架。

例子的功能是创建一些字符串,然后统计这些字符串里面每个字符出现的次数,最后汇总得到总的字符出现次数。

Listing 1. 主程序

public class Main{  public static void main(String[] args)  {    MyMapReduce my = new MyMapReduce();    my.init();  }

Listing 2. MyMapReduce.java

import java.util.*;public class MyMapReduce{List buckets = new ArrayList();List intermediateresults = new ArrayList();List values = new ArrayList();public void init(){for(int i = 1; i<=30; i++){values.add("http://pconline900.javaeye.com" + new Integer(i).toString());}  System.out.println("**STEP 1 START**-> Running Conversion into Buckets**");System.out.println();List b = step1ConvertIntoBuckets(values,5);    System.out.println("************STEP 1 COMPLETE*************");    System.out.println();    System.out.println();  System.out.println("**STEP 2 START**->Running **Map Function** concurrently for all    Buckets");System.out.println();List res = step2RunmapFunctionForAllBuckets(b);System.out.println("************STEP 2 COMPLETE*************");    System.out.println();    System.out.println();System.out.println("**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results");System.out.println();step3RunReduceFunctionForAllBuckets(res);System.out.println("************STEP 3 COMPLETE*************");         System.out.println("************pconline900 翻译*************");         System.out.println("***********博客:http://pconline900.javaeye.com*************");}public List step1ConvertIntoBuckets(List list,int numberofbuckets){int n = list.size();int m = n / numberofbuckets;int rem = n% numberofbuckets;int count = 0;System.out.println("BUCKETS");for(int j =1; j<= numberofbuckets; j++){List temp = new ArrayList();for(int i=1; i<= m; i++){temp.add((String)values.get(count));count++;}buckets.add(temp);temp = new ArrayList();}if(rem != 0){List temp = new ArrayList();for(int i =1; i<=rem;i++){temp.add((String)values.get(count));count++;}buckets.add(temp);}    System.out.println();System.out.println(buckets);System.out.println();return buckets;}public List step2RunmapFunctionForAllBuckets(List list){for(int i=0; i< list.size(); i++){List elementList = (ArrayList)list.get(i);new StartThread(elementList).start();}    try    {Thread.currentThread().sleep(1000);}catch(Exception e){}return intermediateresults;}public void step3RunReduceFunctionForAllBuckets(List list){int sum =0;for(int i=0; i< list.size(); i++){//you can do some processing here, like finding max of all results etcint t = Integer.parseInt((String)list.get(i));sum += t;}System.out.println();System.out.println("Total Count is "+ sum);System.out.println();}class StartThread extends Thread{private List tempList = new ArrayList();public StartThread(List list){tempList = list;}public void run(){for(int i=0; i< tempList.size();i++){String str = (String)tempList.get(i);synchronized(this)           {intermediateresults.add(new Integer(str.length()).toString());}}}}}

init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理。

step1ConvertIntoBuckets()方法将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点;

step2RunmapFunctionForAllBuckets()方法创建了5个线程(每个bucket一个),每个线程StartThread处理每个bucket并把处理结果放在intermediateresults这个arraylist中。

如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算。

step3RunReduceFunctionForAllBuckets()方法加载intermediateresults中间处理结果,并进行汇总处理,最后得到最终的计算结果。

什么天荒地老,什么至死不渝。都只是锦上添花的借口…

基于Hadoop的Mapreduce编程(一)

相关文章:

你感兴趣的文章:

标签云: