34 通过非阻塞队列ConcurrentLinkedQueue实现一个对象池
Diego38 61 1

1. 前言

前面我们学习了四种阻塞队列,阻塞队列的实现各有千秋,但内部无外乎是通过数组和链表来实现的,在多线程场景下,需要对内部的数组和链表进行并发控制,就不可避免的去加锁(ArrayBlockingQueue、LinkedBlockingQueue)或者添加无锁化CAS(synchronousQueue、LinkedTransferQueue)。

而无锁化阻塞队列LinkedTransferQueue除了无界这一缺点,其他方面很优秀了。今天介绍一种无锁化非阻塞无界队列ConcurrentLinkedQueue,一般用于无需阻塞或通过其他阻塞手段来辅助的队列场景。

2. 通过非阻塞队列ConcurrentLinkedQueue实现一个对象池

队列根据操作的内部数据队列并发控制分为有锁和无锁队列,根据是否容量限制可分为有界和无界,根据是否阻塞可分为非阻塞和阻塞队列。

ConcurrentLinkedQueue兼具无锁、非阻塞和无界。在非阻塞队列中,假设遇到队列满或队列空,需要上层调用单独控制,比如忙等或者忽略,ConcurrentLinkedQueue使用场景并不多见,但是如果借助外部AQS控制,可以实现一个高性能的对象池。

ConcurrentLinkedQueue由于是无阻塞队列,并未实现BlockingQueue的API,仅仅实现了Queue API,所以它没有take、put方法。

对象池里元素的数量是有限的,比如jedis连接池,数据库连接池,当连接池数量为0时需要等待,当从连接池中取出元素使用完后,需要归还,基于此,我们实现代码如下:

public class LinkedQueuePool {

    public static class ConnObj {
        public String ip;
        public String port;
        public ConnObj(String ip, String port) {
            this.ip = ip;
            this.port = port;
        }
    }

    private final   ConcurrentLinkedQueue<ConnObj> linkedQueue;
    public final   AtomicInteger sizeCounter;
    private final   PoolSynchronizer synchronizer;

    public LinkedQueuePool(Integer size) {
        linkedQueue = new ConcurrentLinkedQueue<>();
        sizeCounter = new AtomicInteger();
        for (int i = 0; i < size; i++) {
            linkedQueue.offer(new ConnObj("ip", "" + i));
            sizeCounter.incrementAndGet();
        }
        synchronizer = new PoolSynchronizer(sizeCounter);
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedQueuePool pool = new LinkedQueuePool(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
              while (true) {
                  try {
                      ConnObj connObj = pool.get();
                      System.out.println(Thread.currentThread().getName() + "使用连接" + connObj.ip + connObj.port);
                      pool.release(connObj);
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                  }
              }
            }).start();
        }
    }

    public ConnObj get() throws InterruptedException {
        do {
            //进入连接获取
            ConnObj obj = linkedQueue.poll();
            if (obj != null) {
                sizeCounter.decrementAndGet();
                return obj;
            } else {
                //进入等待释放
                synchronizer.acquireShared(sizeCounter.get());
            }
        } while (true);
    }

    public void release(ConnObj val) {
        linkedQueue.offer(val);
        //连接池数量减1
        sizeCounter.incrementAndGet();
        //唤醒永远成功,有循环判断,可以保证不会泄露
        synchronizer.releaseShared(1);
    }

    public static class PoolSynchronizer extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 6305735919591847529L;

        private AtomicInteger counter;

        //直接将连接池数量传进来
        public PoolSynchronizer(AtomicInteger counter) {
            this.counter = counter;
        }

        //内部类的好处是可以不用state,直接判断连接池数量是否大于1
        @Override
        protected int tryAcquireShared(final int remain) {
            //counter.get() 用于被唤醒后再次检查
            return  remain > 0 ?  1 : counter.get();
        }

        @Override
        protected boolean tryReleaseShared(final int ignore) {
            return true;
        }
    }
}

输出如下:

Thread-0使用连接ip0
Thread-1使用连接ip0
Thread-2使用连接ip0
Thread-9使用连接ip0
Thread-5使用连接ip0

ConcurrentLinkedQueue是无阻塞的,而连接数量有限,得不到连接的线程将进入阻塞,我们需要实现一个AQS来将得不到连接的线程进入阻塞等待。

AQS的实现PoolSynchronizer,相当于是对过去AQS知识点的温习,其中tryAcquireShared需要仔细体会,处于等待队列的线程被唤醒后需要再次tryAcquireShared来检查是否真正获取许可,所以我们需要将对象池的数量计数传递到构造参数中缓存起来。

以上即是ConcurrentLinkedQueue实现的对象池。除了ConcurrentLinkedQueue,我们还可以使用LinkedTransferQueue来实现,而且在对象池数量降为0时,可以自动进入阻塞,无需AQS同步器的帮助。

3. 总结

ConcurrentLinkedQueue是JDK中唯一的无阻塞队列,用于无界无阻塞场景,也用于借助外部同步器来实现阻塞的场景,使用较少,但一般面试经常问到举出无阻塞队列的例子,ConcurrentLinkedQueue的内部原理无需深入,代码相对复杂,研究借鉴的意义不大,知道它的特点和使用场景即可。

预览图
评论区

索引目录