Java同步技术(二)

版权声明

作者联系并注明出处http://blog.csdn.net/Iangao/archive/2008/10/09/3041364.aspx。

1.3、信号量(semaphore)——现代同步解决方案的基础

1.3.1、PV信号量简介

1.3.1.1 普通信号量

信号量(semaphore)是由Edsger Dijkstra于1968年发明的后来被作为第一个用于实现进程同步的面向软件的原语并成为了现在同步解决方案的基础。它是一个用于表示系统可用资源数量的非负整数s,当s增加时表示有资源被释放了,当s减少时则表示有资源被占用了,当s为0时则表示现在没有资源可以使用。在Dijkstra最初的论文中还同时定义了两个操作——P操作和V操作。其中P操作是荷兰语proberen的缩写意为”检测”,当线程执行P操作时会试图获取一个信号(资源)的使用权. 如果当前没有信号(s=0)线程会进入等待状态直到有信号为止,如果有信号线程就会立即占有(减少)一个(s–),然后独占的使用它完成后续的操作。V操作是荷兰语verhogen的缩写,意为”增量”.当线程执行V操作时,会释放(增加)一个信号量(s++)。由于这种信号量通常是用来计录可用资源数目的,所以我们又称它为“计数信号量”。

信号量的基本语义如下:s:是一个非负的整数// 表示当前可用共享资源的个数P(s):[s=s+1]// 资源数加1,表示新增(或释放)了一个共享资源.V(s):[while(s==0){wait};s=s-1]// 表示: 1.如果没有资源,那么线程进入等待状态,直到有空闲的资源为止;          2.获取资源后,资源数量减1.表示一个资源被占用(锁住)了.方括号表示:其中的操作是不可分割的(indivisible)原子的(atomic)操作.  

1.3.1.2 二值信号量

在Dijkstra定义的信号量中有一种只能取0和1两个值的信号量——二值信号量(binary semaphores)。由于它可以表示两个互斥的状态,因此我们也称它为互斥(Mutex)信号量。我们一般使用它来定义临界区。这时它会被初始化为1(并不总是这样)表示临界区处于空闲状态。当进入临界区时对信号执行P操作将其变为0,这样就可以表示临界处于工作状态了。当退出临界区时,再通过执行V操作使得信号量重新被置为1。这样的话,P操作就相当于加锁lock()。而V操作就相当于解锁unlock()了。

下面是对二值信号量的示意性说明

shared double balance, account;//共享变量shared int lock=FALSE;//同步变量Program for P1 …enter(lock);// 获取资源并加锁balance=balance+account;// <临界区> exit(lock);// 释放资源并解锁… shared double balance, account;//共享变量shared semaphore mutex=1; //同步信号量Program for P1 …p(mutex);// 获取资源并加锁balance=balance+account;// <临界区> v(mutex);// 释放资源并解锁…

不过在实现互斥信号量时,我们还要考虚到加锁和解锁的层次问题,也就是说同一个线程可能对同一互斥信号量重复加锁。比如在一个临界区内调用另一个含有相同临界区定义的方法时,就会出现这个多重锁的问题,所以我们认为Mutex的实现应该要有如下几个约束。

Mutex约束:

1. 信号值只能取0,12. 为了保证临界区的原子性,如果Mutex已经被加锁了(执行了P操作),则必须由同一线程解锁(V操作).3. 允许同一线程多层加锁并只在最外层的锁被解开时才真正的释放对Mutex的控制权,而内层锁的pv操作只做计数处理.4. 要有一次解多层锁和加多层锁的功能,主要用于多重锁的释放和恢复.(在后面的条件变量中会用到)

1.3.2、PV信号量的Java实现

1.3.2.1 信号量Semaphore

为了实现信号量,我们创建一个Semaphore信号类,并加入用来实现Dijkstra定义的P操作和V操作的两个方法p()、v()。代码清单如下:

/*** 信号类* @author iangao*/public class Semaphore { private volatile long s=1; //信号量,非负数整数,初始信号为1;(valatile:可以保证s不被优化处理,保证多线程中处理的是同一块内存) public Semaphore(){} /** * 根据传入的初始信号量初始化s * @param init 初始信号量 */ public Semaphore(long init){ if(init>0) this.s=init; } public long queue(){ return queue; }}/*** V 操作: * 1. synchronized标识可以保证操作是不可分割和原子的。* 2. 无论当前是否有等待线程,都保存当前的v操作信号.* @param increaseCount 信号的增值.*/ public synchronized void v(long increaseCount){ s+=increaseCount; // 增加信号量(不论是否有等等线程) notifyAll(); // 如果有等待线程,那么 就唤醒一个. return true;}public synchronized void v(){ v(1); }/*** P 操作:* 1.synchronized标识可以保证操作是不可分割和原子的。* 2.因为实际应用中p操作的等待有时 可能是限时的,* 所以此处引入mills参数,用来设置这个等待时间* @param acquireCount 请求资源数* @param mills 等待时间,如果为0,则表示死等* @return true:获得了资源控制权, false:未获得资源控制权*/public synchronized boolean p(long acquireCount,long mills) throws InterruptedException{ long preTime=System.currentTimeMillis(); boolean hold=false; // 无信号, 等待 while(hold=s<acquireCount){ wait(mills); // 等mills时间,0则死等 if(mills>0){ // 有限等待,则.. long now=System.currentTimeMillis() mills-=now-preTime;// 重新计算mills值 if(mills<=0) break; // 超时,退出循环 preTime=now; } } // 只所以此处加一个if,是因为超时时 信号量可能是不足的. if(hold) return false; // 超时 // 减少信号量: 即有空闲资源时,锁定 请求的资源 s-=acquireCount; // 如果有多余信号,则继续唤醒操作 if(s>0) notifyAll(); return true;}public synchronized boolean p(long mills) { p(1,mills); }public synchronized boolean p() { p(0); }1.3.2.2 互斥信号量Mutex

1. synchronized关健字

我们知道Java中的synchronized关键字已经可以很好的完成了对临界区的定义了,它为我们提供的功能是在其他语言中需要便用互斥信号量Mutex(或者叫锁(lock))才能完成的功能。请参看下面的示意性代码清单:

private double balance, account; //共享变量private Object mutexObj=new Object(); //同步对象Program for P1 …synchronized(mutexObj){ // 获取资源并加锁 balance=balance+account; // <临界区> } // 释放资源并解锁… shared double balance, account;//共享变量shared semaphore mutex=1; //同步信号量Program for P1 …p(mutex);// 获取资源并加锁balance=balance+account;// <临界区> v(mutex);// 释放资源并解锁…

2. Mutex类

虽然synchronized关键字可以满足我们大部分的同步需求.但是还是有一些特殊的情况(比如在后面我们要讨论的”条件变量”就会用到)会需要在两个不同的方法中去完成加锁和解锁操作,这时sychronized就不适用了.因此我们还是有必要创建一个Mutex类去完成与synchronized相同的功能. 代码清单如下:

/*** 互斥信号量* @author iangao*/public class Mutex { private volatile Thread owner=null; // 拥有控制权的线程指针 private volatile long holdCount=0; // 加锁层数 private volatile Semaphore lock; // 锁控制权信号量 /** * 互斥信号量(初始:未锁状态) */ public Mutex() { this(false); } /** * 互斥信号量,并通过hold参数设置初始值 * @param hold: true 锁, false 未锁 */ public Mutex (boolean hold){ lock=new Semaphore(hold?0:1); }

/** * 加锁: 首次加锁时,锁定控制权,并记录控制线程,其他层p操作只做计数处理 * @param mills 等待mills, 如果mills为0,则死等. * @param holdCount 本次加锁的层数 */ boolean p(long mills, long holdCount) throws InterruptedException { boolean success=true; synchronized(lock){ // <lock临界区> // 首次加锁(<当前线程尚未加锁>)时,做P操作,并记录加锁线程 if(Thread.currentThread()!=owner ){ success = lock.p(mills); // 首次加锁:锁定控制权 if(!success) return false; // 加锁失败 this.owner=Thread.currentThread(); // 计录线程 } // 增加加锁层数 this.holdCount+=holdCount; } return success; } public boolean p(long mills) throws InterruptedException { return p(1,mills); } public boolean p() throws InterruptedException { return p(0); }

/** * 解锁: * 1. 前置条件是“如果已经被锁,那么必须当前线程加的锁,否则不能解” * 2. 解到最后一道锁时,释放控制权并清除控制线程指针 * @param relinquish 是否放弃临界区(解除每一层锁),true:全解 false:只解一层 * @return 释放锁的层数 */ long v(boolean relinquish){ // 如果已经加锁,那么必须由加锁的线程解锁 if(owner!=null && Thread.currentThread()!=owner) throw new IllegalMonitorStateException( “current thread isn’t the owner of the mutex”); // 解锁,释放线程 synchronized(lock){ // <lock临界区> long release=relinquish?holdCount:1; // 计算解锁层数 if(holdCount>0) holdCount-=release; // 如果有锁,则减少加锁层数 if(holdCount==0){// 解到最后一道锁时,释放控制权并清除控制线程指针 lock.v(); // 解锁:释放控制权 owner=null; // 清除控制线程指针 } return release; } } public boolean v() { return v(false); }}

1.3.2.3 信号量的简单测试

1. 普通信号量测试

下面我们针对信号量的使用做一个简单的测试。测试内容是创建三个线程,并让它们按照一定的顺序依次执行。其测试代码及测试结果如下:

public class SemaphoreTest { public static void main(String[] args){ /** * 创建3个线程,对线程的执行要求: 依次执行线程2、线程3、线程1 * @author iangao */ new ThreadsTest(){ Semaphore t3Begin=new Semaphore(0); // 没有t3开始信号 Semaphore t3Done=new Semaphore(0); // 没有t3结束信号 public void runInThread1() throws InterruptedException{ t3Done.p(); // 等待t3结束信号 doingSomething(1000); } public void runInThread2(){ doingSomething(2000); t3Begin.v(); // 发出t3开始信号 } public void runInThread3() throws InterruptedException{ t3Begin.p(); // 等待t3开始信号 doingSomething(3000); t3Done.v(); // 发出t3结束信号 } private void doingSomething(long runTime) { output(“执行… (“+(runTime/1000)+”秒)”); sleep(runTime); output(“结束”); } }.execute(3);}测试结果:[T2]: 执行… (2秒)[T2]: 结束[T3]: 执行… (3秒)[T3]: 结束[T1]: 执行… (1秒)[T1]: 结束

2. 互斥信号量测试

下面我们对互斥信号量做一个测试,测试中线程T1循环调用了两层锁操作,但线程T2只能在T1解开最后一层锁的时候才可以继续运行。代码清单如下:

public class MutexTest { public static void main(String[] args){ new ThreadsTest(){ Mutex mutex=new Mutex(); public void runInThread1() throws InterruptedException{ mutex.p(); // 加锁:记录本线程为控制线程 output(“加锁(1层)”); output(“调用synProc()”); synProc(); // 此方法中还有同步操作 sleep(2000); output(“解锁(1层)”); mutex.v(); // 解锁:释放控制权 } public void synProc() throws InterruptedException{ mutex.p(); // 第2重锁,只计数,不等待 output(“synProc: 加锁(2层)”); output(“synProc: 调用synProc2()”); synProc2(); sleep(2000); output(“synProc: 解锁(2层)”); mutex.v(); // 第2重锁,只计数,不释放控制权 } public void synProc2() throws InterruptedException{ mutex.p(); output(“synProc2: 加锁(3层)”); output(“synProc2: 运行…”); sleep(2000); output(“synProc2: 解锁(3层)”); mutex.v(); } public void runInThread2() throws InterruptedException{ name(“waiter”); sleep(1000); output(“等待…”); mutex.p(); output(“加锁”); output(“运行…”); sleep(2000); mutex.v(); output(“解锁”); } }.execute(2); }}

运行结果:

[T1]: 加锁(1层) // 第一道锁,锁住控制权[T1]: 调用synProc()[T1]: synProc: 加锁(2层) // T1第2层锁(计数)[waiter]: 等待…[T1]: synProc: 调用synProc2()[T1]: synProc2: 加锁(3层) // T1第3层锁(计数)[T1]: synProc2: 运行…[T1]: synProc2: 解锁(3层)[T1]: synProc: 解锁(2层)[T1]: 解锁(1层) // 最后一道锁,释放控制权[waiter]: 加锁 // 等到T1解除最后一道锁[waiter]: 运行…[waiter]: 解锁

1.3.2.4 J2SE 1.5中的信号量与锁

在J2SE 1.5版中Java基础类库提供了一个名为java.util.concurrent.Semaphore的信号量类,并通过acquire()和release()方法分别实现了P操作和V操作。同样在1.5版本中还提供了一个java.util.concurrent.locks.Lock接口类用来实现互斥信号量的功能,通过接口中的lock()和unlock()方法分别实现了P操作和V操作的功能.同时还提供了一个Lock接口的实现类java.util.concurrent.locks.ReentrantLock,我们可以通过创建它的实例来使用互斥信号量.

家!甜蜜的家!天下最美好的莫过於家

Java同步技术(二)

相关文章:

你感兴趣的文章:

标签云: