Java同步组件之CountDownLatch,Semaphore

代码逐风鹤
• 阅读 1208

Java同步组件概况

  • CountDownLatch : 是闭锁,通过一个计数来保证线程是否一直阻塞
  • Semaphore: 控制同一时间,并发线程数量
  • CyclicBarrier:字面意思是回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
  • ReentrantLock:是一个重入锁,一个线程获得了锁之后仍然可以反复加锁,不会出现自己阻塞自己的情况。
  • Condition:配合ReentrantLock,实现等待/通知模型
  • FutureTask:FutureTask实现了接口Future,同Future一样,代表异步计算的结果。

CountDownLatch 同步辅助类

CountDownLatch类位于java.util.concurrent包,利用它可以实现类似计数器的功能,比如有一个任务A,它要等待其它4个任务执行完毕后才能执行,此时就可以使用CountDownLatch来实现这种功能。

Java同步组件之CountDownLatch,Semaphore

假设计数器的值是3,线程A调用await()方法后,当前线程就进入了等待状态,之后其它线程中执行CountDownLatch,计数器就会减1,当计数器从3变成0,线程A继续执行,CountDownLatch这个类可以阻塞当前线程,保证线程在某种条件下,继续执行。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量,这个值只能被设置一次,而且CountDownLatch没有提供任何机会修改这个计数值。

CountDownLatch代码案例

package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountDownLatchTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{
                Thread.sleep(3000);
                System.out.println("任务一完成");

            }catch (Exception e){
               e.printStackTrace();
            }


            countDownLatch.countDown();
        });
        executorService.execute(()->{
            try{
                Thread.sleep(5000);
                System.out.println("任务二完成");

            }catch (Exception e){
                e.printStackTrace();
            }

            countDownLatch.countDown();
        });
        countDownLatch.await();
        //所有子任务执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();
    }
}

任务一完成
任务二完成
主线程开始工作.....

CountDownlatch指定时间完成任务,如果在规定时间内完成,则等待之前的等待线程(countDownLatch.await())继续执行

countDownLatch.await(int timeout,TimeUnit timeUnit);设置,第一个参数没超时时间,第二个参数为时间单位。
package com.rumenz.task;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class CountDownLatchTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch=new CountDownLatch(2);
        executorService.execute(()->{
            try{
                Thread.sleep(3000);
                System.out.println("任务一完成");

            }catch (Exception e){
               e.printStackTrace();
            }


            countDownLatch.countDown();
        });
        executorService.execute(()->{
            try{
                Thread.sleep(5000);
                System.out.println("任务二完成");

            }catch (Exception e){
                e.printStackTrace();
            }

            countDownLatch.countDown();
        });
        //这里只等3秒
        countDownLatch.await(3, TimeUnit.SECONDS);
        //所有子任务执行完后才会执行
        System.out.println("主线程开始工作.....");
        executorService.shutdown();
    }
}
//任务一完成
//主线程开始工作.....
//任务二完成

Semaphore控制线程数量

Semaphore经常用于限制获取某种资源的线程数量,其内部是基于AQS的共享模式,AQS的状态可以表示许可证的数量,许可证数量不够线程被挂起;而一旦有一个线程释放资源,那么可唤醒等待队列中的线程继续执行。

Java同步组件之CountDownLatch,Semaphore

Semaphore翻译过来就是信号量,Semaphore可以阻塞进程并控制同时访问的线程数,通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可,Semaphore有点类似锁。

CountDownLatchSemaphore在使用时,通过和线程池配合使用。
Semaphore适合控制并发,CountDownLatch比较适合保证线程执行完后再执行其它处理,因此模拟并发两者结合最好。

Semaohore应用场景

Semaphore适合做流量控制,特别是共享的有限资源,比如数据库连接,假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控。
package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{
                    semaphore.acquire(); // 获取一个许可
                    update(j);
                    semaphore.release(); // 释放一个许可

                }catch (Exception e) {
                    e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();
    }

    private static void update(Integer j) throws Exception {
        System.out.println(j);
        Thread.sleep(2000);

    }
}

每2秒打印3个数字。
package com.rumenz.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class SemaphoreExample1 {
    private static Integer clientTotal=30;
    private static Integer threadTotal=3;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore=new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer j=i;
            executorService.execute(()->{
                try{
                    semaphore.acquire(3); // 获取多个许可
                    update(j);
                    semaphore.release(3); // 释放多个许可

                }catch (Exception e) {
                    e.printStackTrace();
                }

            });
            
        }
        executorService.shutdown();
    }

    private static void update(Integer j) throws Exception {
        System.out.println(j);
        Thread.sleep(2000);

    }
}
每2秒打印一个数字。

tryAcquire

尝试获取许可,如果获取不成功,则放弃操作,tryAcquire方法提供几个重载
  • tryAcquire() : boolean
  • tryAcquire(int permits) : boolean 尝试获取指定数量的许可
  • tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
  • tryAcquire(long timeout,TimeUnit timeUnit) : boolean 尝试获取许可的时候可以等待一段时间,在指定时间内未获取到许可则放弃

Semaphore源码分析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。
// 非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// fair=true为公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public class Semaphore implements java.io.Serializable {
    /*
     * 只指定许可量,构造不公平模式
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
   
    /*
     * 指定许可量,并指定模式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    
    //Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。 
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
        /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            // 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            // 可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    
}

关注微信公众号:【入门小站】,解锁更多知识点

Java同步组件之CountDownLatch,Semaphore

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java 线程篇 之CyclicBarrier、CountDownLatch、Semaphore
java提供了很多控制线程到达某一状态导致之前阻塞线程运行的函数,这些在控制任务执行提供了很大的便利,比如在zookper使用Semaphore实现分布式锁1、CountDownLatchcountDownLatch提供await(),CountDownLatch()来控制,前面我很多例子,使用这个来模拟多线程运行的,所以这里不过多介绍2
Wesley13 Wesley13
3年前
java并发编程之二
CountDownLatch类  允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。  CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在
Wesley13 Wesley13
3年前
java并发中CountDownLatch的使用
java并发中CountDownLatch的使用在java并发中,控制共享变量的访问非常重要,有时候我们也想控制并发线程的执行顺序,比如:等待所有线程都执行完毕之后再执行另外的线程,或者等所有线程都准备好了才开始所有线程的执行等。这个时候我们就可以使用到CountDownLatch。简单点讲,CountDownLatch存有一个放在QueuedS
Wesley13 Wesley13
3年前
Java多线程并发控制工具CountDownLatch,实现原理及案例
闭锁(CountDownLatch)是Java多线程并发中的一种同步器,它是JDK内置的同步器。通过它可以定义一个倒计数器,当倒计数器的值大于0时,所有调用await方法的线程都会等待。而调用countDown方法则可以让倒计数器的值减一,当倒计数器值为0时所有等待的线程都将继续往下执行。闭锁的主要应用场景是让某个或某些线程在某个运行节点上等待N个条件都
Wesley13 Wesley13
3年前
Java多线程并发控制工具信号量Semaphore,实现原理及案例
信号量(Semaphore)是Java多线程兵法中的一种JDK内置同步器,通过它可以实现多线程对公共资源的并发访问控制。一个线程在进入公共资源时需要先获取一个许可,如果获取不到许可则要等待其它线程释放许可,每个线程在离开公共资源时都会释放许可。其实可以将Semaphore看成一个计数器,当计数器的值小于许可最大值时,所有调用acquire方法的线程都可以得到
Wesley13 Wesley13
3年前
Java多线程信号量同步类CountDownLatch与Semaphore
  信号量同步是指在不同线程之间,通过传递同步信号量来协调线程执行的先后次序。CountDownLatch是基于时间维度的Semaphore则是基于信号维度的。1:基于执行时间的同步类CountDownLatch  例如现有3台服务器,需编写一个获取各个服务器状态的接口,准备开三个子线程每个线程获取一台服务器状态后统一返回三台
Stella981 Stella981
3年前
CountDownLatch和CylicBarrier以及Semaphare你使用过吗
CountDownLatch是什么CountDownLatch的字面意思:倒计时门栓它的功能是:让一些线程阻塞直到另一些线程完成一系列操作后才唤醒。它通过调用await方法让线程进入阻塞状态等待倒计时0时唤醒。它通过线程调用countDown方法让倒计时中的计数器减去1,当计数器为0时,会唤醒哪些因为调用了await而阻塞的线程。
Wesley13 Wesley13
3年前
Java CyclicBarrier介绍
CyclicBarrier(周期障碍)类可以帮助同步,它允许一组线程等待整个线程组到达公共屏障点。CyclicBarrier是使用整型变量构造的,其确定组中的线程数。当一个线程到达屏障时(通过调用CyclicBarrier.await()),它会被阻塞,直到所有线程都到达屏障,然后在该点允许所有线程继续执行。与CountDownLatch不同的
Wesley13 Wesley13
3年前
JUC
Java5.0在java.util.concurrent包中提供了多种并发容器类来改进同步容器的性能。CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一
Wesley13 Wesley13
3年前
Java并发中常用同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程控制流。阻塞队列(BlockingQueue)可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore),栅栏(Barrier)以及闭锁(Latch)。在平台类库中还包含其他一些同步工具类的类,如果这些类还无法满足需要,那么可以创建自己的同步工具类。闭锁Latch
Stella981 Stella981
3年前
CountDownLatch
CountDownLatch\TOC\一.介绍CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。Co
代码逐风鹤
代码逐风鹤
Lv1
白日放歌须纵酒,青春作伴好还乡。
文章
4
粉丝
0
获赞
0