应用YoMo开发一个噪声传感器采集监控系统

樊建
• 阅读 2382

前言

这个例子描述 YoMo 在工业互联网数据采集中的应用,以收集噪声传感器的数据为例,涉及数据收集/处理/工作流/数据展示的全过程,为了便于体验运行效果,还会对其进行容器化,并通过docker快速部署体验版。

述语

  • xxx-source: 表示一个数据源收集程序
  • xxx-zipper: 表示一个工作流和控制平面
  • xxx-flow: 表示一个工作流单元,用于实际的业务逻辑处理,被zipper调度。
  • xxx-sink: 表示一个数据的传送目的地,一般落地数据库或者传递给下一级代理,被zipper调度。

架构

应用YoMo开发一个噪声传感器采集监控系统

从图中可见,区分了边缘端和云端两个独立区域,区域之间是通过弱网或者互联网连接,这里简单介绍一下各个服务:

  • 边缘端部署了传感器设备(Noise)和数据收集网关(ZLAN),网关会定时向设备请求状态数据,并转换为MQTT协议数据发送给noise-source收集器,source起到转换编码并与YoMo工作流引擎zipper建立连接的作用。关于传感器设备和数据收集网关的硬件选购和配置可以参与这篇文章:https://yomo.run/zh/aiot
  • 对于不想购买硬件设备的开发者,这里也提供了一个noise-emitter模拟器用来产生噪声数据。
  • zipper是一个强大的工作流引擎,通过编排(workflow.yaml)可以调度多个flow和sink,让他们以流的方式把业务逻辑串联起来,以满足复杂的需求。与之相连的所有通信和编解码均以QUIC+Y3进行,提供可靠实时的流式处理,全程体验流式编程的乐趣。
  • noise-flow 实现把噪音值除以10的简单处理,并且监控如果超过一定阀值后输出日志进行警报。
  • noise-sink 没有真的输出到数据库,而是通过搭建一个WebSocket服务器,把实时的噪音状态输出给任意的网页进行展示消费。
  • noise-web 是一个消费WebSocket的网页服务,他部署在哪里都可以,只要能访问到noise-sink提供的WebSocket服务地址即可,这里我们假设部署回边缘端也是没有问题的。

代码

下表提供了案例的全部代码,供感兴趣的朋友查看,参照这个案体的代码,可以轻松开发出类拟场景的案例。

项目地址说明
noise-sourceyomo-source-noise-example收集MQTT消息格式的噪音数据
noise-zipperyomo-zipper-noise-example编排本案体的工作流和数据流向
noise-flowyomo-flow-noise-example对噪音数据进行预处理和警报
noise-sinkyomo-sink-socketio-server-example提供WebSocket服务用于数据展示
noise-webyomo-sink-socket-io-example消费WebSocket服务展示噪音状态
noise-emitteryomo-source-noise-emitter-example模拟产生噪声数据
quic-mqttyomo-source-mqtt-starter开发xxx-source的通用组件

容器化部署

通过下载上节的项目代码可以进行本地原生部署,体验YoMo开发的乐趣,但是对于想急于马上看到效果的朋友来说,更爽的方式当然是先快速运行起来看看效果,所以对上节的项目也做了容器化处理,每个项目的根目录均提供了Dockerfile文件,并且在hub.docker.com提供了官方镜像下载:

项目镜像地址最新版本
noise-sourceyomorun/noise-sourceyomorun/noise-source:latest
noise-zipperyomorun/noise-zipperyomorun/noise-zipper:latest
noise-flowyomorun/noise-flowyomorun/noise-flow:latest
noise-sinkyomorun/noise-sinkyomorun/noise-sink:latest
noise-webyomorun/noise-webyomorun/noise-web:latest
noise-emitteryomorun/noise-emitteryomorun/noise-emitter:latest
quic-mqttyomorun/quic-mqttyomorun/quic-mqtt:latest

yomorun/quic-mqtt:latest 是开发xxx-source的基础镜像,可以快速打包自定义代码,但本案例中可以暂时忽略。

快速部署

为了快速运行体验运行效果,本小节描述如何整体容器化在同一宿主机上运行。

快速运行

有了上述的官方镜像就简单多了,只需简单的步骤就可以体验效果了:

应用YoMo开发一个噪声传感器采集监控系统

注意事项:

  • 这里Delay的值可能不很准确,因为是通过Docker容器部署,各个容器的时钟并不对齐得那么完美,如果要查看到最精确的延时值,需要把noise-source和noise-web原生部署到同一个宿主机上。
  • 如果部署不是在本地,则需要修改docker-compose.yml文件中的环境变量SOCKET\_SERVER\_ADDR为你部署服务的宿主机地址。

查看状态

通过 docker-compose ps查看服务状态

    Name                   Command               State                Ports              
-----------------------------------------------------------------------------------------
noise-emitter   sh -c go run main.go             Up                                      
noise-flow      sh -c yomo run app.go -p 4242    Up      4242/udp                        
noise-sink      sh -c go run main.go             Up      4141/udp, 0.0.0.0:8000->8000/tcp
noise-source    sh -c go run main.go             Up      1883/tcp                        
noise-web       ./docker-entrypoint.sh yar ...   Up      0.0.0.0:3000->3000/tcp          
noise-zipper    sh -c yomo wf run workflow ...   Up      9999/udp    
  • noise-sink 暴露了8000的WebSocket端口提给noise-web展示消费。
  • noise-web 暴露了3000的http端口用于展示实时的噪声值和延时。
  • noise-zipper/noise-flow/noise-sink 均提供了udp端口的QUIC服务,全流程QUIC通信。
  • noise-source 是我们对接不同设备的关键,提供1883的MQTT端口,当然也可以修改的。

查看日志

  • 查看noise-emitter docker-compose logs -f noise-emitter

    noise-emitter    | 2021-04-26 10:11:03: Publish counter=12438, topic=NOISE, payload={"noise":12438}
    noise-emitter    | 2021-04-26 10:11:04: Publish counter=12439, topic=NOISE, payload={"noise":12439}
    noise-emitter    | 2021-04-26 10:11:05: Publish counter=12440, topic=NOISE, payload={"noise":12440}
    noise-emitter    | 2021-04-26 10:11:06: Publish counter=12441, topic=NOISE, payload={"noise":12441}
    

这个模拟发生器产生了MQTT数据:主题是`NOISE`, 值是不断递增的序号(JSON格式)。
  • 查看noise-source docker-compose logs -f noise-source

    noise-source     | 2021/04/26 15:27:32 receive: topic=NOISE, payload={"noise":2638}
    noise-source     | 2021/04/26 15:27:32 write: sendingBuf=[]byte{0x81, 0x1b, 0x90, 0x19, 0x11, 0x3, 0x45, 0x24, 0xe0, 0x12, 0x6, 0xaf, 0x90, 0xe8, 0xce, 0x83, 0x1c, 0x13, 0xa, 0x31, 0x37, 0x32, 0x2e, 0x31, 0x39, 0x2e, 0x30, 0x2e, 0x36}
    noise-source     | 2021/04/26 15:27:33 receive: topic=NOISE, payload={"noise":2639}
    noise-source     | 2021/04/26 15:27:33 write: sendingBuf=[]byte{0x81, 0x1b, 0x90, 0x19, 0x11, 0x3, 0x45, 0x24, 0xf0, 0x12, 0x6, 0xaf, 0x90, 0xe8, 0xce, 0x8b, 0x5, 0x13, 0xa, 0x31, 0x37, 0x32, 0x2e, 0x31, 0x39, 0x2e, 0x30, 0x2e, 0x36}
    

-   receive: 表示source收到的MQTT数据。
-   write: 表示把经过Y3转码的字节码向noise-zipper工作流引擎发送。
  • 查看noise-zipper docker-compose logs -f noise-zipper

    noise-zipper     | 2021/04/26 14:39:28 Found 1 flows in zipper config
    noise-zipper     | 2021/04/26 14:39:28 Flow 1: Noise Serverless on noise-flow:4242
    noise-zipper     | 2021/04/26 14:39:28 Found 1 sinks in zipper config
    noise-zipper     | 2021/04/26 14:39:28 Sink 1: Socket.io Server on noise-sink:4141
    noise-zipper     | 2021/04/26 14:39:28 Running YoMo workflow...
    noise-zipper     | 2021/04/26 14:39:28 ✅ Listening on 0.0.0.0:9999
    noise-zipper     | 2021/04/26 14:43:32 ✅ Connect to Noise Serverless (noise-flow:4242) successfully.
    noise-zipper     | 2021/04/26 14:44:33 ✅ Connect to Socket.io Server (noise-sink:4141) successfully.
    

工作流引擎连接上了noise-flow和noise-sink这两个工作流的处理单元了。
  • 查看noise-flow docker-compose logs -f noise-flow

    noise-flow       | ❗ value: 561.700012 reaches the threshold 16! 45.700012
    noise-flow       | [172.19.0.6] 1619425035923 > value: 561.799988 ⚡️=0ms
    noise-flow       | ❗ value: 561.799988 reaches the threshold 16! 45.799988
    noise-flow       | [172.19.0.6] 1619425036923 > value: 561.900024 ⚡️=1ms
    

噪声数据是模拟器产生的,远远超过了预设的阀值,打印出警告信息。

分区部署

经过上一小节的快速部署应该了解如何在同一台宿主机上进行容器化部署并查看各个服务的状态,但与我们的开始提出的云端与边缘分离的实际架构图不符,现实场景下flow/sink这类Serverless的服务会部署在云端,而数据接收器source会部署在边缘,那本节就来分离他们看看如果编排部署。

云端部署

部署服务

目标是把 noise-zipper / noise-flow / noise-sink 部署在云端,可以查看下面的配置文件和运行步骤:

查看状态

通过 docker-compose -f docker-compose-cloud.yml ps查看服务状态

    Name                  Command               State                Ports              
----------------------------------------------------------------------------------------
noise-flow     sh -c yomo run app.go -p 4242    Up      4242/udp                        
noise-sink     sh -c go run main.go             Up      4141/udp, 0.0.0.0:8000->8000/tcp
noise-zipper   sh -c yomo wf run workflow ...   Up      0.0.0.0:9999->9999/udp   
  • noise-zipper 暴露了云端工作流引擎的服务端口,因为通信是QUIC协议,所以可见是一个udp端口。
  • noise-sink 暴露了通过WebSocket数据消费的端口,让其它Web服务展示消费。

边端部署

部署服务

目标是把 noise-source / noise-web / noise-emitter 部署在边缘端,可以查看下面的配置文件和运行步骤:

注意事项:

  • Docker-compose-edge.yml中的SOCKET\_SERVER\_ADDR变量需要设置为cloud端的source-sink暴露的地址和端口(默认8000)。

查看状态

通过 docker-compose -f docker-compose-edge.yml ps查看服务状态

    Name                   Command               State           Ports         
-------------------------------------------------------------------------------
noise-emitter   sh -c go run main.go             Up                            
noise-source    sh -c go run main.go             Up      1883/tcp              
noise-web       docker-entrypoint.sh sh -c ...   Up      0.0.0.0:3000->3000/tcp
  • noise-web 暴露了效果展示网站的端口。通(过访问 http://localhost:3000/ 可)以查看到之前相同的展示界面。

路由器部署

到了这里就结束了?还没有!实际上边缘部署的情况要比云端复杂,因为边缘设备很多是老旧,不一定支持Docker容器,这时你可以选择通过源代码编译成不同平台的二进制运行文件,直接就把noise-source跑在对应的平台上。当然,更方便的选择是购买一台支持Docker容器的路由器,直接就可以通过容器部署在边缘端了,目前YoMo与iKuai达成深度合作,携手推进工业互联网在各个领域的应用与发展,可以通过下面链接查看详细的信息:

引用参考

上面就是这个噪声传感器案例从数据收集处理到展示的所有代码和部署过程了。对于需要扩展或者引申到别的应用场景的开发者,可以点开每个项目的链接进行详细阅读,每个项目都是微服务化构建,服务角色明确,代码清晰易懂,如果有什么问题欢迎提出Issues或者讨论。参考链接:

最后,补充一张带有各个服务可暴露端口号的架构图,以便碰到困难的朋友查阅。

应用YoMo开发一个噪声传感器采集监控系统

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
ES6 新增的数组的方法
给定一个数组letlist\//wu:武力zhi:智力{id:1,name:'张飞',wu:97,zhi:10},{id:2,name:'诸葛亮',wu:55,zhi:99},{id:3,name:'赵云',wu:97,zhi:66},{id:4,na
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
4个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(