一种极致性能的缓冲队列

捉虫大师 等级 852 1 0

本文已收录 https://github.com/lkxiaolou/lkxiaolou 欢迎star。

背景

在多线程下的生产者-消费者模型中,需求满足如下情况:

对生产者生产投递数据的性能要求非常高 多个生产者,单个(多个也可以,本文只介绍单个的情况)消费者 当消费者跟不上生产者速度时,可容忍少部分数据丢失 生产者是单条单条地生产数据 举个日志采集的例子,日志在不同的线程上生产,在日志生产速度远超消费者速度时,可以丢弃部分数据,要求打日志的性能损耗最小,这种情况下可采用本文提供的极致性能的缓冲队列。

实现细节

多个生产者向一个缓冲队列提交消息,说到底是线程安全问题,如果不考虑线程安全,性能必然是最高的,但出现的问题是,数据经常被覆盖。虽然可以容忍少部分数据丢失,但也是在消费者跟不上生产者速度时。缓冲区必然有界,无界可能导致内存泄露,如果缓冲区满,再生产新数据,可选的策略一般有如下几种:

  • 阻塞直到被消费
  • 覆盖旧数据
  • 丢弃新数据 在要求对生产者性能损耗最小的情况下一般不选1,通常采取覆盖策略。

环形队列

有一种环形队列的数据结构(ring buffer)可以很好的解决解决上面提到的生产者-消费者模型、缓冲区有界、覆盖策略。通常用数组来实现ring buffer,只要保证生产者获取下标是线程安全的即可解决线程安全问题。而且数组内存预先分配加上连续内存索引更加快速的特点也保证了强悍的性能。

一种极致性能的缓冲队列

AtomicInteger

在环形队列上如何保证线程安全地获取数组下标?线程安全地自增我们想到了AtomicInteger,很容易写出如下代码

public class AtomicRangeInteger extends Number {

    private final AtomicInteger value;

    private final int startValue;
    private final int endValue;

    public AtomicRangeInteger(int startValue, int endValue) {
        this.startValue = startValue;
        this.endValue = endValue;
        this.value = new AtomicInteger(startValue);
    }

    public final int incrementAndGet() {
        int next;
        do {
            next = value.incrementAndGet();
            if (next > endValue && value.compareAndSet(next, startValue)) {
                return startValue;
            }
        } while (next > endValue);

        return next;
    }

    public final int get() {
        return value.intValue();
    }

    @Override
    public int intValue() {
        return value.intValue();
    }

    @Override
    public long longValue() {
        return value.intValue();
    }

    @Override
    public float floatValue() {
        return value.intValue();
    }

    @Override
    public double doubleValue() {
        return value.intValue();
    }
}
public final class RingBuffer<T> {

    private int bufferSize;
    private AtomicRangeInteger index;
    private final T[] buffer;

    @SuppressWarnings("unchecked")
    public RingBuffer(int bufferSize) {
        this.bufferSize = bufferSize;
        this.index = new AtomicRangeInteger(0, bufferSize);
        this.buffer = (T[]) new Object[bufferSize];
    }

    public final void offer(final T data) {
        buffer[index.incrementAndGet()] = data;
    }

    public final T poll(int index) {
        T tmp = buffer[index];
        buffer[index] = null;
        return tmp;
    }

    public int getBufferSize() {
        return bufferSize;
    }
}

核心代码其实就是这一段

public final int incrementAndGet() {
    int next;
    do {
        next = value.incrementAndGet();
        if (next > endValue && value.compareAndSet(next, startValue)) {
            return startValue;
        }
    } while (next > endValue);

    return next;
}
  • 首选生产者获取下一个可用的index,直接在当前基础上调用incrementAndGet进行加1操作,该操作是原子的,故拿到的一定是没有被其他线程占用的index
  • 获取上一步的返回值,该返回值有可能超过ring buffer的最大下标值,如果超过则将其置为startValue,这一步使用compareAndSet,可能会失败,如果失败说明有其他线程做了该操作,故可以再调用一次incrementAndGet获取下一个下标

为何是极致的性能

目前有一款开源ring buffer的实现——disruptor,关于它的介绍网上可以找到很多,这里简单介绍一下。它也是使用的数组来充当环形队列,但与上面的实现有一点差别,它可以批量插入,所以它使用的是compareAndSet,它在缓冲区填满以后的策略是阻塞,它的下标是不回溯,永远往后加,采用取模来映射到对应的index,为了性能使用位运算(&),所以它的容量只能是2的N次方。

主要的不同就在compareAndSet(v1)与incrementAndGet(v0),我们可以将原先的incrementAndGet实现改为compareAndSet测试一下性能差异(缓冲区大小为1000):

Benchmark                    Mode  Cnt         Score   Error  Units
RingBufferBenchmark.testV0  thrpt    2  39969002.156          ops/s
RingBufferBenchmark.testV1  thrpt    2  15533576.961          ops/s

为什么会有三倍性能的差距?看一下incrementAndGet的实现:

/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

发现incrementAndGet底层也是CAS的实现。其实在JDK8之后对incrementAndGet做了优化:

Unsafe是经过特殊处理的,不能理解成常规的java代码,在调用getAndAddInt的时候,如果系统底层支持fetch-and-add,那么它执行的就是native方法,使用的是fetch-and-add;如果不支持,就按照上面的所看到的getAndAddInt方法体那样,以java代码的方式去执行,使用的是compare-and-swap

CAS使用的是jdk层面的自旋锁,fetch-and-add是cpu指令,性能上fetch-and-add要强很多。所以如果拿上面的代码用jdk1.7来测试,会发现性能没有差别。

再优化空间

  • 缓存行填充 AtomicRangeInteger对象中存在三个属性,value,startValue,endValue,value是经常变化的,startValue,endValue是不会变化,所以当value经常变化会导致读取startValue,endValue时不会命中cpu缓存,性能有所下降,我们使用jdk8的注解@Contended来填充value所在行的缓存。
public class AtomicRangeIntegerV2 extends Number {

    @Contended
    protected final AtomicInteger value;

    protected final int startValue;
    protected final int endValue;
    ...
}

做一下benchmark(v2为加@Contended注解):

Benchmark                    Mode  Cnt         Score   Error  Units
RingBufferBenchmark.testV2  thrpt    2  72095754.040          ops/s
RingBufferBenchmark.testV0  thrpt    2  44360926.943          ops/s
  • 多个ring buffer分担冲突

这个优化就不做过多说明了,多线程中一般都采取分段的思想来降低冲突,ring buffer也可以,当一个ring buffer存在性能瓶颈时,可以利用多个ring buffer来分担,最佳状态是每个线程分配一个ring buffer,具体怎么分配,在之前的文章《实现一个比LongAdder更高性能的计数器有多难?》中有一个比较巧妙的办法,可以参考下。

彩蛋

这个ring buffer本来是借鉴skywalking中ring buffer的实现,但当时skywalking的实现也是使用CAS,性能不是很满意,于是我就做了这些优化,具体github的issue可以参考如下链接:

https://github.com/apache/skywalking/pull/2874 https://github.com/apache/skywalking/pull/2930

现在skywalking中已经是我写的这个版本了,算得上是一个极致性能的缓冲队列了。


搜索关注微信公众号"捉虫大师",后端技术分享,架构设计、性能优化、源码阅读、问题排查、踩坑实践。

一种极致性能的缓冲队列

收藏
评论区

相关推荐

python的requests模块的使用
前言: 在web后台开发过程中,会遇到需要向第三方发送http请求的场景,python中的requests库可以很好的满足这一要求,这里简要记录一下requests模块的使用! 说明: 这里主要记录一下requests模块的如下几点: 1.requests模块的安装 2.requests模块发送get请求 3.requests模块
HTTP 的本质?HTTP 和 RPC 的区别?
身为 Java Web 开发我发现很多人一些 Web 基础问题都答不上来。 上周我面试了一个三年经验的小伙子,一开始我问他 HTTP/1、HTTP/2相关的他到是能答点东西出来。 后来我问他:你怎么理解 HTTP 的,HTTP 的作用是什么? 他支支吾吾答不出来。 经过了一番引导交谈,他回答是用来客户端和服务端之间传输的。 我接着问那你知道什么是
一个免费的开源的html转markdown语法的工具
一个免费的开源的html转markdown语法的工具 大家好,我是待兔,今天为大家分享一个由 www.helloworld.net 网站开发并开源的一个非常好用的工具 html2md 现在好的技术文章确实多,每天各种技术群里,各种技术社区,有很多质量非常好的技术文章,于是我们就收藏了,可是问题来了,我们收藏到哪呢? 怎么收藏呢? 1. 微信群里发的文
helloworld.net 的总结以及2021年的期待
没有反思的人生不值得过!由时不时向外张望,彻底转向向内审视的一年。 2020年,公历闰年,共366天,52周零2天。二十一世纪二十年代的第1年。 大家好,我是待兔, helloworld.net也就是 helloworld开发者社区的创始人之一,由于前几天感冒了,你知道的,这个时间感冒是有点麻烦的,所以导致这篇文章来的稍晚了点,好饭不怕晚,晚点写有晚点写
JS - ES6 的 Module
一、介绍 模块,(Module),是能够单独命名并独立地完成一定功能的程序语句的集合(即程序代码和数据结构的集合体)。 两个基本的特征:外部特征和内部特征 外部特征是指模块跟外部环境联系的接口(即其他模块或程序调用该模块的方式,包括有输入输出参数、引用的全局变量)和模块的功能 内部特征是指模块的内部环境具有的特点(即该模
python中的split()函数的用法
函数:split() Python中有split()和os.path.split()两个函数,具体作用如下: split():拆分字符串。通过指定分隔符对字符串进行切片,并返回分割后的字符串列表(list) os.path.split():按照路径将文件名和路径分割开 一、函数说明 1、split()函数 语法:str.split(str
阿里二面,面试官居然把 TCP 三次握手问的这么细致
TCP 的三次握手和四次挥手,可以说是老生常谈的经典问题了,通常也作为各大公司常见的面试考题,具有一定的水平区分度。看似是简单的面试问题,如果你的回答不符合面试官期待的水准,有可能就直接凉凉了。本文会围绕,三次握手和四次挥手相关的一些列核心问题,分享如何更准确的回答和应对常见的面试问题,以后面对再刁钻的面试官,你都可以随意地跟他扯皮了。 面试TCP的意义
Java练习(三)——返回集合中的最大的和最小的元素
题目:在一个列表中存储以下元素:apple,grape,banana,pear,现要求将集合进行排序,返回集合中的最大的和最小的元素,并将排序后的结果打印在控制台上,要求的打印输出方法分别为默认toString输出、迭代器输出、for循环遍历输出和增强for循环输出。 package test;import java.util.;public class P
人工智能数学基础-线性代数1:向量的定义及向量加减法
一、向量 1.1、向量定义向量也称为欧几里得向量、几何向量、矢量,指具有大小(magnitude)和方向的量。它可以形象化地表示为带箭头的线段。箭头所指:代表向量的方向;线段长度:代表向量的大小。与向量对应的量叫做数量(物理学中称标量),数量(或标量)只有大小,没有方向。1. 在物理学和工程学中,几何向量更常被称为矢量。 2. 一般印刷用黑体的小写
人工智能数学基础-线性代数4:矩阵及矩阵运算
一、矩阵定义矩阵(Matrix)是一个按照长方阵列排列的复数或实数集合,定义如下: 由 m × n 个数aij排成的m行n列的数表称为m行n列的矩阵,简称m × n矩阵。记作: 这m×n 个数称为矩阵A的元素,简称为元,数aij位于矩阵A的第i行第j列,称为矩阵A的(i,j)元,以数 aij为(i,j)元的矩阵可记为(aij)或(aij)m × n,m×
凉凉!面试阿里我被Redis技术专题给搞的昏倒在地~
凉凉!面试阿里我被Redis技术专题给弄死了📚我本以为我可以像是别的博主一样去阿里面试随随便便,因为Redis,我直接被阿里大佬淦翻在地上好了不装了 没过没关系 我总结了一些这些最难的知识点!!!!然后自己总结归类再去百度查询了一些 最终得出这份Redis技术专题 题目开淦 Redis集群的主从复制模型是怎样的?为了是在部分节点失败或者大部分节点无法通信的情
python的学习难?你的方法不对罢了,看看我的。
1、选择Python版本对于使用python的人来说,python的版本就是我们的工作环境,因此,在学习之前需要考虑好学习哪个版本,python2和python3的版本不同,会存在一些差异,虽说不大,但直接学习python3 的话相对来说会好一点,而且跑一趟还能3相对来说对零基础的小白来说更加的友好,容易上手。2、学习Python基础知识Python 是一个
c++类和继承面试点25连问
本篇文章连问面试时经常会遇到的类和继承相关25个问题,看看你能回答出几道题呀。还是先看一下思维导图,如下: 1. c++的三大特性是什么c++的三大特性,说白了其实就是面向对象的三大特性,是指:封装、继承、多态,简单说明如下: 封装是一种技术,它使类的定义和实现分离,也就是隐藏了实现细节,只留下接口给他人调用,另外封装还有一层意义是它把某种事物具现出属性和方
浅析常用的Python Web的几大框架
在各种语言平台中,python涌现的web框架恐怕是最多的,是一个百花齐放的世界,各种microframework、framework不可胜数;猜想原因应该是在python中构造框架十分简单,使得轮子不断被发明。所 以在Python社区总有关于Python框架孰优孰劣的话题。下面就给大家介绍一下python的几大框架: Django Django 应该是最出
JAVA回调机制(CallBack)之小红是怎样买到房子的??
JAVA回调机制CallBack 序言最近学习java,接触到了回调机制(CallBack)。初识时感觉比较混乱,而且在网上搜索到的相关的讲解,要么一言带过,要么说的比较单纯的像是给CallBack做了一个定义。当然了,我在理解了回调之后,再去看网上的各种讲解,确实没什么问题。但是,对于初学的我来说,缺了一个循序渐进的过程。此处,将我对回调机制的个人理解,按