使用Java代码搭建一个MQTT客户端

模式流星
• 阅读 1110

一个MQTT客户端可以做到:
1、发布其他客户端可能会订阅的信息
2、订阅其它客户端发布的消息
3、退订或删除应用程序的消息
4、断开与服务器连接

所以在使用Java代码进行搭建时,也要实现这几种功能。


在POM文件中导入坐标

<dependency>
    <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-mqtt</artifactId>
      <version>6.0.2</version>
</dependency>

MQTT客户端(发布者)

1、创建一个SendMQTT类,对MQTT客户端进行初始化操作,创建连接。

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {
        init("happyfan");
    }


    public static  void init(String clientId) {
        //初始化连接设置对象
        mqttConnectOptions = new MqttConnectOptions();
        //初始化MqttClient
        if(null != mqttConnectOptions) {
//            true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连接超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置持久化方式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {
                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {

            }
        }else {
            System.out.println("mqttConnectOptions对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                try {
                    System.out.println("创建连接");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }else {
            System.out.println("mqttClient为空");
        }
        System.out.println(mqttClient.isConnected());
    }
}

2、实现与MQTT客户端断开连接以及重新连接的功能:

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {
        init("happyfan");
    }


    public static  void init(String clientId) {
        //初始化连接设置对象
        mqttConnectOptions = new MqttConnectOptions();
        //初始化MqttClient
        if(null != mqttConnectOptions) {
//            true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连接超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置持久化方式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {
                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {

            }
        }else {
            System.out.println("mqttConnectOptions对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                try {
                    System.out.println("创建连接");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }else {
            System.out.println("mqttClient为空");
        }
        System.out.println(mqttClient.isConnected());
    }
    //与MQTT服务器断开连接
    public void closeConnect() {
        //关闭存储方式
        if(null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("memoryPersistence is null");
        }

        //关闭连接
        if(null != mqttClient) {
            if(mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                    mqttClient.close();
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {
                System.out.println("mqttClient is not connect");
            }
        }else {
            System.out.println("mqttClient is null");
        }
    }
    //重新连接MQTT服务器
    public void reConnect() {
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                if(null != mqttConnectOptions) {
                    try {
                        mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }else {
                    System.out.println("mqttConnectOptions is null");
                }
            }else {
                System.out.println("mqttClient is null or connect");
            }
        }else {
            init("happyfan");//其实就是重新回到初始化的方法中去尝试连接服务器
        }

    }
}

3、实现订阅、发布、清空主题:

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {
        init("happyfan");
    }


    public static  void init(String clientId) {
        //初始化连接设置对象
        mqttConnectOptions = new MqttConnectOptions();
        //初始化MqttClient
        if(null != mqttConnectOptions) {
//            true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连接超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置持久化方式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {
                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {

            }
        }else {
            System.out.println("mqttConnectOptions对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                try {
                    System.out.println("创建连接");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }else {
            System.out.println("mqttClient为空");
        }
        System.out.println(mqttClient.isConnected());
    }

    public void closeConnect() {
        //关闭存储方式
        if(null != memoryPersistence) {
            try {
                memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("memoryPersistence is null");
        }

        //关闭连接
        if(null != mqttClient) {
            if(mqttClient.isConnected()) {
                try {
                    mqttClient.disconnect();
                    mqttClient.close();
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else {
                System.out.println("mqttClient is not connect");
            }
        }else {
            System.out.println("mqttClient is null");
        }
    }
    
    //    重新连接
    public void reConnect() {
        if(null != mqttClient) {
            if(!mqttClient.isConnected()) {
                if(null != mqttConnectOptions) {
                    try {
                        mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }else {
                    System.out.println("mqttConnectOptions is null");
                }
            }else {
                System.out.println("mqttClient is null or connect");
            }
        }else {
            init("happyfan");
        }

    }
    //    发布消息
    public void publishMessage(String pubTopic,String message,int qos) {
        if(null != mqttClient&& mqttClient.isConnected()) {
            System.out.println("发布消息   "+mqttClient.isConnected());
            System.out.println("id:"+mqttClient.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = mqttClient.getTopic(pubTopic);

            if(null != topic) {
                try {
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if(!publish.isComplete()) {
                        System.out.println("消息发布成功");
                    }
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

        }else {
            reConnect();
        }

    }
    //    订阅主题
    public void subTopic(String topic) {
        if(null != mqttClient&& mqttClient.isConnected()) {
            try {
                mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("mqttClient is error");
        }
    }


    //    清空主题
    public void cleanTopic(String topic) {
        if(null != mqttClient&& !mqttClient.isConnected()) {
            try {
                mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }else {
            System.out.println("mqttClient is error");
        }
    }

}

在使用发布消息的功能时,只需要调用这个类中的publishMessage()方法即可。指定一个发布的主题78:21:84:B8:EB:0C(随便自己的),使用MQTTX尝试一下能不能接收到这个主题的消息(MQTTX也得订阅这个主题)。
使用Java代码搭建一个MQTT客户端
可以看到,MQTTX以及成功的接收到了这个消息,这样一个MQTT客户端(发布者)就搭建完成了。

MQTT客户端(订阅者)

1、创建一个MqttReceive类,对MQTT客户端进行初始化操作,并且实现接收消息的功能(把主题订阅上,创建回调函数):

@Service
public class MqttReceive {
    @Autowired
    private  MqttReceiveCallback mqttReceiveCallback;

    private static int QoS = 1;//通讯的质量,最高是2
    private static String Host = "tcp://127.0.0.1:1883";
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;
    private static MqttClient mqttClient  = null;

    public  void init(String clientId) {
        mqttConnectOptions = new MqttConnectOptions();
        memoryPersistence = new MemoryPersistence();

        if(null != memoryPersistence && null != clientId && null != Host) {
            try {
                mqttClient = new MqttClient(Host, clientId, memoryPersistence);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        if(null != mqttConnectOptions) {
            mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(30);
            mqttConnectOptions.setKeepAliveInterval(45);
            if(null != mqttClient && !mqttClient.isConnected()) {
                //这里可以自己new一个回调函数,比如new MqttReceiveCallback()。我这里使用自动装配,让Spring容器来管理bean与bean的依赖
               mqttClient.setCallback(mqttReceiveCallback);
                try {
                    System.out.println(mqttReceiveCallback);
                    System.out.println("尝试连接");
                    mqttClient.connect();
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    public  void receive(String topic) {
        int[] Qos = {QoS};
        String[] topics = {topic};

        if(null != mqttClient && mqttClient.isConnected()) {
            if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) {
                try {
                    System.out.println("订阅主题");

                    mqttClient.subscribe(topics, Qos);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }else {
            System.out.println("初始化");
            init("HAPPYFAN");
            receive(topic);
        }
    }
}

在使用时,只要调用这个类中的 receive(String topic)方法即可。当MQTT客户端接收到订阅的主题(topic)的消息时,传输的信息就会到回调函数中(就是在MqttReceive类中,自己new的那个回调函数,要确保一致类名和new出来的回调函数一致)。
2、创建回调函数

@Service
public class MqttReceiveCallback implements MqttCallback {

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + msg);
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
    @Override
    public void connectionLost(Throwable cause) {
    }

}

调用receive(String topic)方法,指定一个接收消息的主题(也是随自己的,我这里写的就是receive),尝试能不能接收到这个主题的信息。
使用Java代码搭建一个MQTT客户端
可以看到,再idea控制台中成功的打印出了接收主题的消息、质量、以及内容。一个MQTT客户端(订阅者)就搭建完成了。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java mqtt服务器搭建
MQTT服务器搭建以及客户端代码编写服务器关于linux系统,可以在阿里云购买云服务器或者利用虚拟机安装CentOs系统。我用的就是阿里云的云服务器,比较方便安装Emqx服务器安装必要的依赖:$sudoyuminstallyyumutilsdevicem
Wesley13 Wesley13
3年前
activeMQ+MQTT实现点对点发送消息
问题的提出:最近在做若干安卓设备(共享项目使用的硬件)和服务器通信实现MQTT消息的的接收。由于MQTT的限制(注意:不管你用的是paho的库还是其他任何MQTT的库都一样,这是MQTT协议的限制。)而无法实现服务器只给某一台机器(根据机器的IMEI号)发送消息。一开始使用的方法,就是服务器只管群发(消息体里会带一个终端ID字段信息),安卓端收到消息后,
Wesley13 Wesley13
3年前
MQTT实战之MQTT入门
MQTT入门介绍(一)一.MQTT简述MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在
Wesley13 Wesley13
3年前
Java使用easymqtt4j快速开发工业级mqtt企业级应用
Java使用easymqtt4j快速开发工业级mqtt企业级应用easymqtt4j,nettymqttsubscriberpublisherbrokerclusterserverforjavaeasymqtt4j特点:1、springintegration集成模式,自由灵活。2、完全支持mqtt3.1、3.
Stella981 Stella981
3年前
Redis 发布订阅
Redis发布订阅Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。Redis客户端可以订阅任意数量的频道。下图展示了频道channel1,以及订阅这个频道的三个客户端——client2、client5和client1之间的关系:!(
Wesley13 Wesley13
3年前
MQTT初始篇笔记整理
MQTT简介MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输),基于TCP/IP协议栈而构建,虽然叫消息队列遥测传输,但是她与消息队列毫无关系,她是一个IBM开发的客户端服务端架构的发布/订阅模式的消息传输协议;她的设计思想是轻巧、开放、简单、规范、易于实现,因此MQTT比较
Stella981 Stella981
3年前
MQTT消息协议、服务器及其客户端
    MQTT是一个轻量级的消息协议。从2014年12月IOIT大会上得到的消息,该协议已经被OASIS标准组织接收,成立了专门的工作组,以意味着该规范正式走向了标准化之路。    目前MQTT的标准组织官网:http://www.mqtt.org(https://www.oschina.net/action/GoToLink?urlhttp%3
Stella981 Stella981
3年前
Redis发布订阅
简介发布者和订阅者都是Redis客户端。发布者可以发消息到任意多个频道上,订阅者可以订阅任意多个频道。订阅命令subscribe channel\channel...\:返回的数字1,2表示订阅的第n个频道,频道返回的消息类型是message127.0.0.1:6379subscribeeduca
Stella981 Stella981
3年前
Spring Boot+Socket实现与html页面的长连接,客户端给服务器端发消息,服务器给客户端轮询发送消息,附案例源码
功能介绍1.客户端给所有在线用户发送消息2.客户端给指定在线用户发送消息3.服务器给客户端发送消息(轮询方式)注意:socket只是实现一些简单的功能,具体的还需根据自身情况,代码稍微改造下项目搭建项目结构图!(https://img2020.cnblogs.com/blog/15044
Wesley13 Wesley13
3年前
Java Socket基本例子——使用最原始方法
使用最原始的java.net.ServerSocket和java.net.Socket进行socket通信。实现的效果为:客户端向服务端发送消息、服务端向客户端发送消息、保留统计客户端的信息列表、剔除已经断开的客户端等。本文所有代码均可在https://gitee.com/songxinqiang/JavaSocketDemo(https://git
GeorgeGcs GeorgeGcs
1个月前
【HarmonyOS 5】鸿蒙中如何使用MQTT
鸿蒙开发能力HarmonyOSSDK应用服务鸿蒙金融类应用(金融理财一、MQTT是什么?MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输)是一种轻量级、基于发布/订阅(Publish/Subscribe)模式的即