24 动手设计并实现一个队列同步器AQS解决交通事故封路问题
Diego38 66 1

1. 前言

前面我们讲到 AQS 是并发组件和工具的基础,学好 AQS 是学习后续章节的前提,能达到事半功倍的效果。 本节我们借助一个场景面试题,借助 AQS 底层能力来实现一个同步器,来巩固学习的效果。

2. 事故封路问题设计分析

2.1 题目描述

题目: 车流行驶在公路上,车辆顺序通过路口,假设一个路口发生了严重的交通事故,交警会将道路完全封闭,所有车辆将停止通行,直到交警将道路解封,请使用多线程模拟这一过程。

2.2 技术分析

题目中,交警是一个线程,行驶即将通过路口的车辆是其他的多线程,路口是多线程竞争的资源,在发生交通事故后,只允许交警线程占据路口,其他车辆线程不能通过路口,需要等待交警释放路口。

其实这个和读写锁有些类似,在读写锁 ReentrantReadWriteLock 中

  • 当写锁占据资源时,在锁被释放前,所有尝试获取锁的读线程或其他写线程都会被阻塞。
  • 当一个或多个读线程占据资源时,任何尝试获取锁的写线程都会被阻塞,直到所有读线程释放资源。 读写锁中的写锁没有抢占权,当前即使有一个读锁未释放,写锁也会一直被阻塞。而题目中交警可以不用排队和事先通知,直接将路口封路,随时打断通行的状态。

因此我们做出以下设计分析:

  • 路口支持共享,所以内部通过 AQS 共享模式来实现

  • 将 State 用来存储道路有没有被封路,1 表示未封路,-1 表示被封路

  • 获取许可的条件是 State 的值大于等于 0, 即在未封路时都可以通行

  • 获取过许可的车辆线程在 state==1 条件下可释放许可,即在未封路条件下均可

  • 交警线程在封路之后需要调用 release 接口去解封

    2.3 API 设计

    因为是独占模式,我们实现同步器时只需要完成 tryAcquireShared 和 tryReleaseShared 方法的编写。

  • tryAcquireShared

protected final int tryAcquireShared(int acquires) {
            return getState() == 1 ? 1 : -1;
        }

返回值大于等于 0 意味着成功获取许可,state 我们就定义两个值,封路时 state==-1,未封路时 state==1。

  • tryReleaseShared
    protected final boolean tryReleaseShared(int releases) {
              for (;;) {
                  //state 等于 1 or -1
                  int state = getState();
                  if (state == 1) {
                      return true;
                  }
                  //交警线程才会传release==-1
                  if (releases == -1 && compareAndSetState(state, 1)) {
                      return true;
                  //识别是车辆线程,在封路时(state==-1)不做任何操作    
                  } else if (releases != -1) {
                      return false;
                  }
              }
    }
    当未封路即 State==1 时均可释放许可,当交警重新开启路口时传递 Release 参数为 - 1,需要将 state 状态由 - 1 CAS 改为 1,并返回 true,以便于后续唤醒等待的线程。

3. 事故封路 AQS 代码实现

public class SwitchSync {

    private final Sync sync;

    public SwitchSync() {
        sync = new Sync();
    }

    public void openSwitch() {
        sync.tryReleaseShared(-1);
    }

    public void closeSwitch() {
       sync.innerSetState();
    }

    public void acquireShared() {
        sync.acquireShared(1);
    }

    public void releaseShread() {
        sync.releaseShared(1);
    }

    public static class Sync extends AbstractQueuedSynchronizer {


        Sync() {
            setState(1);
        }

        public void innerSetState() {
            setState(-1);
        }

        protected final int tryAcquireShared(int acquires) {
            return getState() == 1 ? 1 : -1;
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int state = getState();
                if (state == 1) {
                    return true;
                }
                //交警线程才会传release==-1
                if (releases == -1 && compareAndSetState(state, 1)) {
                    return true;
                //识别是车辆线程,在封路时(state==-1)不做任何操作    
                } else if (releases != -1) {
                    return false;
                }
            }
        }
    }

    public static void main(String[] args) {
        SwitchSync switchSync = new SwitchSync();
        //车辆线程
        Thread thread2 = new Thread(() -> {
            for (;;) {
                //申请通行
                switchSync.acquireShared();
                try {
                    System.out.println("thread2" + "通行");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    switchSync.releaseShread();
                }
            }
        } );

        //车辆线程
        Thread thread1 = new Thread(() -> {
            for (;;) {
                //申请通行
                switchSync.acquireShared();
                try {
                    System.out.println("thread1" + "通行");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    switchSync.releaseShread();
                }
            }
        } );
        thread1.start();
        thread2.start();
        //交警线程
        Thread police = new Thread(() -> {
           //封锁路口
           switchSync.closeSwitch();
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开放路口");
            //开放路口
            switchSync.openSwitch();
        } );
        police.start();
    }
}

输出如下:

thread1通行
thread2通行
开放路口
thread1通行
thread2通行
  • 命名为更加通用的名字 SwitchSync,内部定义一个静态类 Sync 来继承 AQS
  • closeSwitch 可以代表封路操作,openSwitch 代表解封操作
  • acquireShared 代表车辆线程申请获取通行许可,releaseShread 代表车辆线程释放许可
  • openSwitch 操作 State 状态,closeSwitch 需要做释放许可,唤醒等待的车辆线程
  • 建议大家通过 jstack 命令将线程 dump,观察在封路时,车辆线程所处的状态

    4. 总结

    本节我们学习了基于 AQS 实现的 “道路封路” 同步器,为了让大家容易接受,其实场景相对比较简单。假设题目增加一条规则,总共有 6 个车道,即一次最多允许 6 辆车通行,SwitchSync 该怎么改造呢?这个留给大家来实现。
预览图
评论区

索引目录