Using TransferQueue in JDK 7, and TransferQueue versus SynchronousQueue

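JDK 7 enhanced the J.U.C concurrency utilities introduced in JDK 5, and one of the additions is TransferQueue. The JSR specifications related to Java concurrency can be found on the blog maintained by Doug Lea. What follows is a brief introduction to how this interface is used.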

public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @param e the element to transfer
     * @return {@code true} if the element was transferred, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e);

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer.
     *
     * @param e the element to transfer
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void transfer(E e) throws InterruptedException;

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @param e the element to transfer
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before completion,
     *         in which case the element is not left enqueued
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns {@code true} if there is at least one consumer waiting
     * to receive an element via {@link #take} or
     * timed {@link #poll(long,TimeUnit) poll}.
     * The return value represents a momentary state of affairs.
     *
     * @return {@code true} if there is at least one waiting consumer
     */
    boolean hasWaitingConsumer();

    /**
     * Returns an estimate of the number of consumers waiting to
     * receive elements via {@link #take} or timed
     * {@link #poll(long,TimeUnit) poll}. The return value is an
     * approximation of a momentary state of affairs, that may be
     * inaccurate if consumers have completed or given up waiting.
     * The value may be useful for monitoring and heuristics, but
     * not for synchronization control. Implementations of this
     * method are likely to be noticeably slower than those for
     * {@link #hasWaitingConsumer}.
     *
     * @return the number of consumers waiting to receive elements
     */
    int getWaitingConsumerCount();
}
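To make the Javadoc concrete, here is a minimal sketch of the non-blocking side of the interface, run against a LinkedTransferQueue with no consumer present. The class name TryTransferSketch, the method name demo, and the 50 ms timeout are assumptions chosen for illustration; only the TransferQueue methods themselves come from the JDK.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

// Hypothetical class name, for illustration only.
class TryTransferSketch {

    // Exercises the query and tryTransfer methods while no consumer is waiting,
    // and returns a one-line summary of what each call reported.
    static String demo() {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        boolean waiting = queue.hasWaitingConsumer();
        int waitingCount = queue.getWaitingConsumerCount();

        // No consumer is blocked in take() or a timed poll(), so this returns
        // at once without enqueuing the element.
        boolean immediate = queue.tryTransfer("a");

        boolean timed;
        try {
            // Waits up to 50 ms (an arbitrary value) for a consumer, then gives up;
            // the element is not left in the queue.
            timed = queue.tryTransfer("b", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }

        return "waiting=" + waiting
                + " count=" + waitingCount
                + " immediate=" + immediate
                + " timed=" + timed
                + " size=" + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With no consumer around, both tryTransfer calls should report false and the queue should stay empty, which is the key difference from transfer(): neither variant leaves the element behind when the hand-off fails.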

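As the definition shows, TransferQueue is also a blocking queue and has all the characteristics of one. The rest of this section describes what the five new APIs above do.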

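1. transfer(E e): if a consumer thread is currently waiting to receive an element, e is handed over to it immediately; otherwise e is inserted at the tail of the queue and the calling thread blocks until a consumer thread takes the element.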

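import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueDemo {
    private static TransferQueue<String> queue = new LinkedTransferQueue<String>();

    public static void main(String[] args) throws Exception {
        new Productor(1).start();
        // Give the producer time to reach transfer() before checking the size.
        Thread.sleep(100);
        System.out.println("over.size=" + queue.size());
    }

    // Producer thread: hands a single element to the queue via transfer().
    static class Productor extends Thread {
        private int id;

        public Productor(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                String result = "id=" + this.id;
                System.out.println("begin to produce." + result);
                // Blocks here: no consumer ever takes the element.
                queue.transfer(result);
                System.out.println("success to produce." + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Running this shows that the producer thread blocks, because no consumer is waiting to receive data when transfer() is called, so "success to produce" is never printed. The queue size becomes 1, which shows that when element e cannot be handed over immediately, it is inserted at the tail of the blocking queue.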
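For contrast, the following sketch adds a consumer thread so that transfer() can complete. The class name TransferHandOffSketch and the helper handOff are hypothetical names used only here. Whether the consumer is already parked in take() when transfer() is called depends on thread timing, but in either case transfer() returns only after the consumer has received the element.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical class name, for illustration only.
class TransferHandOffSketch {

    // Starts a consumer that calls take(), transfers one element to it,
    // and returns the value the consumer received.
    static String handOff(final TransferQueue<String> queue, String element) {
        final String[] received = new String[1];

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    received[0] = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            // Returns once the consumer has received the element.
            queue.transfer(element);
            // join() makes the consumer's write to received[0] visible here.
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        System.out.println("received=" + handOff(queue, "id=1"));
        System.out.println("over.size=" + queue.size());
    }
}
```

Unlike the demo above, this program terminates, and the queue should be empty afterwards because the element went to the consumer instead of staying at the tail.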
