具体算法(HashedWheelTimer)

在一些需要实时消息功能的网站应用中,除了客户端轮询请求服务器获取消息外,还有一种方案就是comet长连接推送消息。显然后者更具有优势,实时性高,客户端、服务端压力都比较小。对于长连接方案,我们需要考虑对长连接进行管理,以便有消息时可以推送到客户端。可是对于大量长连接的建立服务器是有可能会被压垮的,所以不是一味的接受连接和hold住连接就可以的,需要对连接进行超时管理,如果超时需要推送重连消息或是关闭连接。我们可以初略分析下具体的管理逻辑。

先简单分析下具体的应用场景:用户登录一个电商网站(苏宁,国美),恰好在大量客户登录之后,商家(海尔、格力等)推出了一系列优惠活动(针对注册用户),需要及时推送到浏览器让用户了解,不需要用户刷新页面。

就这么一个常见的功能,我们需要怎么做呢?首先,用户打开页面,客户端js就需要请求长连接,并携带用户信息(id)过来,服务器使用list、queue之类的存储起来,记录当时的请求时间;另起一个守护线程,扫描是否有超时或需要断开的连接,超时的需要发重连消息,断开的发断开消息;如果有消息需要推送从list、queue中获取连接推送即可。好像没什么问题,对于超时处理,遍历大量元素耗时太长是接受不了的,那么排除list存储,queue具有FIFO的特性,每次检查超时,只检查表头元素即可,这样是很快。但是忽略了重连这个问题,间隔时间内重连,不能重新new一个连接对象(大量新对象耗内存伤不起),只能复用之前的对象,之前对象的time就会发生变化,queue就会变成不按时间排序的有序队列了,如果按之前的处理方式就会阻塞在表头,无法处理了,并且queue的重连遍历也是个问题,队列本身遍历比较麻烦。从现在来看单一的数据结构很难处理这样的问题,需要多个数据结构结合使用。我们知道在linux内核中定时器的实现采用wheel。我们可以借鉴之,对于超时的处理我们使用hashedwheel,hashed+wheel,hash主要用于对时间进行处理,决定放在wheel的那个区间段中;wheel是一个数组,减小了遍历的数目。

关于HashedWheelTimer实现具体见代码。netty中有类似的实现,这里进行了简化。主要有:1. 守护线程由外部提供,该线程来调用hashedwheel;2. 假定我们的超时时间不会大于wheel的圈时长(假设wheel 1s转动一次,一共512个区间,,圈时长就为512s,我们的超时时间不会这么久),就不会有第几圈计算的问题;

package com.yq.algorithmic;/** * * * 描述:连接管理 * 创建时间:2011-9-16下午08:42:34 * @author yq76034150 * */public class TimeOut {final long time;final long deadline;private int rounds;//连接的用户id//连接的channelpublic TimeOut(long time, long deadline) {super();this.time = time;this.deadline = deadline;}public int getRounds() {return rounds;}public void setRounds(int rounds) {this.rounds = rounds;}public long getTime() {return time;}public long getDeadline() {return deadline;}public String toString(){return "survival: " + (System.currentTimeMillis() – time);}}

package com.yq.algorithmic;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;/** * * 描述:一个基于时间hash的wheel超时器 * 创建时间:2011-9-16上午11:44:02 * @author yq76034150 * */public class HashedWheelTimer {volatile int tick = 1;volatile int currentWheel;private Queue<TimeOut>[] wheel;final int mask;private long delay;//private int ticksPerWheel;private long tickDuration;//private long roundDuration;/** * 描述: * @param ticksPerWheel 一圈多少tick */public HashedWheelTimer(int ticksPerWheel, long tickDuration, long delay){mask = ticksPerWheel – 1;this.delay = delay; //this.ticksPerWheel = ticksPerWheel;this.tickDuration = tickDuration;//this.roundDuration = tickDuration * ticksPerWheel;createWheel(ticksPerWheel);}/** * * 描述:wheel中填充数据 * @param timeout */public void newTimeOut(TimeOut timeout){long shift = delay / tickDuration;//long remainRounds = delay / roundDuration;int stopIndex = currentWheel + (int)shift & mask;wheel[stopIndex].offer(timeout);}/** * * 描述:外部线程调用,外部线程调用间隔必须和tickDuration一致 */public void run(long start){//模拟外部执行线程执行间隔数try {Thread.sleep(tickDuration);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}tick++;currentWheel = currentWheel + 1 & mask;long deadline = start + tick * tickDuration;Queue<TimeOut> queue = wheel[currentWheel];TimeOut timeOut = queue.peek();if(timeOut != null){long firstDeadline = timeOut.getDeadline();boolean isDead = firstDeadline <= deadline;if(firstDeadline <= deadline){while(isDead){ //对一个tick中的队列元素递归检查超时queue.remove();System.out.println("delete: " + currentWheel + timeOut);timeOut = queue.peek();if(timeOut != null){firstDeadline = timeOut.getDeadline();isDead = firstDeadline <= deadline;} else {break;}}}}}/** * * 描述:初始化wheel * @param ticksPerWheel */private void createWheel(int ticksPerWheel) {wheel = new Queue[ticksPerWheel];for(int i = 0; i < ticksPerWheel; i++){wheel[i] = new ConcurrentLinkedQueue<TimeOut>();}}/** * 描述: * @param args */public static void main(String[] args) {long tickDuration = 1000;long delay = 9000;HashedWheelTimer timer = new HashedWheelTimer(4, tickDuration, delay);long start = System.currentTimeMillis();TimeOut timeout = new TimeOut(start, start + delay);timer.newTimeOut(timeout);timer.run(start);start = System.currentTimeMillis();timeout = new TimeOut(start, start + delay);timer.newTimeOut(timeout);timer.run(start);for(int i = 0; i < 10; i++){timer.run(start);}}}

上面的例子使用queue,会存在阻塞问题。测试代码如下:

接受失败,是我们不常听到或看到的一个命题,

具体算法(HashedWheelTimer)

相关文章:

你感兴趣的文章:

标签云: