JAVA并发容器代码随读

Wesley13
• 阅读 462

1. java.util.concurrent****所提供的并发容器
java.util.concurrent提供了多种并发容器,总体上来说有4类,队列类型的BlockingQueue和 ConcurrentLinkedQueue,Map类型的ConcurrentMap,Set类型的ConcurrentSkipListSet和CopyOnWriteArraySet,List类型的CopyOnWriteArrayList.

这些并发容器都采用了多种手段控制并发的存取操作,并且尽可能减小控制并发所带来的性能损耗。接下来我们会对每一种类型的实现类进行代码分析,进而得到java.util.con current包所提供的并发容器在传统容器上所做的工作。

2. BlockingQueue
BlockingQueue接口定义的所有方法实现都是线程安全的,它的实现类里面都会用锁和其他控制并发的手段保证这种线程安全,但是这些类同时也实现了Collection接口(主要是AbstractQueue实现),所以会出现BlockingQueue的实现类也能同时使用Conllection接口方法,而这时会出现的问题就是像addAll,containsAll,retainAll和removeAll这类批量方法的实现不保证线程安全,举个例子就是addAll 10个items到一个ArrayBlockingQueue,可能中途失败但是却有几个item已经被放进这个队列里面了。

下面我们根据这幅类图来逐个解析不同实现类的特性和特性实现代码
JAVA并发容器代码随读
DelayQueue提供了一个只返回超时元素的阻塞队列,也就是说,即使队列中已经有数据了,但是poll或者take的时候还要判定这个element有没达到规定的超时时间,poll方法在element还没达到规定的超时时间返回null,take则会通过condition.waitNanos()进入等待状态。一般存储的element类型为Delayed,这个接口JDK中实现的类有ScheduledFutureTask,而DelayQueue为DelayedWorkQueue的Task容器,后者是ScheduledThreadPoolExecutor的工作队列,所以DelayQueue所具有的超时提供元素和线程安全特性对于并发的定时任务有很大的意义。

public  E take()  throws  InterruptedException {
         final  ReentrantLock lock  = this .lock;
        //控制并发
        lock.lockInterruptibly();
         try  {
             for  (;;) {
                E first  =  q.peek();
                 if  (first  == null ) {
                    //condition协调队列里面元素
                    available.await();
                }  else  {
                     long  delay  =   first.getDelay(TimeUnit.NANOSECONDS);
                     if  (delay  > 0 ) {
                        //因为first在队列里面的delay最短的(优先队列保证),所以wait这个时间那么队列中最短delay的元素就超时了.即
                        //队列有元素供应了.                         long  tl  =  available.awaitNanos(delay);
                    }  else  {
                        E x  =  q.poll();
                         assert  x  != null ;
                         if  (q.size()  != 0 )
                            available.signalAll();  //  wake up other takers
return  x;

                    }
                }
            }
        }  finally  {
            lock.unlock();
        }
}

DelayQueue的内部数据结构是PriorityQueue,因为Delayed接口同时继承了Comparable接口,并且Delayed的实现类对于这个compareTo方法的实现是基于超时时间进行大小比较,所以DelayQueue无需关心数据的排序问题,只需要做好存取的并发控制(ReetranLock)和超时判定即可。另外,DelayQueue有一个实现细节就是通过一个Condition来协调队列中是否有数据可以提供,这对于take和带有提取超时时间的poll是有意义的(生产者,消费者的实现)。

PriorityBlockingQueue实现对于外部而言是按照元素的某种顺序返回元素,同时对存取提供并发保护(ReetranLock),使用Condition协调队列是否有新元素提供。PriorityBlocking Queue内部的数据结构为PriorityQueue,优先级排序工作交给PriorityQueue,至于怎么排序,需要根据插入元素的Comparable的接口实现,和DelayQueue比起来,它没有限定死插入数据的Comparable实现,而DelayQueue的元素实现Comparable必须按照超时时间的长短进行比较,否则DelayQueue返回的元素就很可能是错误的。

ArrayBlockingQueue是一个先入先出的队列,内部数据结构为一个数组,并且一旦创建这个队列的长度是不可改变的,当然put数据时,这个队列也不会自动增长。ArrayBlockingQueue也是使用ReetranLock来保证存取的原子性,不过使用了notEmpty和notFull两个Condition来协调队列为空和队列为满的状态转换,插入数据的时候,判定当前内部数据结构数组E[] items的长度是否等于元素计数,如果相等,说明队列满,notFull.await(),直到items数组重新不为满(removeAt,poll等),插入数据后notEmpty.sinal()通知所有取数据或者移除数据并且因为items为空而等待的线程可以继续进行操作了。提取数据或者移除数据的过程刚好相反。

ArrayBlockingQueue使用三个数字来维护队列里面的数据变更,包括takeIndex,putIndex,count,这里需要讲一下takeIndex和putIndex,其中takeIndex指向下一个能够被提取的元素,而putIndex指向下一个能够插入数据的位置,实现类似下图的结构,当takeIndex移到内部数组items最大长度时,重新赋值为0,也就是回到数组头部,putIndex也是相同的策略.
JAVA并发容器代码随读

JAVA并发容器代码随读 /**
JAVA并发容器代码随读 * 循环增加putIndex和takeIndex,如果到数组尾部,那么
JAVA并发容器代码随读 * 置为0
JAVA并发容器代码随读 */
JAVA并发容器代码随读 final int  inc( int  i)  {
JAVA并发容器代码随读 return (++i == items.length)? 0 : i;
JAVA并发容器代码随读 }
JAVA并发容器代码随读
JAVA并发容器代码随读 /**
JAVA并发容器代码随读 * 插入一个item,需要执行线程获得了锁
JAVA并发容器代码随读 */
JAVA并发容器代码随读 private void  insert(E x)  {
JAVA并发容器代码随读     items[putIndex] = x;
JAVA并发容器代码随读 //累加putIndex,可能到数组尾部,那么重新指向0位置
JAVA并发容器代码随读 putIndex = inc(putIndex);
JAVA并发容器代码随读 ++count;
     //put后,使用Condition通知正在等待take的线程可以做提取操作JAVA并发容器代码随读     notEmpty.signal();
JAVA并发容器代码随读 }
JAVA并发容器代码随读
JAVA并发容器代码随读 /**
JAVA并发容器代码随读 * 获取一个元素,执行这个操作的前提是线程已经获得锁,
JAVA并发容器代码随读 * 内部调用
JAVA并发容器代码随读 */
JAVA并发容器代码随读 private  E extract()  {
JAVA并发容器代码随读 final E[] items = this.items;
JAVA并发容器代码随读     E x = items[takeIndex];
JAVA并发容器代码随读     items[takeIndex] = null;
JAVA并发容器代码随读 //累加takeIndex,有可能到数组尾部,重新调到数组头部
JAVA并发容器代码随读 takeIndex = inc(takeIndex);
JAVA并发容器代码随读 --count;
JAVA并发容器代码随读 //take后,使用Condition通知正在等待插入的线程可以插入
JAVA并发容器代码随读 notFull.signal();
JAVA并发容器代码随读 return x;
JAVA并发容器代码随读 }

这里需要解释下Condition的实现,Condition现在的JDK实现只有AQS的ConditionObject,并且通过ReetranLock的newConditon()方法暴露出来,这是因为Condition的await()或者sinal()一般在lock.lock()与lock.unlock()之间执行,当执行condition.await()方法时,它会首先释放掉本线程持有的锁,然后自己进入等待队列,直到sinal(),唤醒后又会重新去试图拿到锁,拿到后执行await下方的代码,其中释放当前锁和得到当前锁都需要ReetranLock的tryAcquire(int args)方法来判定,并且享受ReetranLock的重进入特性。

JAVA并发容器代码随读 public final void  await()  throws  InterruptedException  {
JAVA并发容器代码随读 if (Thread.interrupted())
JAVA并发容器代码随读 throw new InterruptedException();
JAVA并发容器代码随读 //加一个新的condition等待节点
JAVA并发容器代码随读 Node node = addConditionWaiter();
JAVA并发容器代码随读 //释放自己占用的锁
JAVA并发容器代码随读 int savedState = fullyRelease(node);
JAVA并发容器代码随读 int interruptMode = 0;
JAVA并发容器代码随读 while (!isOnSyncQueue(node)) {
JAVA并发容器代码随读 //如果当前线程等待状态是CONDITION,park住当前线程,等待condition的signal来解除
JAVA并发容器代码随读 LockSupport.park(this);
JAVA并发容器代码随读 if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)
JAVA并发容器代码随读 break;
JAVA并发容器代码随读     } JAVA并发容器代码随读 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
JAVA并发容器代码随读         interruptMode = REINTERRUPT;
JAVA并发容器代码随读 if (node.nextWaiter != null)
JAVA并发容器代码随读         unlinkCancelledWaiters();
JAVA并发容器代码随读 if (interruptMode != 0)
JAVA并发容器代码随读         reportInterruptAfterWait(interruptMode);
JAVA并发容器代码随读 }

LinkedBlockingQueue是一个链表结构构成的队列,并且节点是单向的,也就是只有next,没有prev,可以设置容量,如果不设置,最大容量为Integer.MAX_VALUE,队列只持有头结点和尾节点以及元素数量,通过putLock和takeLock两个ReetranLock分别控制存和取的并发,但是remove,toArray,toString,clear, drainTo以及迭代器等操作会同时取得putLock和takeLock,并且同时lock,此时存或者取操作都会不可进行,这里有个细节需要注意的就是所有需要同时lock的地方顺序都是先putLock.lock再takeLock.lock,这样就避免了可能出现的死锁问题。takeLock实例化出一个notEmpty的Condition,putLock实例化一个notFull的Condition,两个Condition协调即时通知线程队列满与不满的状态信息,这在前面几种BlockingQueue实现中也非常常见,在需要用到线程间通知的场景时,各位不妨参考下。另外dequeue的时候需要改变头节点的引用地址,否则肯定会造成不能GC而内存泄露

JAVA并发容器代码随读 private  E dequeue()  {
JAVA并发容器代码随读     Node<E> h = head;
JAVA并发容器代码随读     Node<E> first = h.next;
JAVA并发容器代码随读 //将原始节点的next指针指向自己,这样就能GC到自己否则虚拟机会认为这个节点仍然在用而不销毁(不知道是否理解有误)
JAVA并发容器代码随读 h.next = h; // help GC
JAVA并发容器代码随读 head = first;
JAVA并发容器代码随读     E x = first.item;
JAVA并发容器代码随读     first.item = null;
JAVA并发容器代码随读 return x;
JAVA并发容器代码随读 }

BlockingDequeue为阻塞的双端队列接口,继承了BlockingQueue,双端队列的最大的特性就是能够将元素添加到队列末尾,也能够添加到队列首部,取元素也是如此。LinkedBlockingDequeue实现了BlockingDequeue接口,就像LinkedBlockingQueue类似,也是由链表结构构成,但是和LinkedBlockingQueue不一样的是,节点元素变成了可双向检索,也就是一个Node持有next节点引用,同时持有prev节点引用,这对队列的头尾数据存取是有决定性意义的。LinkedBlockingDequeue只采用了一个ReetranLock来控制存取并发,并且由这个lock实例化了2个Condition notEmpty和notFull,count变量维护队列长度,这里只使用一个lock来维护队列的读写并发,个人理解是头尾的读写如果使用头尾分开的2个锁,在维护队列长度和队列Empty/Full状态会带来问题,如果使用队列长度做为判定依据将不得不对这个变量进行锁定.

// 无论是offerLast,offerFirst,pollFirst,pollLast等等方法都会使用同一把锁.
public  E pollFirst() {
     final  ReentrantLock lock  = this .lock;
    lock.lock();
     try  {
         return  unlinkFirst();
    }  finally  {
        lock.unlock();
    }
}

public  E pollLast() {
     final  ReentrantLock lock  = this .lock;
    lock.lock();
     try  {
         return  unlinkLast();
    }  finally  {
        lock.unlock();
    }
 }

3. ConcurrentMap 
ConcurrentMap定义了V putIfAbsent(K key,V value),Boolean remove(Object Key,Object value),Boolean replace(K key,V oldValue,V newValue)以及V replace(K key,V value)四个方法,几个方法的特性并不难理解,4个方法都是线程安全的。

ConcurrentHashMap是ConcurrentMap的一个实现类,这个类的实现相当经典,基本思想就是分拆锁,默认ConcurrentHashMap会实例化一个持有16个Segment对象的数组,Segment数组大小是可以设定的,构造函数里的concurrencyLevel指定这个值,但是需要注意的是,这个值并不是直接赋值.Segment数组最大长度为MAX_SEGMENTS = 1 << 16

JAVA并发容器代码随读 int  sshift  = 0 ;
JAVA并发容器代码随读 int  ssize  = 1 ;
JAVA并发容器代码随读 // ssize是左移位的,也就是2,4,8,16,32JAVA并发容器代码随读 增长(*2),所以你设定concurrencyLevel为10的时候,这个时候并发数最大为8.
JAVA并发容器代码随读 while  (ssize  <  concurrencyLevel)  {
JAVA并发容器代码随读 ++sshift;
JAVA并发容器代码随读     ssize <<= 1;
JAVA并发容器代码随读 }

每个Segment维持一个自动增长的HashEntry数组(根据一个阈值确定是否要增长长度,并不是满了才做).

JAVA并发容器代码随读 int  c  =  count;
JAVA并发容器代码随读 // threshold一般(int)(capacity * loadFactor),
JAVA并发容器代码随读 if  (c ++ >  threshold) 
JAVA并发容器代码随读     rehash();

下面3段代码是ConcurrentHashMap的初始化Segment,计算hash值,以及如何选择Segment的代码以及示例注解.

JAVA并发容器代码随读 public  ConcurrentHashMap( int  initialCapacity,
JAVA并发容器代码随读          float  loadFactor,  int  concurrencyLevel)  {
JAVA并发容器代码随读 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
JAVA并发容器代码随读 throw new IllegalArgumentException();
JAVA并发容器代码随读
JAVA并发容器代码随读 //首先确定segment的个数,左移位,并且记录移了几次,比如conurrencyLevel为30,那么2->4->8->16,ssize为16,sshift为4
JAVA并发容器代码随读 if (concurrencyLevel > MAX_SEGMENTS)
JAVA并发容器代码随读         concurrencyLevel = MAX_SEGMENTS;
JAVA并发容器代码随读  
JAVA并发容器代码随读 int sshift = 0;
JAVA并发容器代码随读 int ssize = 1;
JAVA并发容器代码随读 while (ssize < concurrencyLevel) {
JAVA并发容器代码随读 ++sshift;
JAVA并发容器代码随读         ssize <<= 1;
JAVA并发容器代码随读     } JAVA并发容器代码随读 //segmentShift为28
JAVA并发容器代码随读 segmentShift = 32 - sshift;
JAVA并发容器代码随读 //segmentMask为15
JAVA并发容器代码随读 segmentMask = ssize - 1;
JAVA并发容器代码随读 //this.segments=new Segment[16]
JAVA并发容器代码随读 this.segments = Segment.newArray(ssize);
JAVA并发容器代码随读
JAVA并发容器代码随读 if (initialCapacity > MAXIMUM_CAPACITY)
JAVA并发容器代码随读         initialCapacity = MAXIMUM_CAPACITY;
JAVA并发容器代码随读 //假设initialCapacity使用32,那么c=2
JAVA并发容器代码随读 int c = initialCapacity / ssize;
JAVA并发容器代码随读 if (c * ssize < initialCapacity)
JAVA并发容器代码随读 ++c;
JAVA并发容器代码随读 int cap = 1;
JAVA并发容器代码随读 //cap为2
JAVA并发容器代码随读 while (cap < c)
JAVA并发容器代码随读         cap <<= 1;
JAVA并发容器代码随读 //每个Segment的容量为2
JAVA并发容器代码随读 for (int i = 0; i < this.segments.length; ++i)
JAVA并发容器代码随读 this.segments[i] = new Segment<K,V>(cap, loadFactor);
JAVA并发容器代码随读 }

JAVA并发容器代码随读 /** JAVA并发容器代码随读  *segmentShift为28,segmentMask为15(1111)
JAVA并发容器代码随读  *因为hash值为int,所以32位的
JAVA并发容器代码随读  *hash >>> segentShift会留下最高的4位,
JAVA并发容器代码随读  *再与mask 1111做&操作
JAVA并发容器代码随读  *所以这个最终会产生 0-15的序列.
JAVA并发容器代码随读 */
JAVA并发容器代码随读 final  Segment < K,V >  segmentFor( int  hash)  {
JAVA并发容器代码随读 return segments[(hash >>> segmentShift) & segmentMask];
JAVA并发容器代码随读 }

JAVA并发容器代码随读 /** JAVA并发容器代码随读  *将计算的hash值补充到原始hashCode中,这是为了防止
   *外部用户传进来劣质的hash值(比如重复度很高)所带来
   *的危害. 
JAVA并发容器代码随读 */
JAVA并发容器代码随读 private static int  hash( int  h)  {
JAVA并发容器代码随读 // Spread bits to regularize both segment and index locations,
JAVA并发容器代码随读 // using variant of single-word Wang/Jenkins hash.
JAVA并发容器代码随读 h += (h << 15) ^ 0xffffcd7d;
JAVA并发容器代码随读     h ^= (h >>> 10);
JAVA并发容器代码随读     h += (h << 3);
JAVA并发容器代码随读     h ^= (h >>> 6);
JAVA并发容器代码随读     h += (h << 2) + (h << 14);
JAVA并发容器代码随读 return h ^ (h >>> 16);
JAVA并发容器代码随读 }

当put进来一个key、value对,ConcurrentHashMap会计算Key的hash值,然后从Segment数组根据key的Hash值选出一个Segment,调用其put方法,Segment级别的put方法通过ReetranLock来控制读取的并发,其实Segment本身继承了ReetranLock类。

Segment的put方法在lock()后,首先对数组长度加了新的元素之后是否会超过阈值threshold进行了判定,如果超过,那么进行rehash(),rehash()的过程相对繁琐,首先数组会自动增长一倍,然后需要对HashEntry数组中的所有元素都需要重新计算hash值,并且置到新数组的新的位置,同时为了减小操作损耗,将原来不需要移动的数据不做移动操作(power-of-two expansion,在默认threshold,在数组扩大一倍时只需要移动1/6元素,其他都可以不动)。所有动作完成之后,通过一个while循环寻找Segment中是否有相同Key存在,如果已经存在,那么根据onlyIfAbsent参数确定是否替换(如果为true,不替换,如果为false,替换掉value),然后返回替换的value,如果不存在,那么新生成一个HashEntry,并且根据一开始计算出来的index放到数组指定位置,并且累积元素计数,返回put的值。最后unlock()释放掉锁.

**4. CopyOnWriteArrayList和CopyOnWriteArraySet

CopyOnWriteList**是线程安全的List实现,其底层数据存储结构为数组(Object[] array),它在读操作远远多于写操作的场景下表现良好,这其中的原因在于其读操作(get(),indexOf(),isEmpty(),contains())不加任何锁,而写操作(set(),add(),remove())通过Arrays.copyOf()操作拷贝当前底层数据结构(array),在其上面做完增删改等操作,再将新的数组置为底层数据结构,同时为了避免并发增删改, CopyOnWriteList在这些写操作上通过一个ReetranLock进行并发控制。另外需要注意的是,CopyOnWriteList所实现的迭代器其数据也是底层数组镜像,所以在CopyOnWriteList进行interator,同时并发增删改CopyOnWriteList里的数据实不会抛“ConcurrentModificationException”,当然在迭代器上做remove,add,set也是无效的(抛UnsupportedOperationExcetion),因为迭代器上的数据只是当前List的数据数组的一个拷贝而已。

CopyOnWriteSet是一个线程安全的Set实现,然后持有一个CopyOnWriteList实例,其所有的操作都是这个CopyOnWriteList实例来实现的。CopyOnWriteSet与CopyOnWriteList的区别实际上就是Set与List的区别,前者不允许有重复的元素,后者是可以的,所以CopyOnWriteSet的add和addAll两个操作使用的是其内部CopyOnWriteList实例的addAbsent()和addAllAbsent()两个防止重复元素的方法,addAbsent()实现是拷贝底层数据数组,然后逐一比较是否相同,如果有一个相同,那么直接返回false,说明插入失败,如果和其他元素不同,那么将元素加入到新的数组中,最后置回新的数组, addAllAbsent()方法实现则是能有多少数据插入就插入,也就是说addAllAbsent一个集合的数据,可能只有一部分插入成功,另外一部分因为元素相同而遭丢弃,完成后返回插入的元素。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
2年前
JUC
Java5.0在java.util.concurrent包中提供了多种并发容器类来改进同步容器的性能。CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这