java高并发之从零到放弃(三)

LogicAetherMaster
• 阅读 2148

前言

今天讲的多线程的同步控制
直接进入正题

ReentrantLock重入锁

重入锁可以完全代替synchronized,它需要java.util.concurrent.locks.ReentrantLock类来实现
下面用一个简单的例子来实现重入锁:

public class ReentrantLockThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
    @Override
    public void run() {
        for (int j=0;j<10000;j++){
            lock.lock();
            try {
                i++;
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ReentrantLockThread thread = new ReentrantLockThread();
        Thread t1 = new Thread(thread);
        Thread t2 = new Thread(thread);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);
    }
}

以上代码打印出来的i是20000,可以说明ReentrantLock也实现了线程同步
它相比synchronized更加灵活,因为重入锁实现了用户自己加锁.lock(),自己释放锁.unlock()(记得一定要释放,不然其他线程无法进入)
当然重入锁同一个对象可以加两个锁,但也要记得释放两个锁(多释放了会抛出异常,少释放了那其它线程就进不来)

重入锁的中断功能也是它的高级功能之一:
在run()代码块中写入lock.lockInterruptibly()方法,当线程实例使用t1.interrupt()时,外部通知便会就会中断t1线程
下面来一个简单示例代码Demo:

public class interruptTest {

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(){
            public void run(){
                while (true){
                    System.out.println("go");
                    if (Thread.currentThread().isInterrupted()){
                        break;
                    }
                }
            }
        };
        t1.start();
        Thread.sleep(10001);
        t1.interrupt();
    }
}

发现t1线程实现interrupt()方法时,线程实现代码中的.isInterrupted()执行了,并且中断了t1线程
中断功能可以很好的防止两个线程间互相等待,出现死锁的现象。

除了.interrupt()通知,要避免死锁的另一种方法,就是限时等待:lock.tryLock()
我们来看下代码:

public class MyThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
    @Override
    public void run() {
        try {
            if (lock.tryLock(5, TimeUnit.SECONDS)){
                Thread.sleep(6000);
            }else {
                System.out.println("结束线程");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            if (lock.isHeldByCurrentThread()){
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyThread thread = new MyThread();
        Thread t1 = new Thread(thread);
        Thread t2 = new Thread(thread);
        t1.start();
        t2.start();
    }
}

lock.tryLock(5, TimeUnit.SECONDS)解释下:
如果超过5秒还没有得到锁,就返回false,执行else语句
如果成功获得锁,就返回true,执行sleep语句
所以5秒后打印结束线程,结束的就是等待5秒后没有拿到锁的线程

当然也可以不带等待时间,直接if(lock.tryLock())

下面是对ReentrantLock的整理

lock():获得锁,如果锁被占用,则等待
lockInterruptibly():获得锁,但优先响应中断
tryLock():获得锁,如果成功返回true,如果失败返回false
unlock():释放锁

Condition条件

Condition是和ReentrantLock关联的
利用绑定的Condition我们可以让线程在合适的时间等待,或者在某一特定时刻获得通知继续执行
下面演示一段简单的Condition代码

public class MyThread implements Runnable{
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();
    @Override
    public void run() {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("t1拿到锁,接着进入等待,并且释放锁");
            condition.await();
            Thread.sleep(4000);
            System.out.println("t1又拿到锁");
        } catch (InterruptedException e) {
        }finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyThread thread = new MyThread();
        Thread t1 = new Thread(thread);
        t1.start();
        Thread.sleep(5000);
        lock.lock();
        System.out.println("主线程占用锁");
        condition.signal();
        System.out.println("唤醒成功,主线程释放锁");
        lock.unlock();
    }
}

await():会让当前线程进入等待并且释放锁
singal();会唤醒一个在等待中的线程,当然执行方法的线程必须释放锁,被唤醒的线程才能得到锁

Semaphore信号量

怎样才能规定进入一段代码的线程数
这里我们使用信号量,在外面定义Semaphore semp = new Semaphore(10);,这样简单的设定了5个线程
在run()方法中semp.acquire();表示获得了10个中的其中一个许可证
当你的工作代码完成时,依旧在run()方法的后面写上semp.release();,许可证被释放(跟锁一个道理)

ReadWriteLock读写锁

我们知道读不会响应数据,写会影响数据
所以我们真正操作的时候要求只读的那些线程可以一起执行,不用同步操作
而与写有关的线程全部要同步,以保护数据的安全
那么我们怎样才能做到只读线程不用同步呢
这里需要用到读写锁,下面演示一段读写锁的简单例子:

public class ReadWriteThread {
    private static Lock lock = new ReentrantLock();
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static Lock readLock = readWriteLock.readLock();
    private static Lock writeLock = readWriteLock.writeLock();
    private int value;

    //读
    public int handleRead(Lock lock) throws InterruptedException{
        try {
            lock.lock();
            Thread.sleep(5000); //模拟读线程
            System.out.println("读完成");
            return value;
        } finally {
            lock.unlock();
        }
    }
    //写
    public void handleWrite(Lock lock,int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(5000);
            value=index;
            System.out.println("写完成");
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadWriteThread demo = new ReadWriteThread();
        Thread t1 = new Thread(){
            public void run(){
                try {
                    demo.handleRead(readLock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread t2 = new Thread(){
            public void run(){
                try {
                    demo.handleRead(readLock);
                    demo.handleWrite(writeLock,2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        t1.start();
        t2.start();
    }
}

执行结果可以发现,对于只读的方法,我们给予readLock锁,而写的方法给予writeLock锁
如此这般,只读的方法可以并行,而读写只能串行

CountDownLatch倒计时器

倒计时器用来控制线程等待,它可以让一个线程等待直到倒计时结束,再开始执行
还是通过实例来让大家知道什么是倒计时器,并且它能有什么作用
这个例子的需求是:要在主线程之前完成之个类线程才能继续主线程:

public class CountDownLatchThread implements Runnable{
    private static CountDownLatch countDownLatch = new CountDownLatch(10);
    private static CountDownLatchThread countDownLatchThread = new CountDownLatchThread();

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println("此线程工作完成");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);    //创建一个固定大小为10的线程池
        for (int i=0;i<10;i++){
            executorService.submit(countDownLatchThread);
        }
        countDownLatch.await();
        System.out.println("10个任务完成");
        executorService.shutdown();
    }
}

countDownLatch.await();:让主线程进入等待,等待所有10个线程完成
countDownLatch.countDown();:说明这个线程已完成,并进入统计

线程池

线程池和数据库连接池一样,事先准备好线程,当程序需要线程时,调用线程池中空闲的线程,当工作线程工作完毕时,重新放回线程池
JDK提供了一套Executor框架,本质是线程池。
框架中的成员变量在java.util.concurrent包中,是JDK并发包的核心类
其中ThreadPoolExecutor实现了Executor接口,任何Runnable都可以被ThreadPoolExecutor线程池调度

Executor提供了各种类型的线程池,主要有以下工厂方法创建不同的线程池:
newFixedThreadPool()方法:返回一个固定线程数量的线程池,当一个新任务提交时,如果有空闲线程,立即执行,如果没有空闲线程,新任务会被放在一个等待队列中去等待空闲线程
newCachedThreadPool()方法:返回一个根据实际情况调整线程数量的线程池,
newSingleThreadScheduledExecutor()方法:返回一个对象,可放线程数量为1,但是这个线程池拓展了计划任务功能,可以规定执行时间、周期性等等

上面这些线程池的源码其实都是用ThreadPoolExecutor实现:
我们来看下ThreadPoolExecutor的构造函数:

public ThreadPoolExecutor(int corePoolSize,    //指定线程池中线程的数量
                          int maximumPoolSize,//指定线程池中最大线程数量        
                          long keepAliveTime,    //当线程池中线程数量超过corePoolSize时,多余线程的存活时间
                          TimeUnit unit,    //keepAliveTime的单位
                          BlockingQueue<Runnable> workQueue,    //等待任务队列,被提交都是尚未执行的任务
                          ThreadFactory threadFactory,    //线程工厂,用于创建线程,一般默认
                          RejectedExecutionHandler handler    //拒绝策略,当任务太多时,如何拒绝任务
    }

下面我来讲讲BlockingQueue:
在ThreadPoolExecutor构造器中,有以下几种BlockingQueue:
1.直接提交队列:有SynchronousQueue对象提供,提交的任务如果没有空闲线程尝试新建线程,如果线程数量已达到最大,则执行拒绝策略
2.有界的任务队列:使用ArrayBlockingQueue对象实现,构造器带一个任务的容量参数,若等待队列已满,总线程不大于maximumPoolSize的前提下,创建新的线程执行任务,若大于,则执行拒绝策略
3.无界的任务队列:使用LinkedBlockingQueue来实现,和有界相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况
4.有限任务队列:通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序

再来看newFixedThreadPool(),它使用了无界任务队列,并且corePoolSize和maximumPoolSize一样大,因为对于固定大小的线程池,不存在线程数量的动态变化,当任务提交非常频繁时,可能会耗尽系统资源

而newCachedThreadPool()方法返回corePoolSize为0,maximumPoolSize无限大的线程池,使用了SynchronousQueue队列,当任务执行完毕后,由于corePoolSize为0,空闲线程会在指定时间(60s)回收

讲完了BlockingQueue我们来讲下RejectedExecutionHandler拒绝策略
JDK内置了四种拒绝策略:
1.AbortPolicy:直接抛出异常,阻止系统正常工作
2.CallerRunsPolicy:只要线程池没有关闭,该策略直接调用工作线程运行当前被丢弃的任务
3.DiscardOledestPolicy:丢弃最老的一个等待任务,也就是即将被执行的一个任务,并尝试再次提交当前任务
4.DiscardPolicy:默默地丢弃无法处理的任务,如果运行任务丢失,这是最好的一个策略

当然我们也可以自己写拒绝策略
下面我来写一个打印出被拒绝的策略(而不是选择抛异常,因为抛异常我们还要去捕捉异常,如果没有捕捉到会导致系统奔溃)

public class ThreadPoolTest {
    public static class TestThread implements Runnable{
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+"线程ID:"+Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TestThread testThread = new TestThread();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.SECONDS,
                                                    new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
                                                    new RejectedExecutionHandler(){
                                                        @Override
                                                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                                            System.out.println("等待线程被拒绝");
                                                        }
                                                    });
        for (int i=0;i<Integer.MAX_VALUE;i++){
            es.submit(testThread);
            Thread.sleep(10);
        }
    }
}

来看下运行结果:

1511833277724线程ID:13
1511833277771线程ID:10
1511833277771线程ID:14
1511833277771线程ID:12
等待线程被拒绝
等待线程被拒绝
1511833277833线程ID:13
1511833277833线程ID:11

可以发现,我们自定义的线程池和拒绝策略完美的执行了

并发高效容器

下面我介绍给大家几个非常好用的工具类(当然都是线程安全的)
1.ConcurrentHashMap:这是一个高效的hashMap
2.CopyOnwriteArrayList:在读多写少的场合这个list非常好用,远胜与Vector
3.ConCurrentLinkedQueue:高效的并发队列,使用链表实现,可以看做是一个线程安全的LinkedList
4.BlockingQueue:这个接口上面说过,表示阻塞队列,非常适合用于作为数据共享的通道
5.ConcurrentSkipListMap:这是一个Map,使用跳表的数据结构进行快速的查找

如果并不追求高效,我们可以使用Collections类帮助把线程不安全的容器转换为线程安全
如将HashMap转换为线程安全:

Map map = Collections.synchronizedMap(new HashMap<String,Object>());

当然可以使用CAS操作来替代synchronized

今天就先到这里,大家可以看看这些内容的拓展
记得点关注看更新,谢谢阅读

点赞
收藏
评论区
推荐文章
Easter79 Easter79
4年前
synchronized底层原理
前言一、synchronized的特性1.1原子性1.2可见性1.3有序性1.4可重入性二、synchronized的用法三、synchronized锁的实现3.1同步方法3.2同步代码块四、
Wesley13 Wesley13
4年前
java多线程之ReentrantLock
前言相信学过java的人都知道synchronized这个关键词,也知道它用于控制多线程对并发资源的安全访问,兴许,你还用过Lock相关的功能,但你可能从来没有想过java中的锁底层的机制是怎么实现的。如果真是这样,而且你有兴趣了解,今天我将带领你轻松的学习下java中非常重要,也非常基础的可重入锁ReentrantLock的实现机制。R
Wesley13 Wesley13
4年前
java中的锁
记录一下公平锁,非公平锁,可重入锁(递归锁),读写锁,自旋锁的概念,以及一些和锁有关的java类。公平锁与非公平锁:公平锁就是在多线程环境下,每个线程在获取锁时,先查看这个锁维护的队列,如果队列为空或者自身就是等待队列的第一个,就占有锁。否则就加入到等待队列中,按照FIFO的顺序依次占有锁。非公平锁会一上来就试图占
Wesley13 Wesley13
4年前
java并发相关(四)——关于synchronized的可重入性,线程切换实现原理与是否公平锁
一、可重入性  关于synchronized的可重入性的证明,我们可以通过A类内写两个同步方法syncA(),syncB()。然后syncA内调用syncB,调用syncA发现代码可正常执行,来证明这一点。  当处于无锁阶段时,划掉,都重入了不可能处于无锁。  当处于偏向锁阶段时,由之前对偏向锁的解释可知,偏向当前线程id是,当前线程可直
Wesley13 Wesley13
4年前
java 面试知识点笔记(十二)多线程与并发
问:synchronized和ReentrantLock的区别?ReentrantLock(可重入锁)位于java.util.concurrent.locks包(著名的juc包是由Douglea大神写的AQS抽象类框架衍生出来的应用)和CountDownLatch、FutureTask、Semaphore一样基于AQS实现
Stella981 Stella981
4年前
ReenTrantLock可重入锁和synchronized的区别
ReenTrantLock可重入锁和synchronized的区别可重入性:从名字上理解,ReenTrantLock的字面意思就是再进入的锁,其实synchronized关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。锁的实现:S
Wesley13 Wesley13
4年前
Java并发系列4
今天讲另一个并发工具,叫读写锁。读写锁是一种分离锁,是锁应用中的一种优化手段。考虑读多写少的情况,这时如果我们用synchronized或ReentrantLock直接修饰读/写方法未尝不可,如:publicstaticclassRw{privateintval;publicsynchr
Wesley13 Wesley13
4年前
Java并发源码之ReentrantLock
ReentrantLock介绍ReentrantLock是一个可重入的互斥锁,与使用synchronized方法和语句访问的隐式监视锁具有相同的基本行为和语义,但具有扩展功能。ReentrantLock属于最后一个成功加锁并且还没有释放锁的线程。当一个线程请求lock时,如果锁不属于任何线程,将立马得到这个锁;如果锁已经被
Wesley13 Wesley13
4年前
Java多线程进阶干货(2)
问题1:子类可以调用父类的同步方法吗?/    一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,  再次申请时,仍然会得到该对象的锁,也就是说synchronized获得的锁是可重入的   这里是继承中有可能发生的情形,子类调用父类的同步方法 /public class Test09 {    sy
Wesley13 Wesley13
4年前
Java中所有锁介绍
在读很多并发文章中,会提及各种各样锁如公平锁,乐观锁等等,这篇文章介绍各种锁的分类。介绍的内容如下:1.公平锁/非公平锁2.可重入锁/不可重入锁3.独享锁/共享锁4.互斥锁/读写锁5.乐观锁/悲观锁6.分段锁7.偏向锁/轻量级锁/重量级锁8.自旋锁上面是很多锁的名词,这些分类并不是全是指锁的
并发情况如何实现加锁来保证数据一致性? | 京东云技术团队
单体架构下锁的实现方案1\.ReentrantLock全局锁ReentrantLock(可重入锁),指的是一个线程再次对已持有的锁保护的临界资源时,重入请求将会成功。简单的与我们常用的Synchronized进行比较:||ReentrantLock|Syn