EMQ X + IoTDB:存储 MQTT 消息到时序数据库

索超
• 阅读 2561

IoTDB 是最早由清华大学发起的开源时序数据库项目,现已经是 Apache 的顶级项目。IoTDB 可以为用户提供数据收集、存储和分析等服务。由于其轻量级架构、高性能和高可用的特性,以及与 Hadoop 和 Spark 生态的无缝集成,满足了工业 IoT 领域中海量数据存储、高吞吐量数据写入和复杂数据查询分析的需求。

EMQ X 是一个大规模扩展、可弹性伸缩的开源云原生分布式物联网消息中间件,由开源物联网数据基础设施软件供应商 EMQ 映云科技发布。EMQ X 可以高效可靠地处理海量物联网设备的并发连接,并且内置了强大的规则引擎功能,用以对事件和消息流数据进行高性能地实时处理。规则引擎通过 SQL 语句提供了灵活的 "配置式" 的业务集成方案,简化了业务开发流程,提升了易用性,降低了用户的业务逻辑与 EMQ X 的耦合度。

本文将介绍如何使用 EMQ X 规则引擎的 MQTT 数据桥接功能,接收 MQTT 客户端发送的数据,并实时插入到时序数据库 IoTDB。

准备工作

本文示例中用到的软件和环境:

IoTDB 安装

首先我们需要从 IoTDB 官方页面下载 IoTDB Server(单机版)的二进制包。

下载完成之后解压,进入解压后的目录:

% ls
LICENSE         README.md       RELEASE_NOTES.md data             ext             licenses         sbin
NOTICE           README_ZH.md     conf             docs             lib             logs             tools

要启用 IoTDB 的 MQTT 协议支持,需要改动 IoTDB 的配置文件 conf/iotdb-engine.properties

*后续建模使用了一个存储组 root.sg,为了增加写入并行度,需要同时将 iotdb-engine.properties 中的 virtual_storage_group_num 设置为机器核数。
####################
### MQTT Broker Configuration
####################

# whether to enable the mqtt service.
enable_mqtt_service=true

# the mqtt service binding host.
mqtt_host=0.0.0.0

# the mqtt service binding port.
mqtt_port=2883

# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1

# the mqtt message payload formatter.
mqtt_payload_formatter=json

# max length of mqtt message in byte
mqtt_max_message_size=1048576

其中 enable_mqtt_service 默认为 false,需要改成 truemqtt_port 默认值是 1883,为了避免与 emqx 的端口号冲突,需要改为 2883。

然后使用 ./sbin/start-server.sh 启动 IoTDB 服务端:

% ./sbin/start-server.sh
---------------------
Starting IoTDB
---------------------
Maximum memory allocation pool = 2048MB, initial memory allocation pool = 512MB
If you want to change this configuration, please check conf/iotdb-env.sh(Unix or OS X, if you use Windows, check conf/iotdb-env.bat).
2022-01-10 14:15:31,914 [main] INFO o.a.i.d.c.IoTDBDescriptor:121 - Start to read config file file:./sbin/../conf/iotdb-engine.properties
...
2022-01-10 14:14:28,690 [main] INFO o.a.i.d.s.UpgradeSevice:73 - Upgrade service stopped
2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:153 - Congratulation, IoTDB is set up successfully. Now, enjoy yourself!
2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:101 - IoTDB has started

我们保持这个终端窗口不动,另外打开一个新的命令行终端窗口,启动 IoTDB 的 shell 工具:

% ./sbin/start-cli.sh
---------------------
Starting IoTDB Cli
---------------------
_____       _________ ______   ______
|_   _|     | _   _ ||_   _ `.|_   _ \
| |   .--.|_/ | | \_| | | `. \ | |_) |
| | / .'`\ \ | |     | | | | | __'.
_| |_| \__. | _| |_   _| |_.' /_| |__) |
|_____|'.__.' |_____| |______.'|_______/ version 0.12.4


IoTDB> login successfully
IoTDB>

至此 IoTDB 环境就准备好了。如要了解 IoTDB 的基本使用方法,可以参考官网的快速上手页面

安装、配置 EMQ X

下载和启动 EMQ X

我们直接使用命令行下载 macOS 版本的 EMQ X 开源版,更多安装包请访问 EMQ X 开源版下载页面

% wget https://www.emqx.com/en/downloads/broker/4.3.11/emqx-macos-4.3.11-amd64.zip

然后解压并启动 EMQ X:

% unzip -q emqx-macos-4.3.11-amd64.zip
% cd emqx
% ./bin/emqx console

log.to = "console"
Erlang/OTP 23 [erts-11.1.8] [emqx] [64-bit] [smp:8:8] [ds:8:8:8] [async-threads:4] [hipe]
Starting emqx on node emqx@127.0.0.1
Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
Start http:management listener on 8081 successfully.
Start http:dashboard listener on 18083 successfully.
EMQ X Broker 4.3.11 is running now!
Eshell V11.1.8 (abort with ^G)
(emqx@127.0.0.1)1>

配置规则

使用浏览器打开 EMQ X Dashboard,在规则引擎页面创建一条规则:

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

SQL 语句为:

SELECT
    clientid,
    now_timestamp('millisecond') as now_ts_ms,
    payload.bar as bar
FROM
    "t/#"

然后我们在页面的底部,给规则加一个 "桥接数据到 MQTT Broker" 动作:

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

这个动作需要关联一个资源,我们点击右上角的 “新建资源” 来创建一个 MQTT Bridge 资源:

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

远程 Broker 地址要填写 IoTDB 的 MQTT 服务地址,即 "127.0.0.1:2883"。客户端 Id、用户名、密码都填写 root,因为 root 是 IoTDB 默认的用户名和密码。

其他选项保持默认值不变,点击 ”测试连接“ 按钮确保配置无误,然后再点击右下角的 ”新建“ 按钮创建资源。

现在返回到动作创建页面,关联资源的下拉框里自动填充了我们刚才创建的资源。

现在我们继续填写更多的动作参数:

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

IoTDB 不关心消息主题,我们填一个任意的主题:foo

IoTDB 要求消息内容是一个 JSON 格式,消息内容模板可以按照上图中样式填写。详情请参见 IoTDB 的通信服务协议文档

{
 "device": "root.sg.${clientid}",
 "timestamp": ${now_ts_ms},
 "measurements": [
   "bar"
 ],
 "values": [
   ${bar}
 ]
}

注意其中的 "${clientid}", "${now_ts_ms}" 以及 "${bar}" 都是从规则的 SQL 语句的输出中提取的变量,所以必须保证这些变量跟 SQL 语句的 SELECT 字句对应上。

现在可以点击 ”确认“ 保存动作配置,然后再次点击 ”新建“ 完成规则的创建。

使用 MQTT Client 发送消息

接下来我们使用 MQTT 客户端工具 - MQTT X,来发送一条消息给 EMQ X:

MQTT X 是 EMQ 发布的一款完全开源的 MQTT 5.0 跨平台桌面客户端。支持快速创建多个同时在线的 MQTT 客户端连接,方便测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的连接、发布、订阅功能及其他 MQTT 协议特性。

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

MQTT 客户端的连接参数里面,我们只需要填一个参数,Client ID:"abc",其他的保持默认值不变。

连接成功之后,我们发送 2 条主题为:"t/1" 的消息,消息内容格式为:

{
 "bar": 0.2
}

然后回到 EMQ X Dashboard 的规则引擎页面,观察规则的命中次数,确认规则被触发了 2 次:

EMQ X + IoTDB:存储 MQTT 消息到时序数据库

最后我们回到命令行终端的 IoTDB 客户端窗口,使用下面的 SQL 语句查询数据:

IoTDB> SHOW TIMESERIES root.sg.abc
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|root.sg.abc.bar| null|     root.sg|   FLOAT| GORILLA|     SNAPPY|null|      null|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
Total line number = 1
It costs 0.006s

IoTDB> SELECT * FROM root.sg.abc
+-----------------------------+---------------+
|                         Time|root.sg.abc.bar|
+-----------------------------+---------------+
|2022-01-10T17:39:41.724+08:00|            0.3|
|2022-01-10T17:40:32.805+08:00|            0.2|
+-----------------------------+---------------+
Total line number = 2
It costs 0.007s
IoTDB>

数据插入成功!

结语

至此,我们完成了通过 EMQ X 规则引擎功能将消息持久化到 IoTDB 时序数据库。

在实际生产场景中,我们可以使用 EMQ X 处理海量的物联网设备并发连接,并通过规则引擎灵活地处理业务功能,然后将设备发送的消息持久化到 IoTDB 数据库,最后使用 Hadoop/Spark、Flink 或 Grafana 等对接 IoTDB 实现大数据分析、可视化展示等。

EMQ X + IoTDB 的组合是一个简洁、高效且易扩展、高可用的服务端集成方案,对于物联网设备管理和数据处理场景来说,是一个不错的选择。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
索超
索超
Lv1
万里归船弄长笛,此心吾与白鸥盟。
文章
2
粉丝
0
获赞
0