37 开启线程池的上帝视角--线程池类结构图总览
Diego38 49 1

1. 前言

本章我们将开始学习线程池,线程池是作为并发编程最核心的知识点。相信大家平时使用线程池的地方也不少,但是为什么用线程池,线程池使用的最佳实践又是什么,这些问题需要系统的来梳理。

本节就开启线程池的学习之旅。

2. 什么是线程池

我们都使用过连接池,之所以池化对象是因为该对象占用资源开销较大,线程也是一种开销比较大的对象,当线程创建的过多,会造成系统上下文切换频繁,导致CPU利率用上升。

使用线程池可以带来如下好处:

  • 降低线程创建和销毁的开销
  • 不同类型的任务通过线程池进行相互隔离
  • 线程池内部可以对任务执行和缓冲队列数量进行监控,可观测性比较强

线程池是利用池化技术,将复用多个线程处理任务的过程,这些线程的创建和销毁交由线程池管理。

线程池使用场景非常广泛,tomcat线程池用于处理http请求,netty线程池用于处理tcp请求,dubbo线程池用于处理rpc请求。

通过多个任务处理复用线程,消除了线程创建所带来的延迟,使得应用程序响应更快,当然线程池也有一些弊端:

  • 死锁,只要是使用多线程都会引发死锁的风险,而线程池有另外一种死锁可能,即当处于线程池里的线程再次提交新任务到当前线程池后,由于线程池线程都在执行,而缓冲队列又已经满了,会造成死锁。死锁也是造成线程泄露的主要元凶。

  • 线程池需要谨慎设计线程最大最小数量、队列数量、休眠时间。一旦设置不当,都会造成严重后果,比如设置的线程池数量过小,会造成饥饿,设置过大会带来上下文切换,队列长度设置过长或无界会造成内存OOM。

    3. 线程池类结构图

    3.1 总类图

    image

  • 上述脑图中最核心的就是ThreadPoolExecutor,它实现了Executor和ExecutorService接口,ThreadPoolExecutor就是线程池的承载类,内部维护了线程列表,任务队列,丢弃策略,即线程数配置。

  • ScheduledThreadPoolExecutor是定制版的ThreadPoolExecutor,它支持以定时任务来运行提交到队列里的任务。

  • Future是任务的抽象,当任务被提交到线程池后,需要返回调用者一个Future对象,核心的方法就是get和限时等待get,方便调用者阻塞获取结果,同时也可以调用cancel方法进行取消。

  • FutureTask即是经过包装后的任务对象,实现了RunnableFuture接口,它是线程池中实际执行的任务,内部有设置结果的set方法,有设置异常的setException方法。

  • 而CompletableFuture是一个工具类,是ThreadPoolExecutor的进一步的升级,方便了任务提交、获取等操作。

  • 线程池中中每个线程执行运行的是同一个Runnable对象,那就是worker,worker实现了AQS的抽象方法,worker内部实际上是一个while(true)执行任务,它会不停的从队列或者用户提交的任务进行执行。

  • CompletionService解决了如何按照执行完毕的顺序获取结果的问题,这在某些情况下可以提高任务执行的并发,调用线程不必在长时间任务上等待过多时间。

    3.2 Executor和ExecutorService的类图

    既然ThreadPoolExecutor是线程池的主类,就有必要了解它具备哪些能力,即它实现了哪些接口,这些接口方法是如何适用的

  • Executor ···java public interface Executor {

    /**

    • Executes the given command at some time in the future. The command
    • may execute in a new thread, in a pooled thread, or in the calling
    • thread, at the discretion of the {@code Executor} implementation.
    • @param command the runnable task
    • @throws RejectedExecutionException if this task cannot be
    • accepted for execution
    • @throws NullPointerException if command is null
    • / void execute(Runnable command); }
      Executor接口仅仅有一个方法–execute,接收一个Runnable对象, 它提供了提交任务的入口。
  • ExecutorService

image

ExecutorService是是一个Executor,它可以提交任务、停止任务、追踪任务生命周期,提交任务后会返回一个Future对象,用于检测任务执行过程和结果。

submit用于提交单个任务,而invoke*用于提交批量任务,它们针对每个任务都会关联一个Future用于监控,其余的方法是用于停止任务和判断任务是否停止成功。

  • ThreadPoolExecutor

ThreadPoolExecutor实现了以上两个接口,实际上ThreadPoolExecutor底层还有一个抽象类AbstractExecutorService,AbstractExecutorService负责执行基本的任务提交工作。


   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {


 }

构造器中一共有7个参数

  • corePoolSize 线程池内线程数最小数量 初始期间,线程池内的线程数量为0,一旦任务提交进来,即会创建线程执行任务, 一直等到线程数量等于corePoolSize,才会停止创建,转而将任务放到任务队列workQueue中。

  • maximumPoolSize 线程池内线程数最大数量 当线程数达到最小线程数,并且workQueue队列满后,线程池开始继续创建线程来满足任务的执行,知道线程数达到 maximumPoolSize的数量。

  • keepAliveTime 和 unit 两个参数是配对存在的,表示达到最大线程数maximumPoolSize时,如果存在空闲线程持续时间达到以unit单位的keepAliveTime时长时,则会退出该线程,直到达到最小线程池数corePoolSize。 JDK1.6之后,ThreadPoolExecutor还提供了新的方法allowCoreThreadTimeOut,即允许核心线程(线程数小于最大线程数时)也会在空闲一段时间内退出。

  • workQueue是任务的缓冲队列 当线程池达到最小线程数后,任务将直接提交到缓冲阻塞队列中,队列可以是无界队列也可以是有界队列,当队列是无界队列时,那意味着队列永远无法达到满状态,进而也不会创建大于corePoolSize的线程数,但无界队列有内存溢出的风险。 workQueue限制必须是阻塞队列,否则线程池中的Worker会进入忙等,导致CPU繁忙。

  • ThreadFactory 线程创建工程 线程创建工程内只有一个接受Runnable对象new Thread方法,用于在在小于最小线程数和队列满的情况下触发线程的创建时使用,在创建线程时可以指定线程的名字以及是否守护线程。

  • RejectedExecutionHandler 任务拒绝策略 当队列满了,最大线程数也达到后,意味着线程池已经到了处理任务的极限,无法处理更多的任务,这时如果还有新的任务提交进来,将根据设置的拒绝策略进行拒绝,JDK中提供了4种拒绝策略

    • CallerRunsPolicy 交给提交任务的线程执行该任务
    • AbortPolicy 直接抛出RejectedExecutionException
    • DiscardPolicy 直接丢弃任务,不做任何处理
    • DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务

    使用比较多的是CallerRunsPolicy和AbortPolicy,CallerRunsPolicy是相当友好的丢弃策略,至少能保证任务可以执行,但同时具有一个缺点,会导致线程满向上传导,导致业务线程池满的风险。

看一下ThreadPoolExecutor的简单使用, 开启一个线程池,提交100个任务,每个任务输出一段随机数字

   public class ThreadPoolExecutorTest {

   public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor sampleExecutor = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS,
               new LinkedBlockingDeque<>(1000),
               new ThreadFactory() {
                   @Override
                   public Thread newThread(Runnable r) {
                       Thread thread = new Thread(r);
                       //防止重名
                       thread.setName("pool-" + Long.toHexString(ThreadLocalRandom.current().nextLong()));
                       return thread;
                   }
               }, new ThreadPoolExecutor.CallerRunsPolicy());


       for (int i = 0; i < 100; i ++) {
           sampleExecutor.submit(() ->{
               System.out.println(Thread.currentThread().getName() + " 线程执行任务" + ThreadLocalRandom.current().nextLong(10000));
               return null;
           });
       }

       sampleExecutor.awaitTermination(1, TimeUnit.DAYS);
     }
 } 

输出结果如下:

  ..
  poolb04fcf6748af9561 线程执行任务3117
  poold3c6a20745892197 线程执行任务676
  pool5063e095770b8430 线程执行任务860
  poold3c6a20745892197 线程执行任务335
  ..

以上代码演示了ThreadPoolExecutor从构造到提交任务的使用过程,我们使用的是submit方法而不是execute方法来执行任务,只有前者才会返回Future对象,尽量使用submit,后续会详细讲解两者的不同。

4. 总结

以上就是线程池的总览,实际上以ThreadPoolExecutor为入口,我们即可以了解整个线程池的全貌,其余的比如Executors工具类只是辅助。在学习本章时,前6节是重点,需要分配更多精力在前6节。

预览图
评论区

索引目录