动态调整线程池参数

武松
• 阅读 1374

目标

  1. 实现动态调整线程池参数
  2. 对线程池运行情况进行监控

实现

一,线程池可调整的参数

  1. 核心线程数
  2. 超时时间
  3. 最大线程数
  4. 拒绝策略

动态调整线程池参数
而队列BlockingQueue因为是final类型,所以没有对外修改入口。但可以通过重写LinkedBlockingQueue并把capacity设置为非final。

二,结合配置中心实现动态调整

这里的配置中心使用Apollo, 通过监听配置中心变化,然后更新线程池配置。示例代码如下:

@Slf4j
@Component
public class DynamicThreadPoolConfig {
    /** 线程执行器 **/
    private volatile ThreadPoolExecutor executor;

    /** 核心线程数 **/
    private Integer corePoolSize = 10;

    /** 最大值线程数 **/
    private Integer maximumPoolSize = 20;

    /** 待执行任务的队列的长度 **/
    private Integer workQueueSize = 1000;

    /** 线程空闲时间 **/
    private Long keepAliveTime = 1000L;

    /** 线程名 **/
    private String threadName;

    private Config config = ConfigService.getConfig("项目配置中心namespace");

    public DynamicThreadPoolConfig() {
        init(config);
    }

    /** * 初始化 */
    private void init(Config config) {
        log.info("线程池初始化中..........");
        if (executor == null) {
            synchronized (DynamicThreadPoolConfig.class) {
                if (executor == null) {
                    String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString());
                    log.info("修改前的核心线程池:{}",corePoolSizeProperty);
                    String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString());
                    String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString());
                    BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);
                    executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),
                            Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);
                }
            }
        }
    }

    /**
     * 监听到配置中心发生变化后,更新线程池配置
     * @param changeEvent
     */
    @ApolloConfigChangeListener
    public void onChange(ConfigChangeEvent changeEvent){
        log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace());
            for(String key : changeEvent.changedKeys()){
                ConfigChange change = changeEvent.getChange(key);
                String newValue = change.getNewValue();
                refreshThreadPool(key,newValue);
            }
    }

    /**
     * 更新线程池配置
     * @param key
     * @param newValue
     */
    private void refreshThreadPool(String key, String newValue) {
        if (executor == null) {
            return;
        }
        if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) {
            executor.setCorePoolSize(Integer.valueOf(newValue));
            log.info("修改核心线程数key={},value={}",key,newValue);
        }
        if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) {
            executor.setMaximumPoolSize(Integer.valueOf(newValue));
            log.info("修改最大线程数key={},value={}", key, newValue);
        }
        if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) {
            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
            log.info("修改线程空闲时间key={},value={}", key, newValue);
        }
    }

    public ThreadPoolExecutor getExecutor() {
        return executor;
    }
}

@AllArgsConstructor
public enum ParamsEnum {

    CORE_POOL_SIZE("apollo.async.executor.thread.core_pool_size", "核心线程数"),
    MAXIMUM_POOL_SIZE("dynamic.maximumPoolSize", "最大线程数"),
    KEEP_ALIVE_TIME("dynamic.keepAliveTime", "线程空闲时间"),
    ;

    @Getter
    private String param;

    @Getter
    private String desc;

}

三,监控方式

修改线程池有关参数重要,但知道何时修改同样重要,可以考虑间隔一段时间进行采集,通过日志输出,达到临界点后告警。
同样,ThreadPoolExecutor也提供获取线程池相关信息的API:
动态调整线程池参数

这里通过一个定时任务进行统计,需要注意的是启动类上需要加上EnableScheduling注解

@Slf4j
@Component
@Async
@ConditionalOnBean(DynamicThreadExecutor.class)
public class ThreadPoolMonitorSchedule {

    @Autowired
    private DynamicThreadExecutor dynamicThreadExecutor;

    @Scheduled(fixedDelay = 2000)
    public void watchThreadPoolInfo(){
        log.info("开始统计线程池相关数据");
        ThreadPoolExecutor threadPoolExecutor = dynamicThreadExecutor.getExecutor();

        BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
        //线程活跃度:活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。
        log.info("核心线程数:{},活动线程数:{},最大线程数:{},线程池活跃度:{},任务完成数:{}," +
                 "队列大小:{},当前排队线程数:{},队列剩余大小:{},队列使用度:{}",
                threadPoolExecutor.getCorePoolSize(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getMaximumPoolSize(),
                divide(threadPoolExecutor.getActiveCount(), threadPoolExecutor.getMaximumPoolSize()),
                threadPoolExecutor.getCompletedTaskCount(),
                (queue.size() + queue.remainingCapacity()),
                queue.size(),
                queue.remainingCapacity(),
                divide(queue.size(), queue.size() + queue.remainingCapacity()));
    }


    private String divide(int num1,int num2){
        return String.format("%1.2f%%",Double.parseDouble(num1+"") / Double.parseDouble(num2+""));
    }
}

/**
 *启动类
 */
@MapperScan({"com.demo.dao"})
@SpringBootApplication
@EnableScheduling
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java各种面试问题
二、Java多线程相关线程池的原理,为什么要创建线程池?创建线程池的方式;线程的生命周期,什么时候会出现僵死进程;说说线程安全问题,什么实现线程安全,如何实现线程安全;创建线程池有哪几个核心参数?如何合理配置线程池的大小?volatile、ThreadLocal的使用场景和原理;
利用DUCC配置平台实现一个动态化线程池
在后台开发中,会经常用到线程池技术,但线程池核心参数的配置很大程度上依靠经验,所以我们很难一劳永逸地规划一个合理的线程池参数。本文以公司DUCC配置平台作为中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。
Wesley13 Wesley13
3年前
JAVA多线程学习
Java通过Excutors提供四种线程池:newCachedThreadPool        创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool        创建一个定长线程,可控制线程最大并发数
Wesley13 Wesley13
3年前
Java多线程之线程池7大参数、底层工作原理、拒绝策略详解
Java多线程之线程池7大参数详解目录企业面试题线程池7大参数源码线程池7大参数详解底层工作原理详解线程池的4种拒绝策略理论简介面试的坑:线程池实际中使用哪一个?1\.企业面试题线程池的工作原理,几个重要参数,然后给了具体几个参数分析线程池会怎么做,最后问阻塞队列用是什么?线程池的构造类的方
Wesley13 Wesley13
3年前
Java通过Executors提供四种线程池
Java通过Executors提供四种线程池,分别为:newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newScheduledThreadPool创建
Wesley13 Wesley13
3年前
Java多线程之线程池的手写改造和拒绝策略
目录自定义线程池的使用四种拒绝策略代码体现1\.自定义线程池的使用自定义线程池(拒绝策略默认AbortPolicy)publicclassMyThreadPoolDemo{  publicstaticvoidmain(Stringargs){    ExecutorSe
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Wesley13 Wesley13
3年前
(CSDN 迁移) JAVA多线程实现
前几篇文章中分别介绍了单线程化线程池(newSingleThreadExecutor)可控最大并发数线程池(newFixedThreadPool)可回收缓存线程池(newCachedThreadPool)newScheduledThreadPool用于构造安排线程池,能够根据需要安排在给定延迟后运行命令或者定期地执行。在JAVA文档的介绍
Wesley13 Wesley13
3年前
Java 基础知识(七)
1.创建线程池1)newCacheThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 2)newFixedThreadPool  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 3)newScheduledThreadPool  创建一个定长线程池,支持
Easter79 Easter79
3年前
ThreadPoolExecutor和ScheduledThreadPoolExecutor
ThreadPoolExecutor构造方法参数说明corePoolSize核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。maximumPoolSize线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有
Wesley13 Wesley13
3年前
Java多线程之线程池
 newFixedThreadPool:固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX\_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略 newCachedThreadPool:带缓冲
武松
武松
Lv1
如果哪天可不可以把我给你的温柔还给我
文章
5
粉丝
0
获赞
0