Java并行程序基础(六)

Wesley13
• 阅读 526
  • ThreadFactory

ThreadFactory是一个接口,它只有一个方法,用来创建线程:

Thread newThread(Runnable r);

自定义线程池,可以跟踪线程池究竟何时创建了多少线程,也可以自定义线程的名称,组以及优先级等信息,甚至可以任性的将所有的线程设置为守护线程。总之,使用自定义线程池可以让我们更加自由的设置池子中所有的线程状态

package com.thread.t02;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test04 {
    public static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, 
                TimeUnit.MILLISECONDS, 
                new SynchronousQueue<Runnable>(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create "+ t);
                        return t;
                    }
                });
        for(int i=0;i<5;i++){
            es.submit(task);
        }
        Thread.sleep(2000);
    }
}
  • 扩展线程池

ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute(),afterExecute(),terminated()三个接口对线程池进行控制。

package com.thread.t02;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test05 {

    public static class MyTask implements Runnable{
        public String name;
        
        public MyTask(String name) {
            super();
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行"+":Thread ID:"+Thread.currentThread().getId()+",Task Name="+name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, 
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:"+((MyTask)r).name);
            }
            @Override
                    protected void afterExecute(Runnable r, Throwable t) {
                        // TODO Auto-generated method stub
                System.out.println("准备执行:"+((MyTask)r).name);
                    }
            @Override
                    protected void terminated() {
                        // TODO Auto-generated method stub
                        System.out.println("线程池退出");
                    }
        };
        for(int i=0;i<5;i++){
            MyTask task = new MyTask("Task-GEMY-"+i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
    
    
    
}
  • 优化线程池线程数量

Ncpu=CPU的数量

Ucpu=目标CPU的使用率,0<=Ucpu<=1

W/C = 等待时间与计算时间的比率

为保持处理器达到期望的使用率,最优的池的大小等于:

Ntheads=Ncpu*Ucpu*(1+W/C)

在java中,可以通过

Runtime.getRuntime().availableProcessors()

获得可用的CPU的数量。

  • 在线程池中寻找堆栈

    package com.thread.t02;

    import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;

    public class Test06 {

    public static class DivTask implements Runnable{
        int a,b;
    
        public DivTask(int a, int b) {
            super();
            this.a = a;
            this.b = b;
        }
    
        @Override
        public void run() {
            double re = a/b;
            System.out.println(re);
        }
        
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for(int i=0;i<5;i++){
    

    // pools.submit(new DivTask(100,i));//用execute代替submit,可以得到堆栈信息,submit不会打印堆栈信息 pools.execute(new DivTask(100,i)); } } }

    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero at com.thread.t02.Test06$DivTask.run(Test06.java:22) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 100.0 50.0 25.0 33.0

我们可以从这个异常堆栈中只能知道异常时在哪里抛出的。但是我们希望得到另外一个更重要的信息,那就是这个任务到底在哪里提交的?

package com.thread.t02;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TraceThreadPoolExecutor extends ThreadPoolExecutor{
    public static class DivTask implements Runnable{
        int a,b;

        public DivTask(int a, int b) {
            super();
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            double re = a/b;
            System.out.println(re);
        }
        
    }
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    private Runnable wrap(final Runnable task ,final Exception clientStack,String clientThreadName){
        return new Runnable() {
            @Override
            public void run() {
                try{
                    task.run();
                }catch (Exception e) {
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }
    
    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }
    
    private Exception clientTrace(){
        return new Exception("Client stack trace");
    }
    
    public static void main(String[] args) {
        ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 
                0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        
        for(int i=0;i<5;i++){
            pools.execute(new DivTask(100, i));
        }
    }
}

现在,我们不仅可以得到异常发生的Runnable实现内的信息,也知道这个任务是在哪里提价的。

java.lang.Exception: Client stack trace100.0
33.0

    at com.thread.t02.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:50)
    at com.thread.t02.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:46)
    at com.thread.t02.TraceThreadPoolExecutor.main(TraceThreadPoolExecutor.java:58)
50.0
Exception in thread "pool-1-thread-1" 25.0java.lang.ArithmeticException: / by zero

    at com.thread.t02.TraceThreadPoolExecutor$DivTask.run(TraceThreadPoolExecutor.java:20)
    at com.thread.t02.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:35)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
2年前
java 面试知识点笔记(十三)多线程与并发
java线程池,利用Exceutors创建不同的线程池满足不同场景需求:1.newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。2.
Wesley13 Wesley13
2年前
java_线程池
血一样的教训,今天上午参加了一家现场面试java。在这之前,我一直认为我的java基础还是可以的,而今天一问三不知。现在将面试的问题整理出来一、说说java中的线程池?  1.线程池:线程池是线程的集合,不用自己创建线程,把线程直接给线程池,由线程池处理。   2.过程:首先,使用线程池可以重复利用已有的线程继续执行任务,避免线程在
Wesley13 Wesley13
2年前
java各种面试问题
二、Java多线程相关线程池的原理,为什么要创建线程池?创建线程池的方式;线程的生命周期,什么时候会出现僵死进程;说说线程安全问题,什么实现线程安全,如何实现线程安全;创建线程池有哪几个核心参数?如何合理配置线程池的大小?volatile、ThreadLocal的使用场景和原理;
zdd小小菜鸟 zdd小小菜鸟
1年前
多线程面试
多线程篇1.为什么要使用线程池tex避免频繁地创建和销毁线程,达到线程对象的重用。另外,使用线程池还可以根据项目灵活地控制并发的数目。2.java中如何获取到线程dump文件tex死循环、死锁、阻
Wesley13 Wesley13
2年前
Java多线程之线程池的手写改造和拒绝策略
目录自定义线程池的使用四种拒绝策略代码体现1\.自定义线程池的使用自定义线程池(拒绝策略默认AbortPolicy)publicclassMyThreadPoolDemo{  publicstaticvoidmain(Stringargs){    ExecutorSe
Wesley13 Wesley13
2年前
Java基础教程——线程池
启动新线程,需要和操作系统进行交互,成本比较高。使用线程池可以提高性能——线程池会提前创建大量的空闲线程,随时待命执行线程任务。在执行完了一个任务之后,线程会回到空闲状态,等待执行下一个任务。(这个任务,就是Runnable的run()方法,或Callable的call()方法)。Java5之前需要手动实现线程池,Java5之
Wesley13 Wesley13
2年前
Java 线程池原理分析
1.简介线程池可以简单看做是一组线程的集合,通过使用线程池,我们可以方便的复用线程,避免了频繁创建和销毁线程所带来的开销。在应用上,线程池可应用在后端相关服务中。比如Web服务器,数据库服务器等。以Web服务器为例,假如Web服务器会收到大量短时的HTTP请求,如果此时我们简单的为每个HTTP请求创建一个处理线程,那么服务器
Wesley13 Wesley13
2年前
Java 基础知识(七)
1.创建线程池1)newCacheThreadPool 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 2)newFixedThreadPool  创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待 3)newScheduledThreadPool  创建一个定长线程池,支持
Wesley13 Wesley13
2年前
Java多线程:线程属性
\线程属性id:线程唯一标识。自动生成。不允许修改。name:线程的名字,可以自定义成有具体含义的名字,便于识别不同作用的线程。(可同名)isDaemon:是否是守护线程。true守护线程,false用
京东云开发者 京东云开发者
4个月前
ThreadPoolExecutor线程池内部处理浅析 | 京东物流技术团队
我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池