java关于jdk1.5线程加强的专题

1.线程池的概念与 Executors 类的应用

在jdk1.5之后,我们可以使用 Executors 这个类的静态方法来创建线程池对象//该方法用于创建一个线程池对象,通过指定的参数确定该线程池中有多少个线程对象,该对象实现了ExecutorService接口1.

public static ExecutorService newFixedThreadPool(int nThreads)

//该方法创建一个线程池,该线程池会根据动态的任务创建线程对象个数,线程池对象实现了ExecutorService接口2.

public static ExecutorService newCachedThreadPool()

//该方法创建一个线程池对象,该对象中只有一个线程对象,线程池对象实现了ExecutorService接口3.

public static ExecutorService newSingleThreadExecutor()

4.另外,讲解一下 ScheduledExecutorService 这个接口,通过 Executors 的 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)方法可以获取实现了实现了 ScheduledExecutorService 接口的对象 该对象主要作用是他可以对同一个任务进行多次的线程执行5. ScheduledExecutorService 和 ExecutorService 和 Executor 的关系是Executor |ExecutorService |ScheduledExecutorService接下来看具体的代码示例

实例1

package com.xiaogao.thread;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class NewThreadPool {public static void main(String[] args) {//创建一个线程池,该线程池中含有3个线程对象//ExecutorService es = Executors.newFixedThreadPool(3);//创建一个具有缓冲线程池的线程//ExecutorService es = Executors.newCachedThreadPool();//创建一个具有单线程的线程池ExecutorService es = Executors.newSingleThreadExecutor();for(int i=1; i<11; i++) {final int task = i;//循环十次,给线程池分配十次任务,并执行es.execute(new Runnable() {public void run() {//执行十次循环,这是每个任务需要做的事,打印信息for(int j=1; j<11; j++) {try {//睡眠20毫秒Thread.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第" + task + "号任务执行,第" + j + "次循环");}}});}System.out.println("十次任务由三个线程执行完毕,此时java虚拟机还没对退出,必须调用shutdown方法");es.shutdown();try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}//这个方法非常危险,会杀死当前正在执行任务的线程,暂停正在处理的等待中的任务,正在执行任务的线程被杀死后,会抛出异常//es.shutdownNow();//第一次执行,隔六秒,之后每一次执行隔2秒Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable(){@Overridepublic void run() {System.out.println("bombing!");}},6,2,TimeUnit.SECONDS);}}

2.java1.5之后锁的使用在java1.5之前,在多线程之中我们为了防止多线程操纵共享数据发生错误,我们会将相应的代码封装在用 synchronized 修饰的方法,或代码块中,在jdk1.5之后我们可以考虑使用 Lock 这个接口来实现,这个接口中定义了六个方法,主要的有lock() 和 unlock() 这两个方法,用于得到锁和释放锁, ReentrantLock 这个类实现了 Lock 接口,主要用于创建创建锁对象实例2

package com.xiaogao.test;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class TestLock {public static void main(String[] args) {ShareaDate sd = new ShareaDate();//创建并启动4个线程new Thread(sd).start();new Thread(sd).start();new Thread(sd).start();new Thread(sd).start();}//定义一个内部类,该类中封装了共享数据static class ShareaDate implements Runnable {//创建共享字段//重设共享数据private int count = 10000;//拿到锁对象,用于同步代码Lock lock = new ReentrantLock();//为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lockpublic /* synchronized */ int getCount() {//为了保险起见,万一try代码块中发生了异常,锁无法得到释放,应该将释放锁的代码放在finally中try {//上锁lock.lock();count = count - 1;return count;} finally {//释放锁lock.unlock();}}@Overridepublic void run() {while(count > 0 ) {System.out.println(getCount());}}}}

3.锁的深入研究ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。该类封装了两个抽象方法Lock readLock()Lock writeLock()用于取得读取锁和写入锁ReentrantReadWriteLock 这个类实现了 ReadWriteLock 这个接口,但是他并没有覆写readLock()方法和writeLick()方法,通过查看源代码,我们发现这个类中封装了两个静态内部类ReentrantReadWriteLock.ReadLock 和 ReentrantReadWriteLock.WriteLock 这两个类中有readLock()方法和writeLock()的具体实现读锁和写锁到底有什么作用,和普通锁有什么区别,在多线程程序中,我们可能需要取得共享数据,有时需要修改共享数据,取得共享数据时,线程的乱入可能没有什么危害,但是修改数据时危害就很大,这时为了提高效率,就出现了读锁和写锁的概念,具体是什么意思1.读锁不互斥,当一个线程进入读锁,还没有执行完相应的代码,另一个线程可以进入执行,就像没有加锁一样.2.读写锁互斥,一个线程进入,另一个线程无法进入3.写锁互斥,一个线程进入,另一个线程无法进入相对于普通锁的好处是他能有效的提高性能,如果是普通锁的话,不管你做什么操作,他都不允许别的线程进入,下面代码演示实例3

package com.xiaogao.test;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** *测试读锁和写锁的区别  *  */public class TestLock {public static void main(String[] args) {ShareaDate sd = new ShareaDate();//创建并启动4个线程new Thread(sd, "1").start();new Thread(sd, "2").start();new Thread(sd, "3").start();new Thread(sd, "4").start();}//定义一个内部类,该类中封装了共享数据static class ShareaDate implements Runnable {//创建共享字段//重设共享数据private int count = 100;//创建读写锁ReadWriteLock rel = new ReentrantReadWriteLock();//为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock/*public int getCount() {//拿到锁对象,用于同步代码//拿到读锁,并上锁rel.readLock().lock();try {for(int i=0; i<10; i++) {try {Thread.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count);}return count;} finally {//释放读锁rel.readLock().unlock();}}*/public void resetCount() {//拿到写锁,并上锁rel.writeLock().lock();//保险起见,在判断一次if(count > 0) {try {count--;for(int i=0; i<10; i++) {//循环10次System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count);}} finally {//释放锁rel.writeLock().unlock();}}}@Overridepublic void run() {while(count > 0 ) {//getCount();resetCount();}}}}

从代码运行结果来看,如果用的是读锁,在内部循环时,其他线程可以进入,而用写锁时,无法进入.什么是锁的降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。貌似应用比较少这里就不加举例子了4.Condition 的简单应用Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。咱们可以这么理解 Condition ,它可用于唤醒别的线程和暂停本线程.在原来的 synchronized 修饰的语句块中,我们是通过调用锁的wait()方法和notify()来暂停和唤醒线程,同理Condition 必须基于线程同步代码块的互斥才行,也就是必须存在于Lock.lock()与Lock.unlock()之中,这样的话我们也可以更好的理解为什么读锁为什么无法拿到 Condition 对象了加读锁后,线程之间并不互斥,就像没有加 synchronized 一样,自然没有资格使用wait()方法和notify()一样synchronized 和 Condition 的区别相同点:都是使用在并发编程当中,并且必须基于一把锁对象不同点:当有>=3个的线程的话,用notify()方法没有办法指定唤醒哪一个线程,完全是随机的,就算是你使用notifyall()唤醒全部线程也没有办法保证你想要的线程得到锁,但是 Condition就不一样,你可以指定唤醒某个线程,并将锁扔给他.让我们现在来看一道面试题有三个线程,分别执行三段代码,主线程执行完后,执行子1号线程,之后子2号线程,之后再主线程,这样来回若干次分析:如果我们不使用 Condition 这个新技术,会很麻烦的,因为我们唤醒其他线程的时候不知道下一个是哪个线程会被唤醒并得到执行,而如果使用 Condition 可以很好的解决这个问题见具体代码实例4

package com.xiaogao.test;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ThreeConditionCommunication {/** * @param args */public static void main(String[] args) {final Business business = new Business();//创建3个线程对象并启动他们,每个线程执行的任务不同new Thread(new Runnable() {@Overridepublic void run() {for(int i=1;i<=50;i++){business.sub2(i);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {for(int i=1;i<=50;i++){business.sub3(i);}}}).start();for(int i=1;i<=50;i++){business.main(i);}}static class Business {    //创建锁对象Lock lock = new ReentrantLock();//得到3个Condition对象Condition condition1 = lock.newCondition();Condition condition2 = lock.newCondition();Condition condition3 = lock.newCondition();  private int shouldSub = 1;  public  void sub2(int i){  //上锁  lock.lock();  try{  while(shouldSub != 2){  try {  //正在执行的线程等待,condition2记住了当前线程,后面就可以通过condition2.signal()唤醒该线程;condition2.await();System.out.println("第二号子线程2222222222");} catch (Exception e) {e.printStackTrace();}  }for(int j=1;j<=10;j++){System.out.println("sub2 thread sequence of " + j + ",loop of " + i);}  shouldSub = 3;  //通过condition3唤醒指定的线程  condition3.signal();  }finally{  lock.unlock();  }  }  public  void sub3(int i){  lock.lock();  try{  while(shouldSub != 3){  try {condition3.await();System.out.println("第三号子线程33333333333333");} catch (Exception e) {e.printStackTrace();}  }for(int j=1;j<=20;j++){System.out.println("sub3 thread sequence of " + j + ",loop of " + i);}  shouldSub = 1;  condition1.signal();  }finally{  //解锁,释放权限  lock.unlock();  }  }    public  void main(int i){  lock.lock();  try{ while(shouldSub != 1){  try {condition1.await();System.out.println("主线程1111111111111111111");} catch (Exception e) {e.printStackTrace();}  }for(int j=1;j<=100;j++){System.out.println("main thread sequence of " + j + ",loop of " + i);}shouldSub = 2;condition2.signal();  }finally{  lock.unlock();}  }        }    }

我在每个condition.await();后面加了一句打印代码,就是为了确认某个condition调用signal()后,是不是指定的线程得到权限,并执行,从结果来看,显然如此下面介绍几个多线程并发的工具类,没什么难度,记得在哪用就可以了5.Semaphore 的使用一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。 Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:实例5

package com.xiaogao.test;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreTest {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();//创建一个Semaphore对象,并指定其权限个数final  Semaphore sp = new Semaphore(3);//循环10次for(int i=0;i<10;i++){Runnable runnable = new Runnable(){public void run(){try {//取得一个许可权限,当权限用完后,线程将阻塞,等待其他线程释放权限sp.acquire();} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3-sp.availablePermits()) + "个并发");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程" + Thread.currentThread().getName() + "即将离开");sp.release();//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");}};service.execute(runnable);}}}

6.CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier实例5

package com.xiaogao.test;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CyclicBarrierTest {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();final  CyclicBarrier cb = new CyclicBarrier(3);for(int i=0;i<3;i++){Runnable runnable = new Runnable(){public void run(){try {Thread.sleep((long)(Math.random()*10000));System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));cb.await();Thread.sleep((long)(Math.random()*10000));System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));cb.await();Thread.sleep((long)(Math.random()*10000));System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));cb.await();} catch (Exception e) {e.printStackTrace();}}};service.execute(runnable);}service.shutdown();}}

7.CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。 CountDownLatch 与 CyclicBarrier 的区别还有 CyclicBarrier 调用await()方法后,它本身的数值会减1,而 CountDownLatch 必须调用countDown()方法才能减1实例6

package com.xiaogao.test;import java.util.concurrent.CountDownLatch;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CountdownLatchTest {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();final CountDownLatch cdOrder = new CountDownLatch(1);final CountDownLatch cdAnswer = new CountDownLatch(3);for(int i=0;i<3;i++){Runnable runnable = new Runnable(){public void run(){try {System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令");cdOrder.await();System.out.println("线程" + Thread.currentThread().getName() + "已接受命令");Thread.sleep((long)(Math.random()*10000));System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果");cdAnswer.countDown();} catch (Exception e) {e.printStackTrace();}}};service.execute(runnable);}try {Thread.sleep((long)(Math.random()*10000));System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令");cdOrder.countDown();System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果");cdAnswer.await();System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果");} catch (Exception e) {e.printStackTrace();}service.shutdown();}}

8.Exchanger可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。实例7

package com.xiaogao.test;import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ExchangerTest {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();final Exchanger<String> exchanger = new Exchanger<String>();service.execute(new Runnable(){public void run() {try {String data1 = "zxx";System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去");Thread.sleep((long)(Math.random()*10000));String data2 = (String)exchanger.exchange(data1);System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2);}catch(Exception e){}}});service.execute(new Runnable(){public void run() {try {String data1 = "lhm";System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去");Thread.sleep((long)(Math.random()*10000));String data2 = (String)exchanger.exchange(data1);System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2);}catch(Exception e){}}});service.shutdown();}}

9.下面介绍一下线程阻塞队列的类和接口接口:BlockingQueue<E>支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。 BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法: 抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 检查 element() peek() 不可用 不可用

实现了这个接口的类有 ArrayBlockingQueue<E>一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。这个类与 Semaphore 很像,但与 Semaphore 不同的是 Semaphore 中装的是许可权限,一个线程拿到后就少一个名额,当权限为0时必须线程自己释放才能使别的线程得到执行机会,而 ArrayBlockingQueue 是代表一个数组序列,当数组序列满时,再调用put()方法当前线程将会阻塞,当数组序列为0时,调用take()方法线程也将会阻塞, ArrayBlockingQueue 是多个线程操纵一个数组序列,当一个线程调用一个方法时,可能会使另一个线程从阻塞状态恢复回来下面看两个示例代码实例8

package com.xiaogao.test;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingQueueTest {public static void main(String[] args) {final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);for(int i=0;i<2;i++){new Thread(){public void run(){while(true){try {//随眠时间1000毫秒以下的任意值时间Thread.sleep((long)(Math.random()*1000));System.out.println(Thread.currentThread().getName() + "准备放数据!");//添加数据,如果已经填满,将阻塞queue.put(1);System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据");} catch (InterruptedException e) {e.printStackTrace();}}}}.start();}new Thread(){public void run(){while(true){try {//睡眠1秒后取数据Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + "准备取数据!");//取得数据,如果没有数据将会阻塞queue.take();System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据");} catch (InterruptedException e) {e.printStackTrace();}}}}.start();}}

从程序执行的结果来看,很明显,程序中一共有三个线程并发执行,两个线程负责向集合中添加数据,另外还有一个负责取出数据,并且添加数据的频率明显快于取出数据的频率,所以总会显示集合队列中有3个数据,改一下睡眠的时间,结果又会不一样用两个具有1个空间的的队列来实现同步功能问题分析1.实现同步功能,首先必须要有两个线程,每个线程处理一个队列2.为了避免阻塞的产生,应该当一个线程阻塞时,另外一个线程得以运行,在第二个线程阻塞前使第一个线程致使阻塞的条件得以解除,这样往复执行实例9

package com.xiaogao.test;import java.util.Collections;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.atomic.AtomicInteger;public class BlockingQueueCommunication {public static void main(String[] args) {final Business business = new Business();//创建并执行线程,线程内部循环50次new Thread(new Runnable() {public void run() {for(int i=1;i<=50;i++){business.sub(i);}}}).start();//主线程循环50次for(int i=1;i<=50;i++){business.main(i);}} static class Business {  //创建两个序列,每个序列的容量为1  BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);  BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);  //让Business对象创建时就填满队列queue2队列  {  try {queue2.put(1);} catch (InterruptedException e) {e.printStackTrace();}  }  public  void sub(int i){  try {queue1.put(1);} catch (InterruptedException e) {e.printStackTrace();}for(int j=1;j<=10;j++){System.out.println("sub thread sequece of " + j + ",loop of " + i);}try {queue2.take();} catch (InterruptedException e) {e.printStackTrace();}  }  public  void main(int i){  try {queue2.put(1);} catch (InterruptedException e1) {e1.printStackTrace();}for(int j=1;j<=100;j++){System.out.println("main thread sequece of " + j + ",loop of " + i);}try {queue1.take();} catch (InterruptedException e) {e.printStackTrace();}  }  }}

以上就是java关于jdk1.5线程加强的专题的详细内容,更多请关注其它相关文章!

听他第二十八次提起童年往事,每年的同一天和他庆祝生日,

java关于jdk1.5线程加强的专题

相关文章:

你感兴趣的文章:

标签云: