IoT基础架构的演进 — 边云自定义消息传输

Prodan Labs 等级 390 0 0

IoT基础架构的演进 — 边云自定义消息传输 边缘计算不仅仅是将应用部署在边缘,并对其进行自动化的监控和运维。在许多应用场景里,边缘和云上应用需要进行特定的消息传输、数据交换等,以完成边云协同的业务处理。例如,用户需要从云端发送命令至边缘的应用来触发特定的业务,或者边缘设备需要将采集的业务信息上传至云端处理。

KubeEdge v1.6 版本增加了自定义边云消息传输的支持,用户可以根据场景,借助 Rule 和 RuleEndpoint 两个新增API来自定义的边云消息传输设置,为需要边云通信的业务组件或第三方插件屏蔽底层网络环境差异。

Router Manager


KubeEdge 借助 Kubernetes CRD 和路由器模块支持路由管理。用户可以通过mqtt代理在云和边缘之间传递其自定义消息。

使用场景: 用于用户控制数据的传递; 不适合大数据传送; 一次传送的数据大小限制为12MB。

kubeedge 升级到 1.6 版本貌似默认没有开启 router ,需要手动创建 router crd 和开启 router 功能。 IoT基础架构的演进 — 边云自定义消息传输

修改 cloudcore 配置,开启 router 功能。如果是新环境,直接开启即可。

# /etc/kubeedge/config/cloudcore.yaml
  router:
    enable: true
    address: 0.0.0.0
    port: 9443
    restTimeout: 60
  syncController:
    enable: true

# kubectl get crds | grep kubeedge
clusterobjectsyncs.reliablesyncs.kubeedge.io          2021-03-08T07:53:39Z
devicemodels.devices.kubeedge.io                      2021-03-08T07:53:39Z
devices.devices.kubeedge.io                           2021-03-08T07:53:39Z
objectsyncs.reliablesyncs.kubeedge.io                 2021-03-08T07:53:39Z
ruleendpoints.rules.kubeedge.io                       2021-03-08T08:04:59Z
rules.rules.kubeedge.io                               2021-03-08T08:04:59Z

云端到边缘端


通过调用 cloudcore api 将控制边缘端应用的消息从云传递到边缘,边缘端的应用接收消息后,开始或停止收集树莓派的系统日志,收集到的日志发布到 mq ,日志可以通过 kuiper 的规则处理,把需要的日志上传到云端 emqx 。 IoT基础架构的演进 — 边云自定义消息传输 环境

# kubectl get nodes -o wide
NAME                     STATUS   ROLES        AGE    VERSION                   INTERNAL-IP      EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION                 CONTAINER-RUNTIME
k8s-test-master01        Ready    master       21d    v1.20.2                   172.31.250.220   <none>        CentOS Linux 8 (Core)            4.18.0-193.28.1.el8_2.x86_64   cri-o://1.20.0
k8s-test-node01          Ready    node         21d    v1.20.2                   172.31.250.221   <none>        Ubuntu 20.04.1 LTS               5.4.0-58-generic               containerd://1.3.3-0ubuntu2.2
k8s-test-node02          Ready    node         21d    v1.20.2                   172.31.250.222   <none>        openSUSE Leap 15.2               5.3.18-lp152.57-default        cri-o://1.17.3
kubeedge-raspberrypi01   Ready    agent,edge   2d2h   v1.19.3-kubeedge-v1.6.0   192.168.13.102   <none>        Raspbian GNU/Linux 10 (buster)   5.4.51-v7l+                    docker://19.3.13

1.创建云端到边缘端的路由

# cat create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-rest
  labels:
    description: test
spec:
  ruleEndpointType: "rest"
  properties: {}
---
# cat create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-eventbus
  labels:
    description: test
spec:
  ruleEndpointType: "eventbus"
  properties: {}
---
# cat create-rule-rest-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
  name: my-rule
  labels:
    description: test
spec:
  source: "my-rest"
  sourceResource: {"path":"/a"}
  target: "my-eventbus"
  targetResource: {"topic":"test"}

2.写一个简单的程序,用于收集 Linux 日志

// messagePubHandler 订阅 mq 消息,启动或停止日志采集
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {

    log.Printf("Received message: %v messageid: %d from topic: %s\n", msg, msg.MessageID(), msg.Topic())
    controlMessage := string(msg.Payload())
    log.Println(controlMessage)
    if controlMessage == "start" {
        fileName := "/var/log/syslog"
        config := tail.Config{
            ReOpen:    true,                                        // 重新打开
            Follow:    true,                                        // 是否跟随
            Location:  &tail.SeekInfo{Offset: S.offset, Whence: 0}, // 从文件的哪个地方开始读
            MustExist: true,                                        // 文件不存在报错
            Poll:      true,
        }

        tails, err := tail.TailFile(fileName, config)
        if err != nil {
            log.Println("tail file failed, err:", err)
            return
        }
        go logsStream(tails, S.mqttClient)

    } else if controlMessage == "stop" {
        S.StopCh <- string(msg.Payload())
    } else {
        log.Printf("Unknown message : %s", controlMessage)
    }

}

IoT基础架构的演进 — 边云自定义消息传输

3.在 Kuiper 创建日志流

/kuiper #  bin/kuiper create stream logs '(month string, days string,times string ,hostname string,kinds string,logs string,) WITH (FORMAT="JSON", DATASOURCE="demo")';
Connecting to 127.0.0.1:20498... 
Stream logs is created.
/kuiper # bin/kuiper show streams
Connecting to 127.0.0.1:20498... 
logs

4.创建 Kuiper 规则,把日志转发到云端 emqx

cat > /tmp/rule.yaml <<EOF
{
  "id": "rule1",
  "sql": "select * from logs ",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://${云端emqx IP}:1883",
        "topic": "demoSink"
      }
    }
  ]
}
EOF
/kuiper # bin/kuiper create rule rule1 -f /tmp/rule.yaml

5.调用云端的 cloudcore rest api 将消息发送到边缘端

# URL: http://{rest_endpoint}/{node_name}/{namespace}/{path}
[root@k8s-test-master01 ~]# curl -X POST --data 'start'  http://127.0.0.1:9443/kubeedge-raspberrypi01/default/a 
message delivered

6.边缘端的程序接收到消息,开始收集日志 IoT基础架构的演进 — 边云自定义消息传输

7.订阅云端的 emqx 消息,验证是否有日志发送过来 IoT基础架构的演进 — 边云自定义消息传输

边缘端到云端


1.创建边缘端到云端的路由

# create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-rest
  labels:
    description: test
spec:
  ruleEndpointType: "rest"
  properties: {}
---
# create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-eventbus
  labels:
    description: test
spec:
  ruleEndpointType: "eventbus"
  properties: {}
---
#create-rule-eventbus-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
  name: my-rule-eventbus-rest
  labels:
    description: test
spec:
  source: "my-eventbus"
  sourceResource: {"topic": "test","node_name": "kubeedge-raspberrypi01"}
  target: "my-rest"
  targetResource: {"resource":"http://172.31.250.220:8088/api/v1/msg"}

resource 是云端应用的 api 地址

2.写一个简单的 API 接口

package main

import(
    "github.com/kataras/iris/v12"
    "github.com/kataras/iris/v12/middleware/logger"
    "github.com/kataras/iris/v12/middleware/recover"
)

type Msg struct{
    EdgeMsg    string `json:"edgemsg"`
}

func main(){
    app := iris.New()
    c := &Msg{}
    app.Logger().SetLevel("debug")

    app.Use(recover.New())
    app.Use(logger.New())

    resAPI := app.Party("/api/v1")
    resAPI.Post("/msg", func(ctx iris.Context){
        if err := ctx.ReadJSON(c); err != nil{
            panic(err.Error())
        }else{
            ctx.JSON(c)
        }
    })

    resAPI.Get("/msg", func(ctx iris.Context){
        ctx.Writef("Received: %v\n", c.EdgeMsg)
    })

    app.Run(iris.Addr(":8088"), iris.WithoutServerError(iris.ErrServerClosed))
}

IoT基础架构的演进 — 边云自定义消息传输

目前接口没有数据 IoT基础架构的演进 — 边云自定义消息传输

3.在边缘端的将自定义的消息发布到边缘节点的 MQTT 代理 IoT基础架构的演进 — 边云自定义消息传输

mosquitto_pub -t 'default/test' -d -m '{"edgemsg":"msgtocloud"}'

4.再访问云端 API 接口,这时已经获取到了从边缘端发送的消息。 IoT基础架构的演进 — 边云自定义消息传输

结束


因为目前边云自定义消息传输的不适合大数据传送,一次传送的数据大小限制为12MB 和单向 REST 的局限性,目前使用场景还是相对简单,可能更多是用于用户控制数据的传递,比如控制边缘终端设备的启停、边缘端向云端汇总边缘终端设备的在线或离线状态等。

社区也计划在下一个版本中优化和扩展该功能特性。 参考文档 https://kubeedge.io/en/docs/developer/custom_message_deliver/\


感兴趣的读者可以关注下微信号 IoT基础架构的演进 — 边云自定义消息传输

收藏
评论区

相关推荐

Kubernetes(k8s)中文文档 Kubernetes概述
简介 Kubernetes(https://www.kubernetes.org.cn/)是一个开源的,用于管理云平台中多个主机上的容器化的应用,Kubernetes的目标是让部署容器化的应用简单并且高效(powerful),Kubernetes提供了应用部署,规划,更新,维护的一种机制。 Kubernetes一个核心的特点就是能够自主的管理容
IoT基础架构的演进 — Kuiper
EMQ X Kuiper 是映云科技开源的轻量级物联网边缘数据分析和流式处理软件, Kuiper 设计的一个主要目标就是将在云端运行的实时流式计算框架(如 Apache Spark,Apache Storm 和 Apache Flink 等)迁移到边缘端。Kuiper 参考了云端流式处理项目的架构与实现,结合边缘流式数据处理的特点,采用了编写基于源 (Sou
IoT基础架构的演进 — 边云自定义消息传输
边缘计算不仅仅是将应用部署在边缘,并对其进行自动化的监控和运维。在许多应用场景里,边缘和云上应用需要进行特定的消息传输、数据交换等,以完成边云协同的业务处理。例如,用户需要从云端发送命令至边缘的应用来触发特定的业务,或者边缘设备需要将采集的业务信息上传至云端处理。KubeEdge v1.6 版本增加了自定义边云消息传输的支持,用户可以根据场景,借助 Rule
基于k8s的DevOps实践之路
原标题:基于k8s的DevOps实践之路很多快速发展的公司都面临着一个巨大挑战:在需求不断动态横向扩容的同时继续保持系统的高可用性。如何有效解决这一问题,Kubernetes(k8s)应运而生。k8s以运行可扩展工作负载而闻名,它可以根据资源使用情况调整工作负载。白山科技云分发团队基于多年的DevOps实践经验,在白山会运维日第三期与Thoughtworks
谐云上榜“2020边缘计算力量TOP20”
2020年11月7日,以“5G·边缘计算“为主题的全球边缘计算大会在北京成功召开。此次活动吸引了政、产、学、研、用各界的行业权威技术机构与专家等参会。谐云作为边缘计算领域的先驱者,应邀出席大会,并获评“2020边缘计算力量TOP20”。全球最大客服云在云边协同中的案例分享大会上,谐云高级架构师魏欢分
深度 | 阿里云蒋江伟:什么是真正的云原生?
作者 | 阿里云原生 来源|而今,云原生成了耳熟能详的热门词,似乎不提云原生就落伍了,加入 CNCF 也成了云厂商引以为傲的技术优势。我们也看到各种云原生的定义,有来自 CNCF 的“微服务容器持续交付DevOps”,也有来自不同云厂商的说法。2020 年 9 月,阿里云成立了云原生技术委员会,今天我就从云计算的初心尝试谈谈什么是真正的云原生。狭义的
PaaS失败了吗?让我们看看Cloud Foundry的优势
软件团队常见的行为方式可以总结如下:1. 软件部署从Heroku或Firebase开始。2. 在应用程序需要快速扩展时,开始使用Docker进行容器化。3. 在遇到Docker困境的时候,考虑使用Kubernetes等容器编排工具。(https://cdn.thenewstack.io/media/2020/12/5105a24aimage011.
灵雀云陈恺:2020 云原生走向何处?|CNBPS2020演讲实录
大家好,我是灵雀云的陈恺。今天我们用这种比较特殊的方式来交流,很多人可能已经习惯这种新的工作和生活方式。疫情在带来很大挑战的同时,也在倒逼着我们去进步,就像几个月前微软CEO 萨提亚·纳德拉说的,很多企业把原本需要花两年时间来做的数字化转型,在短短两个月内一口气全都搞定了。参加过前面几届CNBPS大会的朋友可能知道,我每年都会试图用一句话来概括下云原生在这一
云游戏时代来了,让好玩触手可及。
本文纯属分享新技术随着科技的迅猛发展,"云端"这个概念逐渐成为各大厂商的研究对象。云储存、云盘、云计算乃至最近大火的云游戏,脱离了固定的硬件和软件计算环节。 云游戏也叫有需求的游戏,是基于云计算的新技术。在云游戏模型中,所有的游戏逻辑和渲染都在服务器端运行,然后再从服务器把压缩的视频传给用户,这样玩家就不需要一台CPU和GPU相当好的计算机了,唯一的要求就
zookeeper到nacos的迁移实践
本文已收录 https://github.com/lkxiaolou/lkxiaolou 欢迎star。 技术选型公司的RPC框架是dubbo,配合使用的服务发现组件一直是zookeeper,长久以来也没什么大问题。至于为什么要考虑换掉zookeeper,并不是因为它的性能瓶颈,而是考虑往云原生方向演进。云原生计算基金会(CNCF)对云原生的定义是: 云原生
腾云先锋招新啦!!
腾云先锋招新啦!!如果您想结交一群热爱云产品 / 技术,喜欢技术交流的小伙伴;如果您对目前正在使用的腾讯云产品、云服务有更好的建议,但无法直接反馈给产品团队;如果您出现遇到云问题无法及时解决的情况;::: warning 请加入腾云先锋,您的云技术问题和需求将可直接对接到产品团,同时您也可以在这里拓展朋友圈。还可以通过各种方式积累积分,凭积分兑换无门槛代金
腾云先锋招新啦!!
腾云先锋招新啦!!如果您想结交一群热爱云产品 / 技术,喜欢技术交流的小伙伴;如果您对目前正在使用的腾讯云产品、云服务有更好的建议,但无法直接反馈给产品团队;如果您出现遇到云问题无法及时解决的情况;请加入腾云先锋,您的云技术问题和需求将可直接对接到产品团,同时您也可以在这里拓展朋友圈。还可以通过各种方式积累积分,凭积分兑换无门槛代金券(301000 元不等)
腾讯云用户沟通群招新啦!!!
如果您想结交一群热爱云产品 / 技术,喜欢技术交流的小伙伴;如果您对目前正在使用的腾讯云产品、云服务有更好的建议,但无法直接反馈给产品团队;如果您出现遇到腾讯云产品问题无法及时解决的情况;请加入腾云先锋用户沟通群,您的云技术问题和需求将可直接对接到产品团,同时您也可以在这里拓展朋友圈。还可以通过各种方式积累积分,凭积分兑换无门槛代金券(301000 元不等)
我们也从 Python 转向了 Golang -- MemFireDB
首先说明一下,Python 也是我最喜欢的一门编程语言,我用 Python 工作了接近 8 年,并且会一直使用下去。我们团队在开启 这个项目之初就做出了从 Python 往 golang 转换的预期,因此我们的转换过程没有任何障碍,非常顺利的就完成了。我们为什么会在项目开启之初就做出要更换编程语言的决定呢,为什么不一开始就选择 Golang 呢?第一个问题
做运维的如何利用自己手里的资源增加收入,发财记得感谢我。
现在很多做技术开发的人 都会找兼职给外面接点活赚点零花钱,目前这个市场还是狼多肉少,很难接到赚钱的单子。不妨多找一些方法,我这给大家介绍一个好渠道。如果在公司做运维工作有维护服务器资源的不管是阿里云腾讯云,只要你能登陆账号就可以 公司采购完的订单是可以关联给代理商的,以阿里云为例,在控制台企业合作伙伴项目进去以后你能看到最近购买的订单,每一个订单后面都是有一