可动态调节参数的线程池实现

孙皎
• 阅读 2883

背景

线程池是一种基于池化思想管理线程的工具,使用线程池可以减少创建销毁线程的开销,避免线程过多导致系统资源耗尽。在高并发的任务处理场景,线程池的使用是必不可少的。在双11主图价格表达项目中为了提升处理性能,很多地方使用到了线程池。随着线程池的使用,逐渐发现一个问题,线程池的参数如何设置?

线程池参数中有三个比较关键的参数,分别是corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、workQueueSzie(工作队列大小)。根据任务的类型可以区分为IO密集型和CPU密集型,对于CPU密集型,一般经验是设置corePoolSize=CPU核数+1,对于IO密集型需要根据具体的RT和流量来设置,没有普适的经验值。然而,我们一般遇到的情况多数是处理IO密集型任务,如果线程池参数不可动态调节,就没办法根据实际情况实时调整处理速度,只能通过发布代码调整参数。

如果线程池参数不合理会导致什么问题呢?下面列举几种可能出现的场景:

  1. 最大线程数设置偏小,工作队列大小设置偏小,导致服务接口大量抛出RejectedExecutionException。
  2. 最大线程数设置偏小,工作队列大小设置过大,任务堆积过度,接口响应时长变长。
  3. 最大线程数设置过大,线程调度开销增大,处理速度反而下降。
  4. 核心线程数设置过小,流量突增时需要先创建线程,导致响应时长过大。
  5. 核心线程数设置过大,空闲线程太多,占用系统资源。

线程池任务调度机制

要明白线程池参数对运行时的影响,就必须理解其中的原理,所以下面先简单总结了线程池的核心原理。

Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?其运行机制如下图所示:
可动态调节参数的线程池实现

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

其执行流程如下图所示:
可动态调节参数的线程池实现

动态调节线程池参数实现

线程池相关的重要参数有三个,分别是核心线程数、最大线程数和工作队列大小,接下来将阐述如何实现动态调节线程池参数。

调节核心和最大线程数的原理

ThreadPoolExecutor已经提供了两个方法在运行时设置核心线程数和最大线程数,分别是ThreadPoolExecutor.setCorePoolSize()ThreadPoolExecutor.setMaximumPoolSize()

setCorePoolSize方法的执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:
可动态调节参数的线程池实现

setMaximumPoolSize方法执行流程是:首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

调节工作队列大小的原理

线程池中是以生产者消费者模式,通过一个阻塞队列来缓存任务,工作线程从阻塞队列中获取任务。工作队列的接口是阻塞队列(BlockingQueue),在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用。

目前JDK提供了以下阻塞队列的实现:
可动态调节参数的线程池实现

但是很不幸,这些阻塞队列的实现都不支持动态调整大小,那么为什么不自己实现一个可动态调整大小的阻塞队列呢。重复造轮子是不可取的,所以我选择改造轮子。LinkedBlockingQueue是比较常用的一个阻塞队列,它无法修改大小的原因是capacity字段设置成了final private final int capacity;。如果我把final去掉,并提供修改capacity的方法,是不是就满足我们的需求呢?事实证明是可行的,文章末尾上传了ResizeLinkedBlockingQueue的实现。

结合Diamond进行实现

Diamond可以管理我们的配置,如果可以通过Diamond实现线程池参数管理那就再好不过了。接下来就开始上代码了,首先实现一个Diamond配置管理类DispatchConfig,然后,实现一个线程池管理的工厂方法StreamExecutorFactory

DispatchConfig类是一个静态类,在初始化的时候获取了对应Diamond的内容并设置了监听,使用的时候只需要DispatchConfig.getConfig().getCorePoolSize()

_/**
 * @author moda
 */_
@Slf4j
@Data
public class DispatchConfig {
    public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";
    public static final String GROUP_ID = "mkt-turbo";
    private static DispatchConfig config;

    static {
        try {
            String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
            config = JSON.parseObject(content, DispatchConfig.class);
            Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
                @Override
                public void receiveConfigInfo(String content) {
                    try {
                        config = JSON.parseObject(content, DispatchConfig.class);
                    } catch (Throwable t) {
                        log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
                    }
                }
            });
        } catch (Exception e) {
            log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);
        }
    }

    public static DispatchConfig getConfig() {
        return config;
    }

    private int corePoolSize = 10;

    private int maximumPoolSize = 30;

    private int workQueueSize = 1024;

    _/**
     * 商品分批处理每批大小
     */_
    private int itemBatchProcessPageSize = 200;
}

StreamExecutorFactory是一个静态类,维护了一个静态属性executor,并通过initExecutor()进行初始化。在初始化的时候,工作队列使用了可调节大小的阻塞队列ResizeLinkedBlockingQueue,并设置了监听Diamond变更。Diamond发生变更的时候通过在callback中对比值是否发生改变,如果发生改变则调整workQueueSize、corePoolSize、maximumPoolSize。使用的时候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能动态修改线程池参数。

_/**
 * @author moda
 */_
@Slf4j
public class StreamExecutorFactory {
    private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";

    private static ThreadPoolExecutor executor = initExecutor();

    private static ThreadPoolExecutor initExecutor() {
        ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
        ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
        _//拒绝策略,调用者线程处理_
        RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {
            String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
                THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
            if (!e.isShutdown()) {
                r.run();
            }
        };
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            DispatchConfig.getConfig().getCorePoolSize(),
            DispatchConfig.getConfig().getMaximumPoolSize(),
            10,
            TimeUnit.SECONDS,
            workQueue,
            nameThreadFactory,
            rejectedExecutionHandler
        );

        Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {
            @Override
            public void receiveConfigInfo(String content) {
                try {
                    DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);
                    if (workQueue.getCapacity() != config.getWorkQueueSize()) {
                        workQueue.setCapacity(config.getWorkQueueSize());
                    }
                    if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {
                        threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
                    }
                    if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {
                        threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());
                    }
                } catch (Throwable t) {
                    log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);
                }
            }
        });

        return threadPoolExecutor;
    }

    public static Executor getExecutor() {
        return executor;
    }
}

原文链接
本文为阿里云原创内容,未经允许不得转载。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java 面试知识点笔记(十三)多线程与并发
java线程池,利用Exceutors创建不同的线程池满足不同场景需求:1.newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。2.
Wesley13 Wesley13
3年前
java_线程池
血一样的教训,今天上午参加了一家现场面试java。在这之前,我一直认为我的java基础还是可以的,而今天一问三不知。现在将面试的问题整理出来一、说说java中的线程池?  1.线程池:线程池是线程的集合,不用自己创建线程,把线程直接给线程池,由线程池处理。   2.过程:首先,使用线程池可以重复利用已有的线程继续执行任务,避免线程在
Wesley13 Wesley13
3年前
java各种面试问题
二、Java多线程相关线程池的原理,为什么要创建线程池?创建线程池的方式;线程的生命周期,什么时候会出现僵死进程;说说线程安全问题,什么实现线程安全,如何实现线程安全;创建线程池有哪几个核心参数?如何合理配置线程池的大小?volatile、ThreadLocal的使用场景和原理;
利用DUCC配置平台实现一个动态化线程池
在后台开发中,会经常用到线程池技术,但线程池核心参数的配置很大程度上依靠经验,所以我们很难一劳永逸地规划一个合理的线程池参数。本文以公司DUCC配置平台作为中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。
zdd小小菜鸟 zdd小小菜鸟
2年前
多线程面试
多线程篇1.为什么要使用线程池tex避免频繁地创建和销毁线程,达到线程对象的重用。另外,使用线程池还可以根据项目灵活地控制并发的数目。2.java中如何获取到线程dump文件tex死循环、死锁、阻
Wesley13 Wesley13
3年前
Java多线程之线程池7大参数、底层工作原理、拒绝策略详解
Java多线程之线程池7大参数详解目录企业面试题线程池7大参数源码线程池7大参数详解底层工作原理详解线程池的4种拒绝策略理论简介面试的坑:线程池实际中使用哪一个?1\.企业面试题线程池的工作原理,几个重要参数,然后给了具体几个参数分析线程池会怎么做,最后问阻塞队列用是什么?线程池的构造类的方
Wesley13 Wesley13
3年前
Java通过Executors提供四种线程池
Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建
Wesley13 Wesley13
3年前
Java 线程池原理分析
1.简介线程池可以简单看做是一组线程的集合,通过使用线程池,我们可以方便的复用线程,避免了频繁创建和销毁线程所带来的开销。在应用上,线程池可应用在后端相关服务中。比如Web服务器,数据库服务器等。以Web服务器为例,假如Web服务器会收到大量短时的HTTP请求,如果此时我们简单的为每个HTTP请求创建一个处理线程,那么服务器
Wesley13 Wesley13
3年前
Java 基础知识(七)
1.创建线程池1)newCacheThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 2)newFixedThreadPool  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 3)newScheduledThreadPool  创建一个定长线程池,支持
Wesley13 Wesley13
3年前
Java中的线程池
java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理使用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺
动态线程池思想学习及实践
相关文档美团线程池实践:线程池思想解析:引言在后台项目开发过程中,我们常常借助线程池来实现多线程任务,以此提升系统的吞吐率和响应性;而线程池的参数配置却是一个难以合理评估的值,虽然业界也针对cpu密集型,IO密集型等场景给出了一些参数配置的经验与方案,但是