并发数据结构与线程(ArrayBlockingQueue)

BigData
• 阅读 3128

今天在QQ群上抛出来一个问题,如下
并发数据结构与线程(ArrayBlockingQueue)

我以Java自带的数据结构为例,用源码的形式说明,如何阻塞线程、通知线程的。

一、Lock & Condition
ArrayBlockingQueue以可重入锁和两个Condition对象来控制并发。

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

构造函数中初始化了notEmpty和notFull.

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

二、线程阻塞
当ArrayBlockingQueue存储的元素是0个的时候,take()方法会阻塞.

    public Object take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            Object x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

这里take方法首先获得可重入锁lock,然后判断如果元素为空就执行notEmpty.await(); 这个时候线程挂起。

三、通知线程
比如使用put放入一个新元素,

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue方法中,

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

对刚才的notEmptyCondition进行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock来控制并发,同时也使用ArrayBlockingQueue的Condition对象来与线程交互。notEmptynotFull都是由
ReentrantLock的成员变量sync生成的,

public Condition newCondition() {
        return sync.newCondition();
    }

sync可以认为是一个抽象类类型,Sync,它是在ReentrantLock内部定义的静态抽象类,抽象类实现了newCondition方法,

final ConditionObject newCondition() {
            return new ConditionObject();
        }

返回的类型是实现了Condition接口的ConditionObject类,这是在AbstractQueuedSynchronizer内部定义的类。在ArrayBlockingQueue中的notEmpty就是ConditionObject实例。

阻塞:
ArrayBlockingQueue为空时,notEmpty.await()将自己挂起,如ConditionObject的await方法,

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter是将当前线程作为一个node加入到ConditionObject的队列中,队列是用链表实现的。
如果是初次加入队列的情况,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就将线程park。

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                ....
}

至此线程被挂起,LockSupport.park(this);这里this是指ConditionObject,是notEmpty.
通知:
当新的元素put进入ArrayBlockingQueue后,notEmpty.signal()通知在这上面等待的线程,如ConditionObject的signal方法,

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal方法,

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

doSignal一开始接收到的参数就是firstWaiter这个参数,在内部实现中用了do..while的形式,首先将first的的nextWaiter找出来保存到firstWaiter此时(first和firstWaiter不是一回事),在while的比较条件中可调用了transferForSignal方法,
整个while比较条件可以看着短路逻辑,如果transferForSignal结果为true,后面的first = firstWaiter就不执行了,整个while循环就结束了。

参照注释,看

transferForSignal方法,

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先确保想要被signal的等待node还是处于Node.CONDITION状态,然后调整状态为Node.SIGNAL,这两个都是采用CAS方法,最后调用的是

LockSupport.unpark(node.thread);

五、LockSupport
至此,我们已经知道了线程的挂起和通知都是使用LockSupport来完成的,并发数据结构与线程直接的交互最终也是需要LockSupport。那么关于LockSupport,我们又可以了解多少呢?

Ref:
Java中的ReentrantLock和synchronized两种锁定机制的对比
Java的LockSupport.park()实现分析

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
4年前
java并发相关(四)——关于synchronized的可重入性,线程切换实现原理与是否公平锁
一、可重入性  关于synchronized的可重入性的证明,我们可以通过A类内写两个同步方法syncA(),syncB()。然后syncA内调用syncB,调用syncA发现代码可正常执行,来证明这一点。  当处于无锁阶段时,划掉,都重入了不可能处于无锁。  当处于偏向锁阶段时,由之前对偏向锁的解释可知,偏向当前线程id是,当前线程可直
Wesley13 Wesley13
4年前
java并发面试常识之ArrayBlockingQueue
       ArrayBlockingQueue是常用的线程集合,在线程池中也常常被当做任务队列来使用。使用频率特别高。他是维护的是一个循环队列(基于数组实现),循环结构在数据结构中比较常见,但是在源码实现中还是比较少见的。线程安全的实现     线程安全队列,基本是离不开锁的。ArrayBlockingQueue使用的是Reen
Stella981 Stella981
4年前
ReenTrantLock可重入锁和synchronized的区别
ReenTrantLock可重入锁和synchronized的区别可重入性:从名字上理解,ReenTrantLock的字面意思就是再进入的锁,其实synchronized关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。锁的实现:S
Wesley13 Wesley13
4年前
100 行写一个 go 的协程池 (任务池)
前言go的goroutine提供了一种较线程而言更廉价的方式处理并发场景,go使用二级线程的模式,将goroutine以M:N的形式复用到系统线程上,节省了cpu调度的开销,也避免了用户级线程(协程)进行系统调用时阻塞整个系统线程的问题。【1】但goroutine太多仍会导致调度性能下降、GC
Wesley13 Wesley13
4年前
Java 并发数据结构
\TOCM\因为Java提供了一些非线程安全的数据结构如HashMap,ArrayList,HashSet等。所有在多线程环境中需要使用支持并发访问操作的数据结构。并发ListVector,CopyOnWriteArrayList是线程安全的List。ArrayList是线程不安全的。如果一定要使用,需要:Collection
Wesley13 Wesley13
4年前
Java并发源码之ReentrantLock
ReentrantLock介绍ReentrantLock是一个可重入的互斥锁,与使用synchronized方法和语句访问的隐式监视锁具有相同的基本行为和语义,但具有扩展功能。ReentrantLock属于最后一个成功加锁并且还没有释放锁的线程。当一个线程请求lock时,如果锁不属于任何线程,将立马得到这个锁;如果锁已经被
Stella981 Stella981
4年前
Notification使用详解之二:可更新进度的通知
上次和大家分享了关于Notification的基础应用,包括简单的通知和自定义视图的通知。今天和大家分享一下如何实现一个可更新进度的通知。我们将会模拟一个下载任务,先启动一个线程负责模拟下载工作,在这个过程中更新进度信息,然后下载线程把最新的进度信息以消息的形式,发送到UI线程的消息队列中,最后UI线程负责根据最新的进度信息来更新进度通知的UI界面。
Stella981 Stella981
4年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Wesley13 Wesley13
4年前
Java并发编程:多线程如何实现阻塞与唤醒
线程的阻塞和唤醒在多线程并发过程中是一个关键点,当线程数量达到很大的数量级时,并发可能带来很多隐蔽的问题。如何正确暂停一个线程,暂停后又如何在一个要求的时间点恢复,这些都需要仔细考虑的细节。Java为我们提供了多种API来对线程进行阻塞和唤醒操作,比如suspend与resume、sleep、wait与notify以及park与unpark等等。!(
Wesley13 Wesley13
4年前
Java 并发编程:多线程如何实现阻塞与唤醒
线程的阻塞和唤醒在多线程并发过程中是一个关键点,当线程数量达到很大的数量级时,并发可能带来很多隐蔽的问题。如何正确暂停一个线程,暂停后又如何在一个要求的时间点恢复,这些都需要仔细考虑的细节。Java为我们提供了多种API来对线程进行阻塞和唤醒操作,比如suspend与resume、sleep、wait与notify以及park与unpark等等。!(
Stella981 Stella981
4年前
Linux 多线程
I.同步机制线程间的同步机制主要包括三个:互斥锁:以排他的方式,防止共享资源被并发访问;互斥锁为二元变量,状态为0开锁、1上锁;开锁必须由上锁的线程执行,不受其它线程干扰.条件变量:
BigData
BigData
Lv1
落叶他乡树,寒灯独夜人。
文章
5
粉丝
0
获赞
0