IoT基础架构的演进 — 边云自定义消息传输

Prodan Labs 等级 232 0 0

IoT基础架构的演进 — 边云自定义消息传输 边缘计算不仅仅是将应用部署在边缘,并对其进行自动化的监控和运维。在许多应用场景里,边缘和云上应用需要进行特定的消息传输、数据交换等,以完成边云协同的业务处理。例如,用户需要从云端发送命令至边缘的应用来触发特定的业务,或者边缘设备需要将采集的业务信息上传至云端处理。

KubeEdge v1.6 版本增加了自定义边云消息传输的支持,用户可以根据场景,借助 Rule 和 RuleEndpoint 两个新增API来自定义的边云消息传输设置,为需要边云通信的业务组件或第三方插件屏蔽底层网络环境差异。

Router Manager


KubeEdge 借助 Kubernetes CRD 和路由器模块支持路由管理。用户可以通过mqtt代理在云和边缘之间传递其自定义消息。

使用场景: 用于用户控制数据的传递; 不适合大数据传送; 一次传送的数据大小限制为12MB。

kubeedge 升级到 1.6 版本貌似默认没有开启 router ,需要手动创建 router crd 和开启 router 功能。 IoT基础架构的演进 — 边云自定义消息传输

修改 cloudcore 配置,开启 router 功能。如果是新环境,直接开启即可。

# /etc/kubeedge/config/cloudcore.yaml
  router:
    enable: true
    address: 0.0.0.0
    port: 9443
    restTimeout: 60
  syncController:
    enable: true

# kubectl get crds | grep kubeedge
clusterobjectsyncs.reliablesyncs.kubeedge.io          2021-03-08T07:53:39Z
devicemodels.devices.kubeedge.io                      2021-03-08T07:53:39Z
devices.devices.kubeedge.io                           2021-03-08T07:53:39Z
objectsyncs.reliablesyncs.kubeedge.io                 2021-03-08T07:53:39Z
ruleendpoints.rules.kubeedge.io                       2021-03-08T08:04:59Z
rules.rules.kubeedge.io                               2021-03-08T08:04:59Z

云端到边缘端


通过调用 cloudcore api 将控制边缘端应用的消息从云传递到边缘,边缘端的应用接收消息后,开始或停止收集树莓派的系统日志,收集到的日志发布到 mq ,日志可以通过 kuiper 的规则处理,把需要的日志上传到云端 emqx 。 IoT基础架构的演进 — 边云自定义消息传输 环境

# kubectl get nodes -o wide
NAME                     STATUS   ROLES        AGE    VERSION                   INTERNAL-IP      EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION                 CONTAINER-RUNTIME
k8s-test-master01        Ready    master       21d    v1.20.2                   172.31.250.220   <none>        CentOS Linux 8 (Core)            4.18.0-193.28.1.el8_2.x86_64   cri-o://1.20.0
k8s-test-node01          Ready    node         21d    v1.20.2                   172.31.250.221   <none>        Ubuntu 20.04.1 LTS               5.4.0-58-generic               containerd://1.3.3-0ubuntu2.2
k8s-test-node02          Ready    node         21d    v1.20.2                   172.31.250.222   <none>        openSUSE Leap 15.2               5.3.18-lp152.57-default        cri-o://1.17.3
kubeedge-raspberrypi01   Ready    agent,edge   2d2h   v1.19.3-kubeedge-v1.6.0   192.168.13.102   <none>        Raspbian GNU/Linux 10 (buster)   5.4.51-v7l+                    docker://19.3.13

1.创建云端到边缘端的路由

# cat create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-rest
  labels:
    description: test
spec:
  ruleEndpointType: "rest"
  properties: {}
---
# cat create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-eventbus
  labels:
    description: test
spec:
  ruleEndpointType: "eventbus"
  properties: {}
---
# cat create-rule-rest-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
  name: my-rule
  labels:
    description: test
spec:
  source: "my-rest"
  sourceResource: {"path":"/a"}
  target: "my-eventbus"
  targetResource: {"topic":"test"}

2.写一个简单的程序,用于收集 Linux 日志

// messagePubHandler 订阅 mq 消息,启动或停止日志采集
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {

    log.Printf("Received message: %v messageid: %d from topic: %s\n", msg, msg.MessageID(), msg.Topic())
    controlMessage := string(msg.Payload())
    log.Println(controlMessage)
    if controlMessage == "start" {
        fileName := "/var/log/syslog"
        config := tail.Config{
            ReOpen:    true,                                        // 重新打开
            Follow:    true,                                        // 是否跟随
            Location:  &tail.SeekInfo{Offset: S.offset, Whence: 0}, // 从文件的哪个地方开始读
            MustExist: true,                                        // 文件不存在报错
            Poll:      true,
        }

        tails, err := tail.TailFile(fileName, config)
        if err != nil {
            log.Println("tail file failed, err:", err)
            return
        }
        go logsStream(tails, S.mqttClient)

    } else if controlMessage == "stop" {
        S.StopCh <- string(msg.Payload())
    } else {
        log.Printf("Unknown message : %s", controlMessage)
    }

}

IoT基础架构的演进 — 边云自定义消息传输

3.在 Kuiper 创建日志流

/kuiper #  bin/kuiper create stream logs '(month string, days string,times string ,hostname string,kinds string,logs string,) WITH (FORMAT="JSON", DATASOURCE="demo")';
Connecting to 127.0.0.1:20498... 
Stream logs is created.
/kuiper # bin/kuiper show streams
Connecting to 127.0.0.1:20498... 
logs

4.创建 Kuiper 规则,把日志转发到云端 emqx

cat > /tmp/rule.yaml <<EOF
{
  "id": "rule1",
  "sql": "select * from logs ",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://${云端emqx IP}:1883",
        "topic": "demoSink"
      }
    }
  ]
}
EOF
/kuiper # bin/kuiper create rule rule1 -f /tmp/rule.yaml

5.调用云端的 cloudcore rest api 将消息发送到边缘端

# URL: http://{rest_endpoint}/{node_name}/{namespace}/{path}
[root@k8s-test-master01 ~]# curl -X POST --data 'start'  http://127.0.0.1:9443/kubeedge-raspberrypi01/default/a 
message delivered

6.边缘端的程序接收到消息,开始收集日志 IoT基础架构的演进 — 边云自定义消息传输

7.订阅云端的 emqx 消息,验证是否有日志发送过来 IoT基础架构的演进 — 边云自定义消息传输

边缘端到云端


1.创建边缘端到云端的路由

# create-ruleEndpoint-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-rest
  labels:
    description: test
spec:
  ruleEndpointType: "rest"
  properties: {}
---
# create-ruleEndpoint-eventbus.yaml
apiVersion: rules.kubeedge.io/v1
kind: RuleEndpoint
metadata:
  name: my-eventbus
  labels:
    description: test
spec:
  ruleEndpointType: "eventbus"
  properties: {}
---
#create-rule-eventbus-rest.yaml
apiVersion: rules.kubeedge.io/v1
kind: Rule
metadata:
  name: my-rule-eventbus-rest
  labels:
    description: test
spec:
  source: "my-eventbus"
  sourceResource: {"topic": "test","node_name": "kubeedge-raspberrypi01"}
  target: "my-rest"
  targetResource: {"resource":"http://172.31.250.220:8088/api/v1/msg"}

resource 是云端应用的 api 地址

2.写一个简单的 API 接口

package main

import(
    "github.com/kataras/iris/v12"
    "github.com/kataras/iris/v12/middleware/logger"
    "github.com/kataras/iris/v12/middleware/recover"
)

type Msg struct{
    EdgeMsg    string `json:"edgemsg"`
}

func main(){
    app := iris.New()
    c := &Msg{}
    app.Logger().SetLevel("debug")

    app.Use(recover.New())
    app.Use(logger.New())

    resAPI := app.Party("/api/v1")
    resAPI.Post("/msg", func(ctx iris.Context){
        if err := ctx.ReadJSON(c); err != nil{
            panic(err.Error())
        }else{
            ctx.JSON(c)
        }
    })

    resAPI.Get("/msg", func(ctx iris.Context){
        ctx.Writef("Received: %v\n", c.EdgeMsg)
    })

    app.Run(iris.Addr(":8088"), iris.WithoutServerError(iris.ErrServerClosed))
}

IoT基础架构的演进 — 边云自定义消息传输

目前接口没有数据 IoT基础架构的演进 — 边云自定义消息传输

3.在边缘端的将自定义的消息发布到边缘节点的 MQTT 代理 IoT基础架构的演进 — 边云自定义消息传输

mosquitto_pub -t 'default/test' -d -m '{"edgemsg":"msgtocloud"}'

4.再访问云端 API 接口,这时已经获取到了从边缘端发送的消息。 IoT基础架构的演进 — 边云自定义消息传输

结束


因为目前边云自定义消息传输的不适合大数据传送,一次传送的数据大小限制为12MB 和单向 REST 的局限性,目前使用场景还是相对简单,可能更多是用于用户控制数据的传递,比如控制边缘终端设备的启停、边缘端向云端汇总边缘终端设备的在线或离线状态等。

社区也计划在下一个版本中优化和扩展该功能特性。 参考文档 https://kubeedge.io/en/docs/developer/custom_message_deliver/\


感兴趣的读者可以关注下微信号 IoT基础架构的演进 — 边云自定义消息传输

收藏
评论区