Dubbo 线程池源码解析

Stella981
• 阅读 402

本文首发于个人微信公众号《andyqian》,期待你的关注!

前言

之前文章《Java线程池ThreadPoolExecutor》《ThreadPoolExecutor 原理解析》中,分别讲述了ThreadPoolExecutor 的概念以及原理,今天就一起来看看其在 Dubbo 框架中的应用。

ThreadFactory 与 AbortPolicy

  Dubbo 为我们提供了几种不同类型的线程池实现,其底层均使用的是 JDK 中的 ThreadPoolExecutor 线程池。ThreadPoolExecutor 我们都已经非常熟悉,其构造函数中有几个非常重要的参数。其中就包括:拒绝策略( ThreadPoolExecutor.AbortPolicy ) 以及 ThreadFactory,在 Dubbo 中自定义了 ThreadPoolExecutor.AbortPolicy 以及 ThreadFactory。在学习线程池之前,我们先来看看这两者的实现,更有益于后面的理解。

在Dubbo中NamedInternalThreadFactory 为自定义的线程 ThreadFactory 的子类。其类图如下:

Dubbo 线程池源码解析

其中 NamedInternalThreadFactory 类,其实现如下所示:

public class NamedInternalThreadFactory extends NamedThreadFactory {

    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}

其中 NamedThreadFactory 类的实现如下:

public class NamedThreadFactory implements ThreadFactory {

    protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

    protected final AtomicInteger mThreadNum = new AtomicInteger(1);

    protected final String mPrefix;

    protected final boolean mDaemon;

    protected final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }

到这里,上述代码描述的是Dubbo对线程池中线程的命名规则,其作用是为了方便追踪信息。

接下来,我们来看下拒绝策略 AbortPolicyWithReport 类的实现,其类图如下所示:

Dubbo 线程池源码解析

源码如下:

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final String OS_WIN_PREFIX = "win";

    private static final String OS_NAME_KEY = "os.name";

    private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

    private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    // 覆盖 父类 ThreadPoolExecutor.AbortPolicy 的 rejectedExecution 方法。
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       // 构造 warn 参数,其中包括:线程状态,线程池数量,活跃数量,核心线程池数量,最大线程池数量 等信息。
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        //  dump  堆栈信息
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

 // 当执行 rejectedExecution 方法时,会执行该方法。将会 dump 堆栈信息 至 DUMP_DIRECTORY 目录,默认为:user.name 目录下。
    private void dumpJStack() {
        long now = System.currentTimeMillis();

        //dump every 10 minutes
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {
            return;
        }

        if (!guard.tryAcquire()) {
            return;
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

            SimpleDateFormat sdf;

            String os = System.getProperty(OS_NAME_KEY).toLowerCase();

            // window system don't support ":" in file name
            if (os.contains(OS_WIN_PREFIX)) {
                sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
            } else {
                sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
            }

            String dateStr = sdf.format(new Date());
            //try-with-resources
            try (FileOutputStream jStackStream = new FileOutputStream(
                new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                // 工具类,此处实现省略,有兴趣的可以查看。
                JVMUtil.jstack(jStackStream);
            } catch (Throwable t) {
                logger.error("dump jStack error", t);
            } finally {
                guard.release();
            }
            lastPrintTime = System.currentTimeMillis();
        });
        //must shutdown thread pool ,if not will lead to OOM
        pool.shutdown();
    }
}

上面的代码不难,都是打日志,dump 堆栈信息,其目的就是:用于在线程池被打满时,也就是记录执行AbortPolicy时现场信息,主要是便于后期的分析与问题排查。

线程池的实现

  上面讲述了Dubbo线程池中自定义的 ThreadFactory 类 以及 AbortPolicyWithReport 类。接下来,我们继续讲解 Dubbo 提供的不同线程池实现,其类图如下所示:

Dubbo 线程池源码解析

1. LimitedThreadPool 线程池

源码如下:

public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

其中:

  1. THREAD_NAME_KEY 值为:threadname ,表示为:线程名,其默认值为:Dubbo。

  2. CORE_THREADS_KEY 值为:corethreads,表示:核心线程池数量,其默认值为:0。

  3. THREADS_KEY 值为:threads 表示:最大线程数,默认值为:200。

  4. QUEUES_KEY 值为:queues 表示:阻塞队列大小,默认值为:0。

备注:

  1. 该线程池中的 cores,threads 参数由外部制定,其中 keepAliveTime 值为:Long.MAX_VALUE,TimeUnit 为 TimeUnit.MILLISECONDS (毫秒)。(意味着线程池中的所有线程永不过期,理论上大于Long.MAX_VALUE 即会过期,因为其足够大,这里可以看为是永不过期 )。

  2. 此处使用了三目运算符:

    当 queues = 0 时,BlockingQueue为SynchronousQueue。

    当 queues < 0 时,则构造一个新的LinkedBlockingQueue。

    当 queues > 0 时,构造一个指定元素的LinkedBlockingQueue。

    queues == 0 ? new SynchronousQueue<Runnable>() :
      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
          : new LinkedBlockingQueue<Runnable>(queues)
    

该线程池的特点是:可以创建若干个线程,其默认值为 200,线程池中的线程生命周期非常长,甚至可以看做是永不过期。

2.  CachedThreadPool 线程池

源码:

 public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

其中:

  1. THREAD_NAME_KEY 值为:threadname , 表示为:线程名,其默认值为:Dubbo。

  2. CORE_THREADS_KEY 值为:corethreads,表示为:核心线程池数量,其默认值为:0。

  3. THREADS_KEY 值为:threads,表示:最大线程数,默认值为:Integer.MAX_VALUE。

  4. QUEUES_KEY 值为:queues,表示:阻塞队列大小,默认值为:0。

  5. ALIVE_KEY 值为:alive, 表示: keepAliveTime 表示线程池中线程的存活时间,其默认值为:60 * 1000 (毫秒) 也就是一分钟。

该线程池的特点是:可创建无限多线程(在操作系统的限制下,会远远低于Integer.MAX_VALUE值,这里视为无限大),其线程的最大存活时间默认为 1 分钟。意味着可以创建无限多线程,但是线程的生命周期默认较短!

3. FixedThreadPool 线程池

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

其中:

  1. THREADS_KEY 值为:threads,表示:最大线程数,默认值为:200。

  2. QUEUES_KEY 值为:queues,表示:阻塞队列大小,默认值为:0。

  3. corePoolSize,maximumPoolSize 的线程数量均为:threads,(也就意味着核心线程数等于最大线程数)。

  4. keepAliveTime 的默认值为0,当线程数大于corePoolSize 时,多余的空闲线程会立即终止。

该线程池的特点是:该线程池中corePoolSize 数量 与 maxinumPoolSize 数量一致,当提交的任务大于核心线程池时,则会将其放入到LinkedBlockingQueue队列中等待执行,也是Dubbo中默认使用的线程池。

4. EagerThreadPool 线程池

源码:

public class EagerThreadPool implements ThreadPool {

        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

            // init queue and executor
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
}

备注:

  该线程池与上面的线程池实现方式有些不一样,上面是直接使用了ThreadPoolExecutor 类的构造函数。在该线程池实现中,首先构造了一个自定义的 EagerThreadPoolExecutor 线程池,其底层实现也是基于 ThreadPoolExecutor 类的,其代码如下所示:

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 执行任务数依次递减
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 提交任务书 依次递加。
        submittedTaskCount.incrementAndGet();
        try {
          // 调用父类方法执行线程任务
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // 将任务重新添加到队列中
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
               //如果添加失败,则减少任务数,并抛出异常。
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }

在这里我们发现,在 EagerThreadPoolExecutor 类中,重载了父类ThreadPoolExecutor 类的几个方法,分别如下:afterExecuteexecute方法。分别加入 submittedTaskCount 属性进行任务的统计,当父类的execute方法抛出 RejectedExecutionExcetion 异常时,则会将任务重新放入队列中执行,其TaskQueue代码如下:

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        // 当提交的任务数,小于 当前线程时,则之间调用父类的offer 方法。
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        // 当当前线程数大小小于,最大线程数时,则直接返回false,创建worker。
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

该线程池的特点是:可以重新将拒绝掉的task,重新添加的work queue中执行。相当于有一个重试机制!

结语

  通过上面的分析,我相信大家对Dubbo中线程池应该有所了解。如果还有不清楚的地方,可以通过debug的方式进行跟踪分析。其实在很多的开源框架中,都有自定义的线程池,但其底层最终使用的还是 ThreadPoolExecutor 线程池,这个知识点建议大家一定要掌握,无论是实际工作还是面试,都是一个常用的知识点。


相关阅读:

你所不知道的 BigDecimal

ThreadPoolExecutor 原理解析

Java线程池ThreadPoolExecutor

使用 Mybatis 真心不要偷懒!

点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
2个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
技术小男生 技术小男生
2个月前
linux环境jdk环境变量配置
1:编辑系统配置文件vi/etc/profile2:按字母键i进入编辑模式,在最底部添加内容:JAVAHOME/opt/jdk1.8.0152CLASSPATH.:$JAVAHOME/lib/dt.jar:$JAVAHOME/lib/tools.jarPATH$JAVAHOME/bin:$PATH3:生效配置
光头强的博客 光头强的博客
2个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
2个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
小森森 小森森
2个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
2个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
1个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
Stella981 Stella981
1年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
1年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
helloworld_28799839 helloworld_28799839
2个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue