Hadoop源码分析:HDFS数据读写流量控制(DataTransferThrottler类

DataTransferThrottler类用于在Datanode读写数据时控制数据传输速率。这个类是线程安全的,可以被多个线程共享使用。使用方式是先构造DataTransferThrottler对象并设置周期period和带宽bandwidthPerSec,然后在数据实际读写前调用DataTransferThrottler.throttle()方法。如果I/O的速率相对给定的带宽太快,则该方法会将当前线程wait。

两个构造函数

双参构造函数,可以设置周期period和带宽bandwidthPerSec。

public DataTransferThrottler( long period,long bandwidthPerSec)

单参构造函数,可以设置带宽bandwidthPerSec, 周期period默认被设置为500ms。

public DataTransferThrottler( long bandwidthPerSec)

重要属性

period周期,单位毫秒

periodExtension周期扩展时间,单位毫秒

bytesPerPeriod一个周期内可以发送/接收的byte总数

curPeriodStart当前周期开始时间,单位毫秒

curReserve当前周期内还可以发送/接收的byte数

bytesAlreadyUsed当前周期内已经使用的byte数

带宽bandWidthPerPerSec = bytesPerPeriod * period/1000

DataTransferThrottler.throttle()方法

该方法有两个入参:

numOfBytes是自从上次throttle被调用到现在,需要发送/接收的byte数; canceler是一个外部取消器,可以用来检测是否取消流量控制。

DataTransferThrottler.throttle()方法会先将curReserve减去numOfBytes,接着执行如下逻辑。

如果curReserve大于0,则说明当前周期还有余量,throttle方法直接返回。 如果curReserve小于等于0,则说明当前周期已经没有余量,会执行下面逻辑。

>如果当前时间now小于当前周期结束时间curPeriodEnd,,则wait到下一个周期

>如果当前时间now大于当前周期结束时间curPeriodEnd并且小于curPeriodStart+periodExtension,说明已经进入下一个周期并且throttle应该不是很长时间没有使用,则将curReserve加上下一个周期可以传输的byte总数bytesPerPeriod,并将curPeriodStart设置到下一个周期的开始。

>如果当前时间now大于curPeriodStart+periodExtension,则可能Throttler很长时间没有使用,则抛弃上一个周期。

DataTransferThrottler.throttle()方法源码如下:

publicsynchronizedvoid throttle(long numOfBytes , Canceler canceler) { if ( numOfBytes <= 0 ) { return; } //当前周期余量减去需要发送/接收的byte数numOfBytes curReserve -= numOfBytes ; bytesAlreadyUsed += numOfBytes; //如果curReserve小于等于0,则说明当前周期已经没有余量 while ( curReserve <= 0) { //如果传入了有效取消器canceler,并且取消器的取消状态isCancelled是true,则直接退出while循环 if (canceler != null && canceler.isCancelled()) { return; } long now = monotonicNow(); //计算当前周期结束时间,并存放在curPeriodEnd变量中 long curPeriodEnd = curPeriodStart + period ; if ( now < curPeriodEnd ) { //等待下一个周期,这样curReserve就可以增加 try { wait( curPeriodEnd – now ); } catch (InterruptedException e) { //终止throttle, 并且重置interrupted状态来确保在调用栈中其它interrupt处理器可以正确执行 Thread. currentThread().interrupt(); break; } } //如果当前时间now比当前结束时间curPeriodEnd晚,并且小于curPeriodStart+periodExtension(周期3倍时间),则进入下一个周期 //并增加bytesPerPeriod到curReserve elseif ( now < (curPeriodStart + periodExtension)) { curPeriodStart = curPeriodEnd; curReserve += bytesPerPeriod ; } //如果当前时间now大于curPeriodStart+periodExtension,则可能Throttler很长时间没有使用,抛弃上一个周期 else { curPeriodStart = now; curReserve = bytesPerPeriod – bytesAlreadyUsed; } } bytesAlreadyUsed -= numOfBytes;}

总结

DataTransferThrottler类可以控制数据传输在一段时期内的平均速率。但可能在一个周期结束时瞬时速率会失控。

参考资料

hadoop-release-2.5.0源码

转载请附上原博客地址:

有勇气并不表示恐惧不存在,而是敢面对恐惧克服恐惧

Hadoop源码分析:HDFS数据读写流量控制(DataTransferThrottler类

相关文章:

你感兴趣的文章:

标签云: