K天熟悉Apache Storm (三)

软件版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;

代码及Jedis下载:Storm实时单词计数

Storm应用场景–实时单词计数,有点类似《Getting Started with Storm》中的chapter6的real-life app。

场景描述:

1. 使用一个java程序每间隔一定时间向Redis数据库A中存入数据;

2. Storm的Spout读取Redis数据库A中的数据,读取后删除Redis中的数据;

3. Storm的SplitBolt读取Spout的输出,对其进行解析,并输出;

4. Storm的CountBolt对SplitBolt的数据进行计数,并每隔一定间隔把数据存储在Redis数据库B中;

5. 另外的java程序定时读取Redis数据库B中的数据,并打印;

具体实现:

1. Java定时向Redis发送数据

while(true){// 每次发送3个数据try {Thread.sleep(200);// 每200毫秒产生一次数据} catch (InterruptedException e) {e.printStackTrace();}interval ++;int index = random.nextInt(normal.length);if(!jedis.exists("0")){// 如果不存在key说明已经被取走了,就再次产生,否则不产生jedis.set("0",normal[index]);}index = random.nextInt(normal.length);if(!jedis.exists("1")){jedis.set("1", normal[index]);}index = random.nextInt(normal.length);if(!jedis.exists("2")){jedis.set("2", normal[index]);}if(interval*200/1000==2*60) {// 每间隔200毫秒产生数据后,产生了2分钟,共2*60*1000/200*3 个数据记录// 暂停 5分钟System.out.println(new java.util.Date()+":数据暂定5分钟产生…");try {interval=0;Thread.sleep(5*60*1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new java.util.Date()+":5分钟暂停完成,继续产生数据…");}}这里使用一个固定的字符串数组,每次从里面随机抽取三个字符串,使用Jedis存储到Redis的数据库中;

2. Spout读取Redis数据

@Overridepublic void nextTuple() {long interval =0;while(true){// 获取数据interval++;String zero = getItem("0");String one = getItem("1");String two = getItem("2");try {Thread.sleep(200);// 每200毫秒发送一次数据} catch (InterruptedException e) {e.printStackTrace();}if(zero==null||one==null||two==null){// do nothing// 没有数据//if(interval%15==0){//}}else{String tmpStr =zero+","+one+","+two;if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout负载均衡this.collector.emit(new Values(tmpStr));if(interval%15==0&&"fast".equals(slow_fast)){System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),taskId, "Spout:["+zero+","+one+","+two+"]"));}else if("slow".equals(slow_fast)){System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),taskId, "Spout:["+zero+","+one+","+two+"]"));}else{new RuntimeException("Wrong argument!");}}}}}这里使用了负载均衡,Spout处理的数据按task进行分隔。

getItem用于从Redis中获取数据,并删除对应的数据,代码如下:

/** * Redis中获取键值并删除对应的键 * @param index */private String getItem(String index){if(!jedis.exists(index)){return null;}String val = jedis.get(index);//if(val==null||"null".equals("null")){//return ;//}jedis.del(index);return val;}3. SplitBolt就是一般的单词分隔代码:

public void execute(Tuple input, BasicOutputCollector collector) {interval++;String sentence = input.getString(0);if(interval%15==0&&"fast".equals(slow_fast)){//System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+//"splitBolt:"+sentence);System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));}else if("slow".equals(slow_fast)){System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));}String[] words = sentence.split(",");for(String word : words){word = word.trim();if(!word.isEmpty()){word = word.toLowerCase();collector.emit(new Values(word));}}}4. CountBolt进行单词计数,并向Redis数据库中存储单词的计数

public void execute(Tuple input, BasicOutputCollector collector) {interval++;String str = input.getString(0);/** * If the word dosn’t exist in the map we will create * this, if not We will add 1*/if(!counters.containsKey(str)){counters.put(str, 1);}else{Integer c = counters.get(str) + 1;counters.put(str, c);}// 每records条数据则向向数据库中更新if(interval%records==0){for(Map.Entry<String , Integer> m :counters.entrySet()){jedis.set(m.getKey(), String.valueOf(m.getValue()));//}}}5. Java程序定时读取Redis中单词计数,,并打印

private void read() {System.out.println("数据获取开始。。。,10s后打印。。。");long interval =0;while(true){// 获取数据interval++;Set<String> keys = jedis.keys("*");for(String key:keys){push2Map(key);}//push2Map("one");try {Thread.sleep(200);// 每200毫秒获取一次数据} catch (InterruptedException e) {e.printStackTrace();}if(interval*200/1000==10) {// 每10秒打印一次interval=0;printMap();}}}Storm作为实时大数据处理框架,从这个小例子中就可以感受一二。

ps:相关调用接口:

没有伞的孩子必须努力奔跑!

K天熟悉Apache Storm (三)

相关文章:

你感兴趣的文章:

标签云: