J.U.C体系进阶(四):juc

Wesley13
• 阅读 664

Java - J.U.C体系进阶

作者:Kerwin

邮箱:806857264@qq.com

说到做到,就是我的忍道!

juc-sync 同步器框架

同步器名称

作用

CountDownLatch

倒数计数器,构造时设定计数值,当计数值归零后,所有阻塞线程恢复执行;其内部实现了AQS框架

CyclicBarrier

循环栅栏,构造时设定等待线程数,当所有线程都到达栅栏后,栅栏放行;其内部通过ReentrantLock和Condition实现同步

Semaphore

信号量,类似于“令牌”,用于控制共享资源的访问数量;其内部实现了AQS框架

Exchanger

交换器,类似于双向栅栏,用于线程之间的配对和数据交换;其内部根据并发情况有“单槽交换”和“多槽交换”之分

Phaser

多阶段栅栏,相当于CyclicBarrier的升级版,可用于分阶段任务的并发控制执行;其内部比较复杂,支持树形结构,以减少并发带来的竞争

CountDownLatch

注意:CountDownLatch和CyclicBarrier非常相似,且CyclicBarrier是可以重用的,根据具体的场景不同,代码结构不同,其实两者之间可以相互转化,详见CyclicBarrier模块,下文是CountDownLatch-Demo

// 用法比较简单,直接上代码即可
// 1.CountDownLatch的同一对象传递
// 2.构造参数的默认值需要指定
// 3.线程完成的countDown()->会使默认值减一
// 4.主线程awiw()等待,所有线程都countDown之后,主线程执行
// 应用场景:比如五个子线程文件输出导出数据,主线程等所有子线程都完成之后开始压缩操作,上传文件
          
public class TestCountDownLatch {
    
    /*** * 关键点:面向对象的方式->参数传递,把CountDownLatch进行传递,使其共用同一个参数 * @param args */
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(5);

        ExecutorService executorService = Executors.newCachedThreadPool();
        MyWoker m1 = new MyWoker("work1", latch);
        MyWoker m2 = new MyWoker("work2", latch);
        MyWoker m3 = new MyWoker("work3", latch);
        MyWoker m4 = new MyWoker("work4", latch);
        MyWoker m5 = new MyWoker("work5", latch);
        Boss boss = new Boss("boos", latch);
        executorService.submit(m1);
        executorService.submit(m2);
        executorService.submit(m3);
        executorService.submit(m4);
        executorService.submit(m5);
        executorService.submit(boss);
        
        executorService.shutdown();
    }
}

class MyWoker implements Callable<String> {
    
    private String name;
    private CountDownLatch latch;
    
    public MyWoker (String name, CountDownLatch latch) {
        this.name = name;
        this.latch = latch;
    }
    
    @Override
    public String call() throws Exception {
        System.out.println(name + " 工人开始工作");
        int time = (int)(Math.random() * 100) * 50;
        Thread.sleep(time);
        System.out.println(name + " 工人已经完成任务!");
        latch.countDown();
        return "successful";
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }
}

class Boss implements Callable<String> {
    
    private String name;
    private CountDownLatch latch;
    
    public Boss (String name, CountDownLatch latch) {
        this.name = name;
        this.latch = latch;
    }
    
    @Override
    public String call() throws Exception {
        System.out.println("老板准备就绪,等工人都完成了就来视察~");
        latch.await();
        System.out.println("老板来了,快跑啊~");
        return "successful";
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public void setLatch(CountDownLatch latch) {
        this.latch = latch;
    }
}

CyclicBarrier

CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点

一句话概述就是:“人满发车”

重点理解:

CountDownLatch主要用于主线程阻塞,等待子线程执行完毕后,主线程执行,例如报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

CyclicBarrier侧重点是人满发车,比如LOL,需要等待是个用户都加载好了之后,再开启主线程执行工作,值得注意的是,这是一般意义的CyclicBarrier

但是,CyclicBarrier提供了另一个构造方法,即可以指定默认额外的执行线程

CyclicBarrier barrier = new CyclicBarrier(5,  new TotalTask(totalService));

这意味着,在很多情况CyclicBarrier可以代替CountDownLatch,主要看代码的结构设计

比如刚才的问题:报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

如果我用着这种构造方法,配合awit()的位置,让压缩上传线程默认作为最后执行的线程,即可保证执行的顺序,来看个Demo吧:

Demo-1 CyclicBarrier普通使用方法:

public class TestCyclicBarrier {

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cycli = new CyclicBarrier(10);
        
        for (int i = 0; i < 9; i++) {
            new Thread(new BarrierThread("张" + i, cycli)).start();
        }
        
        Thread.sleep(3000);
        new Thread(new BarrierThread("张" + 10, cycli)).start();
        
        Thread.sleep(5000);
    }

}

class BarrierThread implements Runnable{
    
    private String  name;
    private CyclicBarrier cycli;
    
    public BarrierThread(String name, CyclicBarrier cycli) {
        super();
        this.name = name;
        this.cycli = cycli;
    }

    @Override
    public void run() {
        System.out.println(name + " 准备就绪");
        try {
            cycli.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println(name + " 开始执行");
    }
    
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public CyclicBarrier getCycli() {
        return cycli;
    }

    public void setCycli(CyclicBarrier cycli) {
        this.cycli = cycli;
    }
}

Demo-2 CyclicBarrier 另一种构造方法的使用:

注意 CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService));

再注意子线程中,awit等待的代码位置,这个代码位置在程序的最后面,因此CyclicBarrier 的灵活性完全可以由我们来把控,到底在哪一点阻塞,完全是我们自己控制的,这样的变化,就可以在一定程度上替代CountDownLatch,达到更加灵活的目的,CyclicBarrier 且是可重复使用的,细节可以再去深入了解

/** * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。 * * @author guangbo email:weigbo@163.com * */  
public class Total {   
  
    // private ConcurrentHashMap result = new ConcurrentHashMap(); 
  
    public static void main(String[] args) {   
        TotalService totalService = new TotalServiceImpl();   
        CyclicBarrier barrier = new CyclicBarrier(5,   
                new TotalTask(totalService));   
  
        // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。 
        new BillTask(new BillServiceImpl(), barrier, "北京").start();   
        new BillTask(new BillServiceImpl(), barrier, "上海").start();   
        new BillTask(new BillServiceImpl(), barrier, "广西").start();   
        new BillTask(new BillServiceImpl(), barrier, "四川").start();   
        new BillTask(new BillServiceImpl(), barrier, "黑龙江").start();   
  
    }   
}   
  
/** * 主任务:汇总任务 */  
class TotalTask implements Runnable {   
    private TotalService totalService;   
  
    TotalTask(TotalService totalService) {   
        this.totalService = totalService;   
    }   
  
    public void run() {   
        // 读取内存中各省的数据汇总,过程略。 
        totalService.count();   
        System.out.println("=======================================");   
        System.out.println("开始全国汇总");   
    }   
}   
  
/** * 子任务:计费任务 */  
class BillTask extends Thread {   
    // 计费服务 
    private BillService billService;   
    private CyclicBarrier barrier;   
    // 代码,按省代码分类,各省数据库独立。 
    private String code;   
  
    BillTask(BillService billService, CyclicBarrier barrier, String code) {   
        this.billService = billService;   
        this.barrier = barrier;   
        this.code = code;   
    }   
  
    public void run() {   
        System.out.println("开始计算--" + code + "省--数据!");   
        billService.bill(code);   
        // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略 
        System.out.println(code + "省已经计算完成,并通知汇总Service!");   
        try {   
            // 通知barrier已经完成 
            barrier.await();   
        } catch (InterruptedException e) {   
            e.printStackTrace();   
        } catch (BrokenBarrierException e) {   
            e.printStackTrace();   
        }   
    }   
  
}

相关方法:

— getParties()

获取CyclicBarrier打开屏障的线程数量,也成为方数。

— getNumberWaiting()

获取正在CyclicBarrier上等待的线程数量。

—await()

—await(timeout,TimeUnit)

—isBroken()

获取是否破损标志位broken的值,此值有以下几种情况:

CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
—reset()

使得CyclicBarrier回归初始状态,直观来看它做了两件事:

如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。

CountDownLatch和CyclicBarrier的主要联系和区别如下:

1.闭锁CountDownLatch做减计数,而栅栏CyclicBarrier则是加计数。

2.CountDownLatch是一次性的,CyclicBarrier可以重用。

3.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。

4.鉴于上面的描述,CyclicBarrier在一些场景中可以替代CountDownLatch实现类似的功能

Semaphore

信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可

Semaphore用来控制并发线程数,但有个问题FixedThreadPool也可以控制最大并发数,那两者有何不一样呢?首先,量级来看,Semaphore轻量级,是一个并发工具类,线程池重量级无疑,其次特点来讲,Semaphore内的线程是我们实实在在自己创建的,FixedThreadPool是分配给我们的线程池里面的线程

另外,Semaphore如果默认大小为1的时候,还可以当作互斥锁使用,且有公平锁和非公平锁之分(是否按顺序执行,是则就是公平的,但是非常耗性能)

代码Demo,线程队列和Semaphore配合使用

public class TestQueeThread_2 {
    static Semaphore semaphore = new Semaphore(1);
    public static void main(String[] args) throws InterruptedException {
        System.out.println("begin:" + (System.currentTimeMillis() / 1000)); 
        BlockingQueue<String> myQueue = new ArrayBlockingQueue<String>(1);
       
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        String log = myQueue.take();
                        System.out.println(Thread.currentThread().getName() + ":" + doSome(log));
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        
        for (int i = 0; i < 100; i++) { // 这行代码不能改动
            String input = i + "";
            myQueue.put(input);
        }
    }

    public static String doSome(String input) {
        String output = null;
        try {
            Thread.sleep(1000);
            output = input + ":" + (System.currentTimeMillis() / 1000);
            return output;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
        return output;
    }
}

// 这种用法可能违背了oracle的本意,我们来看看oracle的官方Demo
// Semaphore是用来约束线程使用共享资源的,控制数据一致性,当然还得是锁
class Pool {
    private static final int MAX_AVAILABLE = 100; // 可同时访问资源的最大线程数
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    protected Object[] items = new Object[MAX_AVAILABLE];   //共享资源
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}

Exchanger

Exchanger是用作线程并发协作的工具类,简单一句话讲,如果A,B线程都拥有Exchanger对象,如果某一个调用Exchanger的交换方法exchange时候,快的那个会主动等慢的那个(等的意思就是挂起),然后都到位之后,互相唤醒交换数据

代码Demo:

public class ExchangerTest {
    static class Producer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;

        Producer(String name, Exchanger<Integer> exchanger) {
            super("Producer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = i;
                    System.out.println(getName() + " 交换前:" + data);
                    data = exchanger.exchange(data);
                    System.out.println(getName() + " 交换后:" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;

        Consumer(String name, Exchanger<Integer> exchanger) {
            super("Consumer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (true) {
                data = 0;
                System.out.println(getName() + " 交换前:" + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(getName() + " 交换后:" + data);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        new Producer("", exchanger).start();
        new Consumer("", exchanger).start();
        TimeUnit.SECONDS.sleep(7);
        System.exit(-1);
    }
}

结果打印:
Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Producer- 交换后:0
Consumer- 交换前:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0

我暂时没有碰到用需要此特点的需求,不过很显然它可以用作生产者消费者模式,通过网上的解释来看,Exchanger的实现是非常复杂的,主要是依赖CAS自旋操作

Phase

Phaser 是一个多栅栏的同步工具

phase(阶段) - Phaser也有栅栏,在Phaser中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser只处于某一个phase(阶段),初始阶段为0,最大达到Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增

parties(参与者) - 其实就是CyclicBarrier中的参与线程的概念,CyclicBarrier中的参与者在初始构造指定后就不能变更,而Phaser既可以在初始构造时指定参与者的数量,也可以中途通过register、bulkRegister、arriveAndDeregister等方法注册/注销参与者

arrive(到达) / advance(进阶) - Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance)——也就是phase值+1

public class SwimmerTest {

    // 游泳选手个数
    private static int swimmerNum = 6;

    public static void main(String[] args) {

        Phaser phaser = new Phaser(7){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase >= 2  || registeredParties == 0;
            }
        };

        for (int i = 0; i < swimmerNum; i++) {
            new Thread(new Swimmer(phaser), "swimmer" + i).start();
        }

        // 主线程到达,开启第二阶段
        phaser.arriveAndAwaitAdvance();
        
        // 主线程销毁,开启第三阶段
        phaser.arriveAndDeregister();
        
        // 加while是为了防止其它线程没结束就打印了"比赛结束"
        while (!phaser.isTerminated()) {}

        System.out.println("===== 比赛结束 =====");
    }
}

class Swimmer implements Runnable {
    
    private Phaser phaser;

    public Swimmer(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {

        // 从这里到第一个phaser.arriveAndAwaitAdvance()是第一阶段做的事
        System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已到达赛场");

        phaser.arriveAndAwaitAdvance();

        // 从这里到第二个phaser.arriveAndAwaitAdvance()是第二阶段做的事
        System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已准备好");

        phaser.arriveAndAwaitAdvance();

        // 从这里到第三个phaser.arriveAndAwaitAdvance()是第三阶段做的事
        System.out.println("游泳选手-" + Thread.currentThread().getName() + ":完成比赛");

        phaser.arriveAndAwaitAdvance();
    }
}

上文说到,Phase阶段概念,且注册数量和到达数量一致的之后,就会进入下一个阶段,代码中即是如此,一开始注册七个指标,游泳子线程会运行到达6个,然后由主线程控制到达,进入到下一个阶段,注意的是参与的线程可以注册也可以销毁,所以主线程阶段二是到达,阶段三测试了销毁

Phaser 的onAdvance方法:

Phaser phaser = new Phaser(7){
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
        return phase >= 2  || registeredParties == 0;
    }
};

当前阶段,最后一个线程到达后,会触发onAdvance方法,此处是打印了信息,且写明了Phaser终止的标志,注册线程数为0或阶段数到达2 (0,1,2)

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
2年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
2年前
J.U.C体系进阶(五):juc
JavaJ.U.C体系进阶作者:Kerwin邮箱:806857264@qq.com说到做到,就是我的忍道!juccollections集合框架ConcurrentHashMapConcurrentHashMap是线程
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这