几种类型的BlockingQueue
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
关于BlockingQueue的drainTo方法
int drainTo(Collection<? super E> c, int maxElements)
- 从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
- 在向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,元素会同时在两个 collection 中出现,或者在其中一个 collection 中出现,也可能在两个 collection 中都不出现。
- 如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException 异常。
- 如果正在进行此操作时,正在修改指定的 collection,则此操作行为是不确定的。
使用技巧
//先从queue中获取一个对象,如果没有对象线程自动阻塞
FixERForFixOutgoingValue firstValue = stkRequestQueue.take();
list.add(firstValue);
//如果获取到对象了,则一次性获取剩下的全部对象
stkRequestQueue.drainTo(list);
BlockingQueue 构建生产消费者模式:
package lands.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
    /**
     * @param args
     */
    public static void main(String[] args) {
        //BlockingQueue q = new ArrayBlockingQueue(100); //在构造时需要指定容量
        BlockingQueue q = new LinkedBlockingQueue(); //在构造时默认没有上限,但也可以选择指定最大上限
        BlockingQueueProducer p1 = new BlockingQueueProducer(q);
        BlockingQueueProducer p2 = new BlockingQueueProducer(q);
        new Thread(p1).start();
        new Thread(p2).start();
        BlockingQueueConsumer c1 = new BlockingQueueConsumer(q);
        BlockingQueueConsumer c2 = new BlockingQueueConsumer(q);
        new Thread(c1).start();
        new Thread(c2).start();
    }
}
class BlockingQueueProducer implements Runnable {
    private final BlockingQueue queue;
    BlockingQueueProducer(BlockingQueue q) {
        queue = q;
    }
    public void run() {
        try {
            int i = 0;
            while (true) {
                i++;
                queue.put(produce(i));
                //System.out.println("remainingCapacity:" + queue.remainingCapacity());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
    String produce(int i) {
        //create your wanted object
        return i + "";
    }
}
class BlockingQueueConsumer implements Runnable {
    private final BlockingQueue queue;
    BlockingQueueConsumer(BlockingQueue q) {
        queue = q;
    }
    public void run() {
        try {
            while (true) {
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
    void consume(Object x) {
        //use a object in queue
        System.out.println(x);
    }
}
 
  
  
  
 
