moquette改造笔记(二):优化BrokerInterceptor notifyTopicPublished()逻辑

泛型珊瑚
• 阅读 4007

发现问题

下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源码

@Override
    public void notifyClientConnected(final MqttConnectMessage msg) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
            LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}",
                    msg.payload().clientIdentifier(), handler.getID());
            executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
        }
    }

    @Override
    public void notifyClientDisconnected(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) {
            LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}",
                clientID, username, handler.getID());
            executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username)));
        }
    }

    @Override
    public void notifyClientConnectionLost(final String clientID, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) {
            LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, " +
                "interceptorId={}", clientID, username, handler.getID());
            executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username)));
        }
    }

    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {
        msg.retain();

        executor.execute(() -> {
                try {
                    int messageId = msg.variableHeader().messageId();
                    String topic = msg.variableHeader().topicName();
                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                    }
                } finally {
                    ReferenceCountUtil.release(msg);
                }
        });
    }

    @Override
    public void notifyTopicSubscribed(final Subscription sub, final String username) {
        for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) {
            LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}",
                sub.getClientId(), sub.getTopicFilter(), handler.getID());
            executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username)));
        }
    }

可以发现在除了notifyTopicPublished(),其它方法中,在for循环中每次for循环,对于InterceptHandler的调用都是在线程池中每次都是新执行一个任务,但是在notifyTopicPublished()方法中是在一个线程中for循环依次调用,这样处理首先没有用到线程池的多线程,其次是一旦某个InterceptHandler的notifyTopicPublished方法是阻塞的,那么后面的InterceptHandler的notifyTopicPublished()都会被阻塞。

优化逻辑

优化方向:向启动方法一样,每次调用InterceptHandler的notifyTopicPublished方法都是在线程池中新建一个任务


具体代码:

public class PublishTask implements Runnable {
        final MqttPublishMessage msg;
        final InterceptHandler interceptHandler;
        final String clientId;
        final String username;

        PublishTask(MqttPublishMessage msg, InterceptHandler interceptHandler, String clientId, String username) {
            this.msg = msg;
            this.interceptHandler = interceptHandler;
            this.clientId = clientId;
            this.username = username;
        }

        @Override
        public void run() {
            try {
                interceptHandler.onPublish(new InterceptPublishMessage(msg, clientId, username));
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public String toString() {
            return "PublishTask{" +
                    "msg=" + msg +
                    ", interceptHandler=" + interceptHandler +
                    ", clientId='" + clientId + '\'' +
                    ", username='" + username + '\'' +
                    '}';
        }

        public InterceptHandler getInterceptHandler() {
            return interceptHandler;
        }
    }


    @Override
    public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) {

        msg.retain();

        try {
            for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
                executor.execute(new PublishTask(msg.retainedDuplicate(), handler, clientID, username));
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }


//        executor.execute(() -> {
//                try {
//                    int messageId = msg.variableHeader().messageId();
//                    String topic = msg.variableHeader().topicName();
//                    for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
//                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
//                                + "interceptorId={}", clientID, messageId, topic, handler.getID());
//                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
//                    }
//                } finally {
//                    ReferenceCountUtil.release(msg);
//                }
//        });
    }

解释:
(1)新建一个PublishTask用来实现调用handler.onPublish()方法。其中要注意
new PublishTask(msg.retainedDuplicate(), handler, clientID, username)中的msg.retainedDuplicate()
还要主要在两个finally中ReferenceCountUtil.release(msg);

(2)PublishTask的toString()和getInterceptHandler()可以先不用管,会在其它地方用到,下一篇文章会讲到。
moquette改造笔记(三):优化BrokerInterceptor 中的线程池

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
3年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这