EMQX+HStreamDB 实现物联网流数据高效持久化

CAP理论
• 阅读 731

EMQX+HStreamDB 实现物联网流数据高效持久化

在 IoT 场景中,通常面临设备数量庞大、数据产生速率高、累积数据量巨大等挑战。因此,如何接入、存储和处理这些海量设备数据就成为了一个关键的问题。

EMQX 作为一款强大的物联网 MQTT 消息服务器,单个集群可处理上亿设备连接,同时提供了丰富的数据集成功能。HStreamDB 作为一款分布式流数据库,不仅可以高效存储来自 EMQX 的海量设备数据,而且提供实时处理分析能力。EMQX 与 HStreamDB 都具备高可扩展性和可靠性,两者结合不仅能够满足大规模 IoT 应用的性能和稳定性需求,同时能够提升应用的实时性。

EMQX+HStreamDB 实现物联网流数据高效持久化

近期 EMQX Enterprise 4.4.15 发布,更新了对 HStreamDB 最新版本的支持,本文将具体介绍如何通过 EMQX 规则引擎将数据持久化到 HStreamDB,实现 MQTT 数据流的存储与实时处理。

:本文介绍的集成步骤基于 EMQX 4.4.15 和 HStreamDB 0.14.0 以上版本。

连接到 HStreamDB 集群

在下面的教程中,我们假设有一个正在运行的 EMQX Enterprise 集群和正在运行的 HStreamDB 集群。如需部署 EMQX Enterprise 集群,请参考 EMQX Enterprise docs。如需部署 HStreamDB 集群,请参考 HStreamDB docs,其中包含关于如何用 Docker 快速部署的说明。

我们可以通过 Docker 来部署 HStreamDB 客户端并连接到 HStreamDB 集群:

# 获取帮助信息
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help

我们在此使用 hstream stream 命令创建一个 stream,供接下来的示例使用:

# 使用 hstream stream 命令创建 streams
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))

接下来,连接到 HStreamDB 集群,启动交互式 HStream SQL shell:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# 如果要使用安全连接,还需要填写 --tls-ca, --tls-key, --tls-cert 参数

如果连接成功,将会出现

      __  _________________  _________    __  ___
     / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
    / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
   / __  /___/ // / / _, _/ /___/ ___ |/ /  / /
  /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/

Command
  :h                           To show these help info
  :q                           To exit command line interface
  :help [sql_operation]        To show full usage of sql statement

SQL STATEMENTS:
  To create a simplest stream:
    CREATE STREAM stream_name;
  To create a query select all fields from a stream:
    SELECT * FROM stream_name EMIT CHANGES;
  To insert values to a stream:
    INSERT INTO stream_name (field1, field2) VALUES (1, 2);

可以使用 show streams; 来查看已经创建的 streams 的信息:

> show streams;
+-------------------------------------------+---------+----------------+-------------+
| Stream Name                               | Replica | Retention Time | Shard Count |
+-------------------------------------------+---------+----------------+-------------+
| basic_condition_info_0                    | 3       | 604800 seconds | 1           |
+-------------------------------------------+---------+----------------+-------------+

创建 HStreamDB 资源

在利用 EMQX 规则引擎将数据持久化到 HStreamDB 之前,需要创建一个 HStreamDB 资源。

为此,请访问 EMQX Dashboard,单击 规则引擎 -> 资源创建 ,选择 HStreamDB 资源,输入 HStreamDB 地址并填写必要的选项。可用选项如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化

在选择开启 SSL 时,会出现额外的 SSL 配置界面,可以粘贴所需配置内容或上传文件。

EMQX+HStreamDB 实现物联网流数据高效持久化
EMQX+HStreamDB 实现物联网流数据高效持久化

创建数据持久化到 HStreamDB 的规则

点击 规则引擎 -> 规则 -> 创建

EMQX+HStreamDB 实现物联网流数据高效持久化

编辑 SQL 规则并添加操作,您可以在字符串模板中使用 SQL 变量。

请注意,本文档中介绍的 SQL 规则仅供演示,实际的 SQL 应根据业务设计进行编写。

单击 添加操作,选择「数据持久化」以将数据保存到 HStreamDB 中。选择上一步创建的资源并输入参数。可用参数如下表:

EMQX+HStreamDB 实现物联网流数据高效持久化
EMQX+HStreamDB 实现物联网流数据高效持久化

点击 确定 来确认添加行为。

EMQX+HStreamDB 实现物联网流数据高效持久化

在 HStream SQL Shell 中获取实时的数据更新

从 EMQX 规则引擎持久化到 HStreamDB 的数据可以使用 HStream SQL Shell 实时读出新写入 stream 的内容。现在,数据已经被写入 HStreamDB,可以使用任何消费方式来消费消息。文档使用了一个简单的消费方法:使用 HStream SQL shell 进行查询。此外,读者可以自由选择使用自己喜欢的编程语言 SDK 编写消费端。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;

当前的 select 查询没有结果可供打印出,这是因为还没有数据通过 EMQX 的规则引擎向 HStreamDB 写入。一旦有数据写入,便可以在 HStream SQL shell 观察到数据的即时更新。目前在 HStreamDB 使用 SQL 对 streams 做查询,只会打印出创建查询后的结果。如果在 EMQX 停止向 HStreamDB 写入后创建查询,可能观察不到产生的结果。

向 EMQX 写入消息测试规则引擎

可以使用跨平台的桌面客户端 MQTT X 来连接到 EMQX 并发送消息:

EMQX+HStreamDB 实现物联网流数据高效持久化

从 EMQX Dashboard 获取规则引擎的运行数据指标

访问对应的规则引擎界面:

EMQX+HStreamDB 实现物联网流数据高效持久化

如果规则引擎运行数据指标正常,则代表 EMQX 会将数据持久化到 HStreamDB。一旦写入成功,便可以在前面步骤启动的 HStream SQL Shell 中看到实时的数据更新。

# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
{"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0}
{"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2}
{"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9}
{"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9}

结语

至此,我们就完成了通过 EMQX 规则引擎将数据持久化到 HStreamDB 的主要流程。

将 EMQX 采集到的数据存储到 HStreamDB 后,可以对这些数据进行实时处理与分析,为上层 AI、大数据等应用提供支撑,进一步发掘和利用数据价值。作为首个专为流数据设计的云原生流数据库,HStreamDB 与 EMQX 结合可以实现一站式存储和实时处理海量物联网数据,精简物联网应用数据栈,加速企业的物联网应用开发。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb

点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
4年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
4年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
4年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
CAP理论
CAP理论
Lv1
黑发不知勤学早,白首方悔读书迟。
文章
3
粉丝
0
获赞
0