容器源码分析 - 并发容器

技术栈杂食者
• 阅读 157

容器源码分析 - 并发容器

以下源码分析基于 JDK 1.8。

CopyOnWriteArrayList

1.读写分离

写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。

写操作需要加锁,防止并发写入时导致写入数据丢失。

写操作结束之后需要把原始数组指向新的复制数组。

public boolean add(E e) {
   //加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // newElements 是一个复制的数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        // 写操作在一个复制的数组上进行
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

final void setArray(Object[] a) {
    array = a;
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    //读取操作仍然在原始的数组中
    return (E) a[index];
}

2.适用场景

CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,很适合读多写少的应用场景。

CopyOnWriteArrayList 有其缺陷:

  • 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
  • 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。

所以 CopyOnWriteArrayList 不适合内存敏感以及对实时性要求很高的场景。

ConcurrentHashMap

1. 存储结构

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶
从而使其并发度更高(并发度就是 Segment 的个数)。

Segment 继承自 ReentrantLock。

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;

    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    transient volatile HashEntry<K,V>[] table;

    transient int count;

    transient int modCount;

    transient int threshold;

    final float loadFactor;
}
final Segment<K,V>[] segments;

默认的并发级别为 16,也就是说默认创建 16 个 Segment。

static final int DEFAULT_CONCURRENCY_LEVEL = 16;

<div align="center"> <img src="https://github.com/DuHouAn/ImagePro/raw/master/java-notes/java/3fdfc89d-719e-4d93-b518-29fa612b3b18.png"/> </div>

2. size 操作

每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。

/**
 * The number of elements. Accessed only either within locks
 * or among other volatile reads that maintain visibility.
 */
transient int count;

在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。

ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。

尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。

如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。

/**
 * Number of unsynchronized retries in size and containsValue
 * methods before resorting to locking. This is used to avoid
 * unbounded retries if tables undergo continuous modification
 * which would make it impossible to obtain an accurate result.
 */
static final int RETRIES_BEFORE_LOCK = 2;

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // 超过尝试次数,则对每个 Segment 加锁
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 连续两次得到的结果一致,则认为这个结果是正确的
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

3. JDK 1.8 的改动

ConcurrentHashMap 取消了 Segment 分段锁。

JDK 1.8 使用 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized

数据结构与HashMap 1.8 的结构类似,数组+链表 / 红黑二叉树(链表长度 > 8 时,转换为红黑树 )。synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 Hash 值不冲突,就不会产生并发。

4. JDK 1.8 中的 put 方法

(1)hash 算法

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

(2)定位索引位置

i = (n - 1) & hash

(3)获取 table 中对应索引的元素 f

f = tabAt(tab, i = (n - 1) & hash
// Unsafe.getObjectVolatile 获取 f
// 因为可以直接指定内存中的数据,保证了每次拿到的数据都是新的
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}Copy to clipboardErrorCopied

(4)如果 f 是 null,说明 table 中是第一次插入数据,利用

  • 如果 CAS 成功,说明 Node 节点插入成功
  • 如果 CAS 失败,说明有其他线程提前插入了节点,自旋重新尝试在该位置插入 Node

(5)其余情况把新的 Node 节点按链表或红黑树的方式插入到合适位置,这个过程采用内置锁实现并发。

5. 和 Hashtable 的区别

底层数据结构:

  • JDK1.7 的 ConcurrentHashMap 底层采用分段的数组+链表实现,
    JDK1.8 的 ConcurrentHashMap 底层采用的数据结构与 JDK1.8 的 HashMap 的结构一样,数组+链表/红黑二叉树
  • Hashtable 和 JDK1.8 之前的HashMap的底层数据结构类似都是采用数组+链表的形式,
    数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的。

实现线程安全的方式

  • JDK1.7 的 ConcurrentHashMap(分段锁)对整个桶数组进行了分割分段(Segment),
    每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问度。
    JDK 1.8 采用数组+链表/红黑二叉树的数据结构来实现,并发控制使用 synchronizedCAS来操作。
  • Hashtable 使用 synchronized 来保证线程安全,效率非常低下。
    当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,
    如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈。

    推荐阅读:

  • 程序员面试手册
  • 编程笔记

    本文由博客一文多发平台 OpenWrite 发布!
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
4年前
java 并发容器一之BoundedConcurrentHashMap(基于JDK1.8)
最近开始学习java并发容器,以补充自己在并发方面的知识,从源码上进行。如有不正确之处,还请各位大神批评指正。前言:本人个人理解,看一个类的源码要先从构造器入手,然后再看方法。下面看BoundedConcurrentHashMap的注释1  先看HashTable的介绍:    Thisclassimplementsahasht
待兔 待兔
5年前
mysql面试题:如何实现 MySQL 的读写分离?MySQL 主从复制原理是啥?如何解决 MySQL 主从同步的延时问题?
你有没有做MySQL读写分离?如何实现MySQL的读写分离?MySQL主从复制原理的是啥?如何解决MySQL主从同步的延时问题?考点分析高并发这个阶段,肯定是需要做读写分离的,啥意思?因为实际上大部分的互联网公司,一些网站,或者是app,其实都是读多写少。所以针对这个情况,就是写一个主库,但是主库挂多个从库,然后从多个从库来
Wesley13 Wesley13
4年前
Java CopyOnWrite容器
   CopyOnWrite简称COW(写时复制),是一种程序设计中的优化策略,读取时,直接读取,写入时,copy一个副本,在这个副本上进行写入,写入完成,用副本替换原数据,这是一种延时懒惰策略。   从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,CopyOnWriteArrayList和CopyOnW
Wesley13 Wesley13
4年前
Java并发系列4
今天讲另一个并发工具,叫读写锁。读写锁是一种分离锁,是锁应用中的一种优化手段。考虑读多写少的情况,这时如果我们用synchronized或ReentrantLock直接修饰读/写方法未尝不可,如:publicstaticclassRw{privateintval;publicsynchr
Wesley13 Wesley13
4年前
JAVA中 ReentrantReadWriteLock读写锁详系教程,包会
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Wesley13 Wesley13
4年前
Java集合框架源码及高质量代码案例分析
Java集合框架源码分析本次源码分析对JavaJDK中的集合框架部分展开分析,采用的是JDK1.8.0\_171版本的源码。Java集合框架(JavaCollectionsFramework,JCF)也称容器,即可以容纳其他Java对象的对象。JCF为开发者提供了通用的容器,数据持有对象的方式和对数据集合的操作,优点是:1
Stella981 Stella981
4年前
ReentrantReadWriteLock(读写锁)
ReentrantReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效的帮助减少锁的竞争,以此来提升系统的性能。用锁分离的机制来提升性能也非常好理解,比如线程A,B,C进行写操作,D,E,F进行读操作,如果使用ReentrantLock或者synchronized关键字,这些线程都是串行执行的,即每次都只有一个线程做操作。但是当D进行读
Stella981 Stella981
4年前
ReentrantReadWriteLock实现原理
  在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁Reentra
Stella981 Stella981
4年前
ReentrantReadWriteLock读写锁详解
一、读写锁简介现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadW
Stella981 Stella981
4年前
CopyOnWrite容器
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWri
SpringBoot 项目优雅实现读写分离 | 京东云技术团队
一、读写分离介绍当使用SpringBoot开发数据库应用时,读写分离是一种常见的优化策略。读写分离将读操作和写操作分别分配给不同的数据库实例,以提高系统的吞吐量和性能。读写分离实现主要是通过动态数据源功能实现的,动态数据源是一种通过在运行时动态切换数据库连