40 动手设计并实现一个线程池上
Diego38 70 1

1. 前言

上两节我们学习了线程池ThreadPoolExecutor原理,线程池是Java并发编程中最核心的知识点了,即使普通的开发也需要掌握ThreadPoolExecutor的原理,不仅针对面试,如果对ThreadPoolExecutor的掌握的深度不够,在工作中遇到性能问题,都很难去解决。

就是因为Java应用对线程池ThreadPoolExecutor依赖度比较高,在一些特殊场景中,ThreadPoolExecutor并不能很好的满足,所以一些Java容器和网络框架往往实现了自定义的线程池,比如Jetty、Tomcat、Netty都各自实现了定制版的ThreadPoolExecutor。

接下来两节作为对ThreadPoolExecutor知识点的巩固,我们也选取两种场景,动手来实现一个线程池。

2. 实现压力感知扩容线程池

2.1 原生线程池的缺点

回忆前两节学习到的内容,ThreadPoolExecutor在构建成功后,核心线程数和最大线程数在启动成功后是很难被修改的,数量的大小和具体的业务量有关,构造线程池时也很难度量,好在ThreadPoolExecutor是支持扩容和缩容的,不至于在线程数设置过小造成处理不过来,以及线程数设置过大造成资源浪费。

一个对外提供服务的系统,线程池大小取决于外部流量对系统造成的压力,像Tomcat、Jetty这些容器都定制了线程池,解决原生线程池的弊端。

原生线程池线程扩容流程

提交任务—>小于核心线程数新起线程–>否则任务进入队列–>队列满了小于最大线程继续新起线程–>队列满并且达到最大线程数则拒绝。

从流程上来看有这样的问题:假设队列容量过长,而核心线程数又设置过小,这样需要等待队列满时才会触发再新建线程来处理任务,这样很容易造成任务长时间得不到处理产生饥饿,对应到用户请求,反映到前端就会造成请求超时。

2.2 压力感知线程池设计

解决原生线程池扩容延迟问题,可以从队列操作入手,我们将提交流程修改如下:

  • 提交任务
  • 小于核心线程数新起线程
  • 在进入队列前,判断活跃线程数达到一定阈值,比如等于当前核心线程数时则新起线程; 否则进入队列
  • 队列满了小于最大线程继续新起线程
  • 队列满并且达到最大线程数则拒绝

我们修改了第三步,不用等到队列满就开启创建最大线程数的线程,而是发现当前没有空闲线程就开始创建,对压力感知敏感,将扩容提前,这样做,更能适应移动互联网脉冲流量。

回顾下execute方法:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //队列满才会继续创建线程
        else if (!addWorker(command, false))
            reject(command);
    }

要完成流程的变更,需要改动三点

  1. ThreadPoolExecutor的execute方法内部有很多私有方法比如addWorker是无法被继承的,因此我们需要实现一个队列来精准影响workQueue.offer(command)方法
  2. 记录活跃线程数,覆盖beforeExecute和afterExecute,getActiveCount实际也能得到活跃线程数,但是需要加锁,通过beforeExecute和afterExecute操作计数器更加高效一些
  3. 活跃线程数等于当前线程数并且小于最大线程数时,就启动新线程,达不到缓冲效果,我们可以定一个公式,作为评判当前压力的基准:最大线程数-活跃线程数-队列里任务数量 < 1;当满足此条件时,即开启新线程

    2.3 压力感知线程池实现

    我们要实现一个队列,这个队列需要绑定ThreadPoolExecutor,命名为ExecutorInnerQueue。

自定义队列ExecutorInnerQueue:

public class ExecutorInnerQueue extends LinkedBlockingQueue<Runnable> {

    private FlowAwareThreadPoolExecutor executor;

    public ExecutorInnerQueue(int capacity, FlowAwareThreadPoolExecutor executor) {
        super(capacity);
        this.executor = executor;
    }


    @Override
    public  boolean offer(Runnable o) {
        int maxPoolSize = executor.getMaximumPoolSize();
        int queueSize = size();
        int curThreadSize = executor.getPoolSize();
        int leftSize = maxPoolSize - executor.getActiveCountSnap() - queueSize;
        //经过公式计算出,处理能力小于1,并且还未达到最大线程数,需要提前增加线程
        if (leftSize < 1 && curThreadSize < maxPoolSize) {
            return false;
        } else {
            return super.offer(o);
        }
    }

}

ExecutorInnerQueue继承自LinkedBlockingQueue,重写了offer方法,在压力过大时和executor进行联动,提前增加线程数,进行处理。

Executor也需要对应的定制,代码如下:

public class FlowAwareThreadPoolExecutor extends ThreadPoolExecutor {

    private AtomicInteger activeCounter = new AtomicInteger();

    public FlowAwareThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, Integer capacity) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ExecutorInnerQueue(capacity));
        ExecutorInnerQueue queue = (ExecutorInnerQueue)getQueue();
        queue.setExecutor(this);
    }

    protected void beforeExecute(Thread t, Runnable r) {
        activeCounter.incrementAndGet();
    }

    //赋予了新的方法getActiveCountSnap,它可以更加轻量的获取当前活跃的线程数,相比原生加锁的方式性能更优。
    public Integer getActiveCountSnap() {
        return activeCounter.get();
    }

    protected void afterExecute(Runnable r, Throwable t) {
        activeCounter.decrementAndGet();
    }

    public static class ExecutorInnerQueue extends LinkedBlockingQueue<Runnable> {

        private FlowAwareThreadPoolExecutor executor;

        public ExecutorInnerQueue(int capacity) {
            super(capacity);
        }

        public void setExecutor( FlowAwareThreadPoolExecutor executor) {
            this.executor = executor;
        }

        @Override
        public  boolean offer(Runnable o) {
            int maxPoolSize = executor.getMaximumPoolSize();
            int queueSize = size();
            int curThreadSize = executor.getPoolSize();
            int leftSize = maxPoolSize - executor.getActiveCountSnap() - queueSize;
            //经过公式计算出,处理能力小于1,并且还未达到最大线程数,需要提前增加线程
            if (leftSize < 1 && curThreadSize < maxPoolSize) {
                return false;
            } else {
                return super.offer(o);
            }
        }

    }



    public static void main(String[] args) {
        FlowAwareThreadPoolExecutor executor = new FlowAwareThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, 20000);
//        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS , new LinkedBlockingQueue<>(20000));
        long begin = System.currentTimeMillis();
        for (int i =0 ; i < 10000 ; i++) {
            AtomicInteger finalI = new AtomicInteger(i);
            executor.submit(() -> {
                try {
                    Thread.sleep(10);
                    System.out.println("任务" + finalI + "执行完毕");
                    if (Objects.equals(finalI.get(), 9999)) {
                        System.out.println("所有任务执行完毕耗时:" + (System.currentTimeMillis() - begin) + "毫秒");
                    }
                } catch (InterruptedException e) { }
            });
            System.out.println("当前活跃线程数" + executor.getActiveCountSnap() + " 当前总的线程数" + executor.getPoolSize());
        }


    }
}

输出如下:

当前活跃线程数9 当前总的线程数10
当前活跃线程数10 当前总的线程数10
当前活跃线程数10 当前总的线程数10
当前活跃线程数10 当前总的线程数10
当前活跃线程数10 当前总的线程数10
...
当前活跃线程数14 当前总的线程数15
当前活跃线程数15 当前总的线程数16
当前活跃线程数16 当前总的线程数17
当前活跃线程数17 当前总的线程数18
当前活跃线程数18 当前总的线程数19
当前活跃线程数19 当前总的线程数20
当前活跃线程数19 当前总的线程数20
...
任务9999执行完毕
所有任务执行完毕耗时:5802毫秒

压力感知线程池FlowAwareThreadPoolExecutor,在队列未满并且并发压力迅速增加的情况下,快速创建线程,来处理新任务,将资源实现最优分配。在MAC单机运行,最终处理时间是5802毫秒。将main函数的原生线程池注释解开,对比原生线程池,由于队列设置了20000,压力即使增加,也未创建更多线程,线程数持续保持在10个,导致最终处理性能不佳,耗时如下:

任务9998执行完毕
任务9995执行完毕
任务9996执行完毕
任务9999执行完毕
所有任务执行完毕耗时:11613毫秒

3. 总结

压力感知线程池有效避免了瞬间流量无法快速响应的问题,通过定制队列和线程池,改变了线程池扩容流程,是一次对ThreadPoolExecutor的深度探险,下节我们继续对ThreadPoolExecutor进行定制化。

预览图
评论区

索引目录