32 通过阻塞队列(ArrayBlockingQueue、LinkedBlockingQueue)解决预约挂号问题
Diego38 93 1

1. 前言

之前学习了集合类、Map等并发安全容器,也是最常使用的两种数据结构,实际上JDK中种类最丰富的数据结构是队列Queue,它是一种支持在队尾入队,队头出队的先进先出的数据结构,在任务处理,异步化,消息队列很多场景都需要队列。

Java中的队列大多数天生就是线程安全的,本节我们先从最常见的ArrayBlockingQueue和LinkedBlockingQueue学起。

2. Java中的队列有哪些

2.1 队列分类

队列是JDK种类最丰富的数据结构,平时我们只会涉及到多线程中会使用,不过经常使用的LinkedList也是一种队列,只不过LinkedList只适用于单线程场景。

下面的脑图清晰的展示队列的分类: image

队列可以按照两种划分维度进行分类。

一是按阻塞与非阻塞维度可以分为阻塞队列和非阻塞队列,阻塞队列是最常见,使用最频繁的一类队列,适用于解决多线程问题。

二是按照功能特性维度,比如支持延迟任务执行的延迟队列,支持优先级排序的优先队列,支持双向入队出队的双向队列,还有一个使用场景比较少通过出队入队配对实现的队列-Exchanger(Exchanger使用较少面试几乎不考察,只做简单了解即可)。

这些队列本章后续都会介绍,

2.2 阻塞队列和非阻塞队列的区别

上述的队列分类,让我们看花眼,每个队列都有各自的使用场景,当了解到阻塞和非阻塞队列的区别和解决的问题后,后续学习会变得简单。 image

首先队列是一种支持在队尾入队,队头出队的先进先出的数据结构,而阻塞队列支持在入队和出队时阻塞的特殊队列。

阻塞队列支持在队列满时入队阻塞,在队列空时出队阻塞。

阻塞队列常用于生产者消费者场景,生产者向队列添加元素,消费者向队列获取元素,队列就是待消费的元素容器。

要解决生产者消费者问题:

  • 需要一个缓冲队列,缓冲队列可以有一定容量
  • 生产速度大于消费速度时,会造成队列满,此时生产需等待 反之,队列可为空,此时消费需等待
  • 这就是设计阻塞队列的初衷,阻塞队列天生是为解决生产者消费者问题而生的,而且阻塞队列天生是线程安全的。

非阻塞队列相反,它在入队和出队时不会阻塞线程,而且它不一定是线程安全的,比如 LinkedList也是队列,它属于非阻塞队列,但并不线程安全的,由于它是无界队列(容量无上限)它可以支持任意次数的入队,但当队列为空时,执行removeLast等操作会抛出异常。

非阻塞队列也可以是线程安全的,比如ConcurrentLinkedQueue,不支持put、task等阻塞操作,由于无界,入队不会造成队列满,队列为空时出队返回null。

2.3 阻塞队列的API

阻塞队列的API分为3类,分别是入队、出队、检查

  • 入队
    • offer(e) 非阻塞方法,当入队成功返回true否则返回false
    • offer(e, time, unit) 队列满时超时超时阻塞方法,当入队成功会返回true否则返回false
    • put(e) 队列满时一直阻塞或响应中断
    • add(e) 非阻塞方法,队列满时抛出IllegalStateException
  • 出队
    • poll() 非阻塞方法,当出队成功返回true否则返回false
    • poll(time, unit) 队列空时超时超时阻塞方法,当出队成功会返回true否则返回false
    • take() 队列空时一直阻塞或响应中断
    • remove(e) 非阻塞方法,移除队列中指定的元素,找到移除成功返回true,否则返回false
  • 检查(非阻塞)
    • element() 获取队头元素,队列空抛出NoSuchElementException
    • peek() 获取队头元素,队列空返回null
    • contains(o) 如果队列包含该元素则返回true否则false
    • drainTo(Collection c) 移除队列中元素到列表中

队列是不允许插入NULL元素的,入队API发现插入的元素为null,会抛出NullPointerException

3. 通过阻塞队列解决预约挂号问题

在生活中,挂号就诊就是一种典型的生产者消费者问题,我们可以提前或者当天继续挂号,挂完号也可以取消,挂号相当于生产,而就诊相当于消费,挂号的数量不能超过医院医生一天接诊最大次数,所以需要有一个容器来承接,其次挂号是按顺序先来后到的,因此,我们通过有界阻塞队里来实现。

  • 预约挂号 => 入队 => put
  • 就诊 => 出队 => take
  • 取消预约 => remove

代码如下:

public class LinkedBlockingQueueTest {

    private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue(10000);

    static Thread guahaoThread1 = new Thread(() -> {
        for (int i = 0; i < 10000; i ++) {
            String patientName = Long.toHexString(ThreadLocalRandom.current().nextLong(100000));
            boolean offer = queue.offer(patientName);
            if (!offer) {
                try {
                    System.out.println("进入阻塞入队");
                    queue.put(patientName);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    });

    static Thread guahaoThread2 = new Thread(() -> {
        for (int i = 10000; i < 20000; i ++) {
            String patientName = Long.toHexString(ThreadLocalRandom.current().nextLong(100000));
            boolean offer = queue.offer(patientName);
            if (!offer) {
                try {
                    System.out.println("进入阻塞入队");
                    queue.put(patientName);
                } catch (InterruptedException e) {
                    break;
                }
            }

        }
    });

    static Thread quxiaoThread = new Thread(() -> {
        for (int i = 0; i < 20000; i ++) {
            String patientName = Long.toHexString(ThreadLocalRandom.current().nextLong(100000));
            if (i % 5 == 0) {
                queue.remove(patientName);
            }
        }
    });

    static Thread jiuzhenThread = new Thread(() -> {
        int n = 1;
       while (true) {
           String element = null;
           try {
               element = queue.poll(5, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               break;
           }
           if (element != null) {
               System.out.println(element + ": 就诊完成");
               System.out.println("已有" + (n ++) + "完成就诊" );
           } else {
               System.out.println("挂号队列为空");
           }
       }
    });

    public static void main(String[] args) {
        guahaoThread1.start();
        guahaoThread2.start();
        quxiaoThread.start();
        jiuzhenThread.start();
    }

}

输出如下:

..
已有1611完成就诊
97d7: 就诊完成
已有1612完成就诊
进入阻塞入队
a29a: 就诊完成
已有1613完成就诊
进入阻塞入队
进入阻塞入队
4979: 就诊完成
已有1614完成就诊
进入阻塞入队
..

已有19724完成就诊
挂号队列为空
挂号队列为空
挂号队列为空
..

使用LinkedBlockingQueue来存放挂号,有两个挂号线程,一个取消挂号线程,还有一个就诊线程。

挂号线程在尝试无阻塞入队失败后,即队列满时进入阻塞入队,而就诊线程在队列空时不会无限忙等,而是进入每次5秒钟的阻塞获取阶段,避免造成CPU打满。

4. ArrayBlockingQueue的实现

上例中我们使用的LinkedBlockingQueue是最常见的阻塞队列,可以满足大部分场景,并且性能相对ArrayBlockingQueue要好。但ArrayBlockingQueue也有优点,接下来我们就分析下ArrayBlockingQueue。

ArrayBlockingQueue是基于数组实现的,容量固定的,在操作数组出队和入队时需要加锁访问来解决并发问题。下图展示了ArrayBlockingQueue入队出队过程。 image

  • 使用一把锁来控制对数组的操作。
  • 有一个takeIndex和PutIndex,putindex指向下一个插入的位置,taskIndex指向下一个删除的位置,两个index都会向右移动,而如果到末尾,putIndex和takeIndex会重置为0,也就是起始位置。
  • ArrayBlockingQueue内部维护了一个count,当count=0和count==array.length时takeindex和putindex会重合,这时可以表示队列空和队列满的状态
  • 对于take、put是通过lock + Condition来实现,对于无超时的poll和offer是通过lock来实现的,remove是将左边的往右拷贝来实现的。
  • 入队操作: 获锁 + 队列满等待 + 发送非空的通知
  • 出队操作: 获锁 + 队列空等待 + 发送不满的通知

阻塞是通过Condition来实现的,对应着notFull Condition和notEmpty Condition,两个Condition源于同一把Lock派生出来,我们来看下put操作和offer操作的代码:

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

put操作遇到满时,会进行条件队列的等待。

 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

offer操作遇到队列满时,直接return,不会进入阻塞。

ArrayBlockingQueue有明显的优缺点

  • 优点: 支持有界,容量固定基于数组占用内存较少

  • 缺点: 入队出队共用同一把锁,性能不高

    5. LinkedBlockingQueue的实现

    LinkedBlockingQueue基于链表实现,入队和出队分别对应一把锁,适用于对性能要求较高的场景。

      /** Lock held by take, poll, etc 
       *  出队锁  
       */
      private final ReentrantLock takeLock = new ReentrantLock();
    
      /** Wait queue for waiting takes
       *  由takeLock派生出来,出队线程等待非空条件信号
      */
      private final Condition notEmpty = takeLock.newCondition();
    
      /** Lock held by put, offer, etc 
       *  入队锁
       */
      private final ReentrantLock putLock = new ReentrantLock();
    
      /** Wait queue for waiting puts 
       * 由putLock派生出来,入队线程等待非满条件信号
       */
      private final Condition notFull = putLock.newCondition();

    image

  • LinkedBlockingQueue支持有界和无界,内部有一个count记录队列里的元素总数,所以size操作是很轻量,直接取count的值。

  • 内置两把锁,一把putLock,一把takeLock, offer/put时使用putLock,当元素数量由0到1,则释放putLock锁后获取takeLock然后发送notEmpty的signal信号; take/poll使用takeLock,当拿出最后一个元素,则释放takeLock锁后获取putLock然后发送notFull的signal,也就是说大部分场景出队和入队两者操作是可以同时执行不需要做并发控制的,只有队列由空变成非空或者队列满到非满条件下才会两把锁串行获取。

不同的锁操作同一个链表,会不会带来可见性问题呢?要回答这个问题,还是需要从happens-before法则入手。

happens-before有一条规则,对volatile字段的写入操作先行发生于每一个后续的该字段的读操作,我们看看有没有触发这条规则:

put线程之间因为有putLock可以保证可见性put线程之间的可见性,take线程之间因为有take线程可以保证可见性,那put线程和take线程之间如何保证可见性呢?

我们看take和pool方法有如下一段:

//take线程,对Volatile做了读写
while (count.get() == 0) {
                notEmpty.await(); }
  x = dequeue();
c = count.getAndIncrement();
//put线程:对Volatile对了读写
while (count.get() == capacity) {
                notFull.await();
 }
enqueue(node);
c = count.getAndIncrement();

其中count.getAndIncrement()即是对同一变量的volatile写操作,所以说入队线程和出队线程满足happens-before法则,这两种线程之间的操作是互相可见的。

LinkedBlockingQueue的优缺点:

  • 优点: 支持有界与无界,两把锁,效率高
  • 缺点: 由于链表实现的原因,进行contains和remove需要锁住两把锁 但整体而言性能都比ArrayBlockingQueue效率高,我们在大部分场景优先选择LinkedBlockingQueue。

6. 总结

image

image

预览图
评论区

索引目录