Java的Executor框架和线程池实现原理, callable, Future, Runnable

Wesley13
• 阅读 482

Java的Executor框架和线程池实现原理, callable, Future, Runnable

https://blog.csdn.net/tuke\_tuke/article/details/51353925

一,Java的Executor框架

Java的Executor框架和线程池实现原理, callable, Future, Runnable

1,Executor接口

  1. public interface Executor {

  2. void execute(Runnable command);

  3. }

Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它没有实现类只有另一个重要的子接口ExecutorService

2,ExecutorService接口

  1. //继承自Executor接口

  2. public interface ExecutorService extends Executor {

  3. /**

  4. * 关闭方法,调用后执行之前提交的任务,不再接受新的任务

  5. */

  6. void shutdown();

  7. /**

  8. * 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表

  9. */

  10. List shutdownNow();

  11. /**

  12. * 判断执行器是否已经关闭

  13. */

  14. boolean isShutdown();

  15. /**

  16. * 关闭后所有任务是否都已完成

  17. */

  18. boolean isTerminated();

  19. /**

  20. * 中断

  21. */

  22. boolean awaitTermination(long timeout, TimeUnit unit)

  23. throws InterruptedException;

  24. /**

  25. * 提交一个Callable任务

  26. */

  27. Future submit(Callable task);

  28. /**

  29. * 提交一个Runable任务,result要返回的结果

  30. */

  31. Future submit(Runnable task, T result);

  32. /**

  33. * 提交一个任务

  34. */

  35. Future<?> submit(Runnable task);

  36. /**

  37. * 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表

  38. */

  39. List<Future> invokeAll(Collection<? extends Callable> tasks)

  40. throws InterruptedException;

  41. /**

  42. * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。

  43. */

  44. List<Future> invokeAll(Collection<? extends Callable> tasks,

  45. long timeout, TimeUnit unit)

  46. throws InterruptedException;

  47. /**

  48. * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。

  49. */

  50. T invokeAny(Collection<? extends Callable> tasks)

  51. throws InterruptedException, ExecutionException;

  52. /**

  53. * 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。

  54. */

  55. T invokeAny(Collection<? extends Callable> tasks,

  56. long timeout, TimeUnit unit)

  57. throws InterruptedException, ExecutionException, TimeoutException;

  58. }

ExecutorService接口继承自Executor接口,定义了终止、提交,执行任务、跟踪任务返回结果等方法

1,execute(Runnable command):履行Ruannable类型的任务,

2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,

4,shutdownNow():停止所有正在履行的任务并封闭办事。
5,isTerminated():测试是否所有任务都履行完毕了。,

6,isShutdown():测试是否该ExecutorService已被关闭

3,Executors的静态方法:负责生成各种类型的ExecutorService线程池实例

+newFixedThreadPool(numberOfThreads:int):(固定线程池)ExecutorService 创建一个固定线程数量的线程池,并行执行的线程数量不变,线程当前任务完成后,可以被重用执行另一个任务
+newCachedThreadPool():(可缓存线程池)ExecutorService 创建一个线程池,按需创建新线程,就是有任务时才创建,空闲线程保存60s,当前面创建的线程可用时,则重用它们

+new SingleThreadExecutor();(单线程执行器)线程池中只有一个线程,依次执行任务

+new ScheduledThreadPool():线程池按时间计划来执行任务,允许用户设定执行任务的时间

+new SingleThreadScheduledExcutor();线程池中只有一个线程,它按规定时间来执行任务

4,Runnable、Callable、Future接口

Runnable接口:

  1. // 实现Runnable接口的类将被Thread执行,表示一个基本的任务

  2. public interface Runnable {

  3. // run方法就是它所有的内容,就是实际执行的任务

  4. public abstract void run();

  5. }

Callable接口:与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容

  1. // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容

  2. public interface Callable {

  3. // 相对于run方法的带有返回值的call方法

  4. V call() throws Exception;

  5. }

Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor和ScheduledThreadPoolExecutor执行,他们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

Executors可以把一个Runnable对象转换成Callable对象:

public static Callable<Object> callable(Runnbale task);

Executors把一个Runnable和一个待返回的结果包装成一个Callable的API:

public static<T> Callable<T> callable(Runnbale task,T result);

当把一个Callable对象(Callable1,Callable2)提交给ThreadPoolExecutor和ScheduledThreadPoolExecutor执行时,submit(...)会向我们返回一个FutureTask对象。我们执行FutureTask.get()来等待任务执行完成,当任务完成后,FutureTask.get()将返回任务的结果。

Future接口:

  1. // Future代表异步任务的执行结果

  2. public interface Future {

  3. /**

  4. * 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。

  5. */

  6. boolean cancel(boolean mayInterruptIfRunning);

  7. /**

  8. * 返回代表的任务是否在完成之前被取消了

  9. */

  10. boolean isCancelled();

  11. /**

  12. * 如果任务已经完成,返回true

  13. */

  14. boolean isDone();

  15. /**

  16. * 获取异步任务的执行结果(如果任务没执行完将等待)

  17. */

  18. V get() throws InterruptedException, ExecutionException;

  19. /**

  20. * 获取异步任务的执行结果(有最常等待时间的限制)

  21. *

  22. * timeout表示等待的时间,unit是它时间单位

  23. */

  24. V get(long timeout, TimeUnit unit)

  25. throws InterruptedException, ExecutionException, TimeoutException;

  26. }

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果

在Future接口中声明了5个方法,下面依次解释每个方法的作用:
+cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
+isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
+isDone方法表示任务是否已经完成,若任务完成,则返回true;
+get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
+get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
也就是说Future提供了三种功能:
  1)判断任务是否完成;
  2)能够中断任务;
  3)能够获取任务执行结果。

FutureTask:

通常使用FutureTask来处理我们的任务。FutureTask类同时又实现了Runnable接口,所以可以直接提交给Executor执行。

  1. FutureTask提供了 2个构造器:

  2. public FutureTask(Callable callable) {

  3. }

  4. public FutureTask(Runnable runnable, V result) {

  5. }

  6.    //事实上,FutureTask是Future接口的一个唯一实现类。

使用FutureTask实现超时执行的代码如下:

  1. xecutorService executor = Executors.newSingleThreadExecutor();

  2. FutureTask future =

  3. new FutureTask(new Callable() {//使用Callable接口作为构造参数

  4. public String call() {

  5. //真正的任务在这里执行,这里的返回值类型为String,可以为任意类型

  6. }});

  7. executor.execute(future);

  8. //在这里可以做别的任何事情

  9. try {

  10. result = future.get( 5000, TimeUnit.MILLISECONDS); //取得结果,同时设置超时执行时间为5秒。同样可以用future.get(),不设置执行超时时间取得结果

  11. } catch (InterruptedException e) {

  12. futureTask.cancel( true);

  13. } catch (ExecutionException e) {

  14. futureTask.cancel( true);

  15. } catch (TimeoutException e) {

  16. futureTask.cancel( true);

  17. } finally {

  18. executor.shutdown();

  19. }

不直接构造Future对象,也可以使用ExecutorService.submit方法来获得Future对象,submit方法即支持以 Callable接口类型,也支持Runnable接口作为参数,具有很大的灵活性。使用示例如下:

  1. ExecutorService executor = Executors.newSingleThreadExecutor();

  2. FutureTask future = executor.submit(

  3. new Callable() {//使用Callable接口作为构造参数

  4. public String call() {

  5. //真正的任务在这里执行,这里的返回值类型为String,可以为任意类型

  6. }});

  7. //在这里可以做别的任何事情

  8. //同上面取得结果的代码

线程池实现原理详解:

ThreadPoolExecutor是线程池的实现类:

  1. public ThreadPoolExecutor(int corePoolSize,

  2. int maximumPoolSize,

  3. long keepAliveTime,

  4. TimeUnit unit,

  5. BlockingQueue workQueue,

  6. ThreadFactory threadFactory,

  7. RejectedExecutionHandler handler)

(1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程会创建一个线程来执行任务,

即使有其他空闲的基本线程还是会去创建线程,等到需要执行的任务数大于线程池基本大小corePoolSize时就不会再创建。

(2)maximumPoolSize(线程池最大大小):线程池允许最大线程数。如果阻塞队列满了,并且已经创建的线程数小于最大线程数,则线程池会再创建新的线程执行。因为线程池执行任务时是线程池基本大小满了,后续任务进入阻塞队列,阻塞队列满了,在创建线程。

(3)keepAliveTime(线程活动保持时间):空闲线程的保持存活时间。
(4)TimeUnit(线程活动保持时间的单位):

        TimeUnit.DAYS; //天
        TimeUnit.HOURS; //小时
        TimeUnit.MINUTES; //分钟
        TimeUnit.SECONDS; //秒
        TimeUnit.MILLISECONDS; //毫秒
        TimeUnit.MICROSECONDS; //微妙
        TimeUnit.NANOSECONDS; //纳秒

(5)workQueue(任务队列):用于保存等待执行的任务的阻塞队列。一个阻塞队列,用来存储等待执行的任务:数组,链表,不存元素的阻塞队列

      5.1)ArrayBlockingQueue;数组结构的有界阻塞队列,先进先出FIFO
      5.2)LinkedBlockingQueue;链表结构的无界阻塞队列。先进先出FIFO排序元素,静态方法Executors.newFixedThreadPool使用这个方法

      5.3)SynchronousQueue;不存储元素的阻塞队列,就是每次插入操作必须等到另一个线程调用移除操作,静态方法Executors.newCachedThreadPool使用这个方法

 (6)threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

 (7)handler(饱和策略):表示当拒绝处理任务时的策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

     ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

     ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

     ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

我们尽量优先使用Executors提供的静态方法来创建线程池,如果Executors提供的方法无法满足要求,再自己通过ThreadPoolExecutor类来创建线程池   

  1. Executors.newFixedThreadPool( int); //创建固定容量大小的缓冲池

  2. Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

  3. Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池

下面是这三个静态方法的具体实现;

  1. public static ExecutorService newFixedThreadPool(int nThreads) {

  2. return new ThreadPoolExecutor(nThreads, nThreads,

  3. 0L, TimeUnit.MILLISECONDS,

  4. new LinkedBlockingQueue());

  5. }

  6. public static ExecutorService newSingleThreadExecutor() {

  7. return new FinalizableDelegatedExecutorService

  8. ( new ThreadPoolExecutor(1, 1,

  9. 0L, TimeUnit.MILLISECONDS,

  10. new LinkedBlockingQueue()));

  11. }

  12. public static ExecutorService newCachedThreadPool() {

  13. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

  14. 60L, TimeUnit.SECONDS,

  15. new SynchronousQueue());

  16. }

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的(n,n),它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1(1,1),也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

1)newFixedThreadPool:(固定线程池)

  1. public static ExecutorService newFixedThreadPool(int nThreads) {

  2. return new ThreadPoolExecutor(nThreads, nThreads,

  3. 0L, TimeUnit.MILLISECONDS,

  4. new LinkedBlockingQueue());

  5. }

线程池corePoolSize和maximumPoolSize值是相等的(n,n),把keepAliveTime设置0L,意味着多余的空闲线程会被立即终止。

newFixedThreadPool的execute方法执行过程:

1,如果当前运行线程数少于corePoolSize,则创建新线程来执行任务(优先满足核心池)

2,当前运行线程数等于corePoolSize时,将任务加入LinkedBlockingQueue链式阻塞队列(核心池满了在进入队列)

3,当线程池的任务完成之后,循环反复从LinkedBlockingQueue队列中获取任务来执行

2)newSingleThreadExecutor:(单线程执行器)

newSingleThreadExecutor是使用单个worker线程的Executors.

  1. public static ExecutorService newSingleThreadExecutor() {

  2. return new FinalizableDelegatedExecutorService

  3. ( new ThreadPoolExecutor(1, 1,

  4. 0L, TimeUnit.MILLISECONDS,

  5. new LinkedBlockingQueue()));

  6. }

newSingleThreadExecutor的execute方法执行过程如下:

  1,当前运行的线程数少于corePoolSize(即当前线程池中午运行的线程),则创建一个新的线程来执行任务

  2,当线程池中有一个运行的线程时,将任务加入阻塞队列

  3,当线程完成任务时,会无限反复从链式阻塞队列中获取任务来执行

3,)newCachedThreadPool:可缓存线程池

  1. public static ExecutorService newCachedThreadPool() {

  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

  3. 60L, TimeUnit.SECONDS,

  4. new SynchronousQueue());

  5. }

newCachedThreadPool是一个根据需要创建线程的线程池。

newCachedThreadPool的corePoolSize设置0,即核心池是空,maxmumPoolSize设置为Integer.MAX_VALUE,即maxmumPool是无界的。keepAliveTime设置60L,当空闲线程等待新任务最长时间是60s,超过60s就终止

三个线程池的特点:

1、newFixedThreadPool创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数corePoolSize,则将提交的任务存入到池队列中。

2、newCachedThreadPool创建一个可缓存的线程池。这种类型的线程池特点是:
1).工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
2).如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。

3、newSingleThreadExecutor创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 

线程池的处理流程:

线程池执行示意图:

Java的Executor框架和线程池实现原理, callable, Future, Runnable

1,首先线程池判断基本线程池是否已满(< corePoolSize ?)?没满,创建一个工作线程来执行任务。满了,则进入下个流程。

2,其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

3,最后线程池判断整个线程池是否已满(< maximumPoolSize ?)?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

总结:线程池优先要创建出基本线程池大小(corePoolSize)的线程数量,没有达到这个数量时,每次提交新任务都会直接创建一个新线程,当达到了基本线程数量后,又有新任务到达,优先放入等待队列,如果队列满了,才去创建新的线程(不能超过线程池的最大数maxmumPoolSize)

向线程池提交任务的两种方式:

1)通过execute()方法

  1. ExecutorService threadpool= Executors.newFixedThreadPool( 10);

  2. threadpool.execute( new Runnable(){...});

这种方式提交没有返回值,也就不能判断任务是否被线程池执行成功。

2)通过submit()方法

  1. Future<?> future = threadpool.submit( new Runnable(){...});

  2. try {

  3. Object res = future.get(); //获取任务执行结果

  4. } catch (InterruptedException e) {

  5. // 处理中断异常

  6. e.printStackTrace();

  7. } catch (ExecutionException e) {

  8. // 处理无法执行任务异常

  9. e.printStackTrace();

  10. } finally{

  11. // 关闭线程池

  12. executor.shutdown();

  13. }

使用submit 方法来提交任务,它会返回一个Future对象,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

线程池的关闭:

• shutdown():不会立即终止线程池,而是再也不会接受新的任务,要等所有任务缓存队列中的任务都执行完后才终止
• shutdownNow():立即终止线程池,再也不会接受新的任务,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池本身的状态

  1. volatile int runState;

  2. static final int RUNNING = 0; //运行状态

  3. static final int SHUTDOWN = 1; //关闭状态

  4. static final int STOP = 2; //停止

  5. static final int TERMINATED = 3; //终止,终结

1,当创建线程池后,初始时,线程池处于RUNNING状态;
2,如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕,最后终止;
3,如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务,返回没有执行的任务列表;
4,当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

参考:http://blog.csdn.net/shakespeare001/article/details/51330745

http://singleant.iteye.com/blog/1423931

http://blog.csdn.net/it\_man/article/details/7193727

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java executor
在java.util.concurrent包中的ExecutorService的实现就是壹個线程池的实现任务的委托(TaskDelegation)壹旦线程把任务委托给ExecutorService,该线程就会继续执行与运行任务无关的其它任务。Executor框架的两级调度模型在HotSpotVM的线程模型中,Java线程
Wesley13 Wesley13
2年前
Java编程思想笔记整理
实现线程的方法:(1)继承thread(底层实现了runable)(2)实现Runable(3)Executor创建线程池(4)实现Callable接口(带返回结果)对于callable接口,可以通过FutureTask包装实现线程,也可以使用ExecutorService对象的submit实现。使用executor创建线程
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Stella981 Stella981
2年前
Executor框架
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread,而是Executor,如下所示:publicinterfaceExecutor{void
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
2年前
Java多线程学习笔记
Java中创建多线程的三种方法1、继承Thread类创建线程2、实现Runnable接口创建线程3、使用Callable和Future创建线程\
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这