CountDownLatch + Callbale+FutureTask 实现异步变同步调用

雾凇闭包
• 阅读 6356

背景

通过HTTP接口实现调用MQTT Client发送数据,HTTP接口返回值为MQTT Client发送数据的对应结果。 HTTP接口为同步阻塞,MQTT Client 为异步回调方式。
如何实现在HTTP接口中调用MQTT Client发送数据后,能够阻塞等待MQTT返回结果,然后将结果返回?

解决方法

CountDownLatch + Callbale+FutureTask

1.CountDownLatch作用

CountDownLatch实现在MQTT Client 发送数据后 到接收数据后这段时间的阻塞。
HTTP每次请求,新建一个CountDownLatch,然后将CountDownLatch作为值和deviceId作为KEY保存到Map中,
调用MQTT Client 发送数据后,countDownLatch.await(),进行同步等待
在MQTT Client接收数据的回调方法中更加deviceId取出CountDwonLatch然后计数减一


2.Callbale+FutureTask作用

将调用MQTT Client发送数据的过程,封装成Callable,投递发送任务时,通过返回的FutureTask的get()方法,
同步阻塞,直到结果返回。

关键代码

1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回结果,以及将返回结果传递个FutureTask

    private final static ConcurrentMap<String, CountDownObj> countDownLatchMap = new ConcurrentHashMap<>();
    //线程池
    private final ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {
        Thread thread = new Thread(runnable, "mqtt thread");
        return thread;
    });

2.HTTP API 调用的发送MQTT 消息数据的接口

    /**
     * HTTP API 调用的发送MQTT 消息数据的接口
     * 同步阻塞
     */
    public Integer send(Long packageId, String deviceId) throws Exception {
        ......
       FutureTask<Integer> futureTask = sendTask(publishDto));
       return futureTask.get()
    }

3.投递发送MQTT指令的task方法

   /**
     * 投递MQTT发送指令任务
     * 同步阻塞
     */ 
   private FutureTask<Integer> sendTask(PublishDto publishDto) throws Exception {
        FutureTask<Integer> futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));
        threadPoolExecutor.execute(futureTask);
        //阻塞线程
        return futureTask;
    } 

4.封装CountDownLatch 和 Integer的对象,用于CountDownLatch阻塞控制和返回结果

    /**
     * 封装CountDownLatch 和 Integer
     * 用于CountDownLatch阻塞控制和返回结果
     */
    private class CountDownObj {
        private final CountDownLatch countDownLatch;
        private volatile Integer value;

        private CountDownObj(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public CountDownLatch getCountDownLatch() {
            return countDownLatch;
        }

        public Integer getValue() {
            return value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }
    }

5.具体发送MQTT数据的Callbale线程Task,会新建CountDownLatch,并通过CountDownLatch.await()方法阻塞,直到MQTT回调接收到数据或者超时。

    /**
     * 发送MQTT消息的任务Callable
     */
    private class GetDatapointValueCallable implements Callable<Integer> {
        private final PublishDto publishDto;

        GetDatapointValueCallable(PublishDto publishDto) {
            this.publishDto = publishDto;
        }

        @Override
        public Integer call() throws Exception {
            //mqtt client 发送数据,此处具体代码省略
            ......
            
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));
            //阻塞,超时时间3s
            countDownLatch.await(3, TimeUnit.SECONDS);
            //返回mqtt指令对应的结果或者null
            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();
        }

    }

6.MQTT接收数据回调,这里通过deviceId从MAP里面取到CountDownObj,释放闭锁(结束callable线程的等待)和设置MQTT返回的结果(即callable中call()返回的结果,也就是FutureTask的get()方法返回的结果)。

    /**
     * MQTT 接收数据回调
     */
    void mqttReceiveCallback(String deviceId, String datapointId, String value) {
        ......
        
        //接收到数据后,通过闭锁释放阻塞的线程,同时设置结果返回给调用者
        CountDownObj countDownObj=countDownLatchMap.get(deviceId);
        if(countDownObj!=null) {
            countDownObj.setValue(Integer.parseInt(value));
            countDownObj.getCountDownLatch().countDown();
        }
        
        .......
    }
点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
3年前
Linux网络IO模型
同步和异步,阻塞和非阻塞_同步和异步_关注的是结果消息的通信机制同步:同步的意思就是调用方需要主动等待结果的返回异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等。_阻塞和非阻塞_主要关注的是等待结果返回调用方的状态阻塞:是指
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
3年前
JavaWeb 调用接口
JavaWeb 如何调用接口CreateTime2018年4月2日19:04:29Author:Marydon1.所需jar包!(https://oscimg.oschina.net/oscnet/0f139
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
雾凇闭包
雾凇闭包
Lv1
塞花飘客泪,边柳挂乡愁。
文章
4
粉丝
0
获赞
0