The Evolution of IoT Infrastructure: Kuiper

Prodan Labs

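EMQ X Kuiper is lightweight open-source IoT edge data analytics and stream processing software from EMQ. One of Kuiper's main design goals is to bring the real-time stream computing frameworks that run in the cloud, such as Apache Spark, Apache Storm, and Apache Flink, to the edge.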

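Kuiper draws on the architecture and implementation of cloud stream-processing projects and, adapting them to the characteristics of edge stream processing, handles streaming data at the edge with a rule engine built from a Source, SQL (business logic), and a Sink (target).

Kuiper can run in a wide range of IoT edge scenarios, for example:

- Stream processing: real-time stream processing at the edge
- Rule engine: flexibly defined rules for alerting and message forwarding
- Data format and protocol conversion: flexible conversion between the different data formats and heterogeneous protocols used at the edge and in the cloud, bridging IT and OT

Processing data at the edge with Kuiper can shorten system response times, cut network bandwidth and storage costs, and improve system security.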
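To make the Source, SQL, Sink model concrete, here is a minimal Python sketch of the three stages as chained generators. It models only the data flow, not Kuiper's actual Go implementation: the stage functions source, sql_filter_project, and sink are hypothetical names, and the SQL clause WHERE temperature > 30 is stood in for by a Python predicate.

```python
import json
from typing import Callable, Iterable, Iterator


def source(raw_messages: Iterable[str]) -> Iterator[dict]:
    """Source stage: decode JSON payloads, as a stream with FORMAT="JSON" would."""
    for raw in raw_messages:
        yield json.loads(raw)


def sql_filter_project(rows: Iterable[dict],
                       where: Callable[[dict], bool],
                       fields: list[str]) -> Iterator[dict]:
    """SQL stage: WHERE filtering followed by SELECT projection."""
    for row in rows:
        if where(row):
            yield {f: row[f] for f in fields}


def sink(rows: Iterable[dict]) -> list[str]:
    """Sink stage: serialize results (a real sink would publish or log them)."""
    return [json.dumps(r, sort_keys=True) for r in rows]


if __name__ == "__main__":
    messages = ['{"temperature": 50, "humidity": 20}',
                '{"temperature": 20, "humidity": 20}']
    out = sink(sql_filter_project(source(messages),
                                  where=lambda r: r["temperature"] > 30,
                                  fields=["temperature", "humidity"]))
    print(out)  # ['{"humidity": 20, "temperature": 50}']
```

Because each stage is a generator, records flow through one at a time rather than being batched, which is the essential property of a streaming pipeline.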

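Besides being highly extensible, Kuiper can also integrate with the KubeEdge framework. Interested readers can explore more on the official documentation site; I won't repeat it here: https://docs.emqx.cn/kuiper/latest/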

Installing Kuiper on the edge


The Kubernetes environment is shown below; the Raspberry Pi is the edge device.

[root@k8s-test-master01 ~]# kubectl get no -o wide 
NAME                     STATUS   ROLES        AGE    VERSION                   INTERNAL-IP      EXTERNAL-IP   OS-IMAGE                         KERNEL-VERSION                 CONTAINER-RUNTIME
k8s-test-master01        Ready    master       16h    v1.20.2                   172.31.250.220   <none>        CentOS Linux 8 (Core)            4.18.0-193.28.1.el8_2.x86_64   cri-o://1.20.0
k8s-test-node01          Ready    node         18h    v1.20.2                   172.31.250.221   <none>        Ubuntu 20.04.1 LTS               5.4.0-58-generic               containerd://1.3.3-0ubuntu2.2
k8s-test-node02          Ready    node         17h    v1.20.2                   172.31.250.222   <none>        openSUSE Leap 15.2               5.3.18-lp152.57-default        cri-o://1.17.3
kubeedge-raspberrypi01   Ready    agent,edge   116m   v1.19.3-kubeedge-v1.5.0   192.168.13.102   <none>        Raspbian GNU/Linux 10 (buster)   5.4.51-v7l+                    docker://19.3.13

Deploy Kuiper through Kubernetes and use node affinity to schedule the Pod onto the edge node. If you have many edge devices, use a DaemonSet as the workload type instead.

MQTT_SOURCE__DEFAULT__SERVERS specifies the MQTT broker on the KubeEdge edge side.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-kuiper
  labels:
    app: edge-kuiper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-kuiper
  template:
    metadata:
      labels:
        app: edge-kuiper
    spec:
      affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: node-role.kubernetes.io/edge
                      operator: In
                      values:
                      - ""
      containers:
      - name: edge-kuiper
        image: emqx/kuiper:1.1.1-alpine
        env:
        - name: MQTT_SOURCE__DEFAULT__SERVERS
          value: "[tcp://127.0.0.1:1883]"
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 1000m
            memory: 1024Mi
        ports:
        - containerPort: 9081
          name: mq
          hostPort: 9081
        livenessProbe:
          tcpSocket:
            port: 9081
          initialDelaySeconds: 60
          periodSeconds: 60
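For the many-devices case mentioned above, the same Pod template can be wrapped in a DaemonSet, which runs one Pod on every node that satisfies the affinity rule and has no replicas field. The Python sketch below builds such a manifest and prints it as JSON, which kubectl also accepts. The function name kuiper_daemonset is hypothetical, and all container settings are simply copied from the Deployment above.

```python
import json


def kuiper_daemonset(name: str = "edge-kuiper",
                     image: str = "emqx/kuiper:1.1.1-alpine",
                     mqtt_servers: str = "[tcp://127.0.0.1:1883]") -> dict:
    """Build a DaemonSet equivalent of the Kuiper Deployment above.

    A DaemonSet has no `replicas` field: one Pod runs on every node that
    passes the affinity rule, i.e. every node labelled as an edge node.
    """
    labels = {"app": name}
    edge_only = {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "node-role.kubernetes.io/edge",
                        "operator": "In",
                        "values": [""],
                    }]
                }]
            }
        }
    }
    container = {
        "name": name,
        "image": image,
        "env": [{"name": "MQTT_SOURCE__DEFAULT__SERVERS", "value": mqtt_servers}],
        "resources": {
            "requests": {"cpu": "100m", "memory": "100Mi"},
            "limits": {"cpu": "1000m", "memory": "1024Mi"},
        },
        "ports": [{"containerPort": 9081, "name": "mq", "hostPort": 9081}],
        "livenessProbe": {"tcpSocket": {"port": 9081},
                          "initialDelaySeconds": 60, "periodSeconds": 60},
    }
    return {
        "apiVersion": "apps/v1",
        "kind": "DaemonSet",
        "metadata": {"name": name, "labels": labels},
        "spec": {
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {"affinity": edge_only, "containers": [container]},
            },
        },
    }


if __name__ == "__main__":
    # Redirect to a file and apply it, e.g. python3 gen.py > kuiper-ds.json
    print(json.dumps(kuiper_daemonset(), indent=2))
```

Since the Pod uses hostPort 9081, a DaemonSet (at most one Pod per node) also avoids the port conflicts that scaling a Deployment onto a single node would cause.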

Kuiper-manager is a web management console for Kuiper nodes, streams, rules, plugins, and more.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: edge-kuiper-manager
  labels:
    app: edge-kuiper-manager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: edge-kuiper-manager
  template:
    metadata:
      labels:
        app: edge-kuiper-manager
    spec:
      affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: node-role.kubernetes.io/edge
                      operator: In
                      values:
                      - ""
      containers:
      - name: edge-kuiper-manager
        image: emqx/kuiper-manager:1.1.0
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 1000m
            memory: 1024Mi
        ports:
        - containerPort: 9082
          name: mq
          hostPort: 9082
        livenessProbe:
          tcpSocket:
            port: 9082
          initialDelaySeconds: 60
          periodSeconds: 60

Create both Deployments with kubectl create; once installed, the Pods look like this:

[root@k8s-test-master01 ~]# kubectl get po -o wide
NAME                                 READY   STATUS    RESTARTS   AGE    IP               NODE                     NOMINATED NODE   READINESS GATES
edge-kuiper-79667bd886-67xtw         1/1     Running   0          104m   172.17.0.3       kubeedge-raspberrypi01   <none>           <none>
edge-kuiper-manager-ffb8bd5b-c9bxz   1/1     Running   0          94m    172.17.0.4       kubeedge-raspberrypi01   <none>           <none>
edge-nginx-7bd689df6d-4rgvn          1/1     Running   0          124m   172.17.0.2       kubeedge-raspberrypi01   <none>           <none>

Open the Kuiper console at the edge node's IP on port 9082, log in with the default credentials admin/public, and add the Kuiper node.

Testing Kuiper


First, give Kuiper a quick test.

1. Enter the Kuiper container

kubectl exec -it edge-kuiper-79667bd886-67xtw  -- /bin/sh

2. Create a temperature and humidity stream

/kuiper # bin/kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
Connecting to 127.0.0.1:20498... 
Stream demo is created.

# List the created streams
/kuiper # bin/kuiper show streams
Connecting to 127.0.0.1:20498... 
demo

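The stream is named demo, and its DATASOURCE, devices/+/messages, is an MQTT topic. The broker it subscribes to is the default server passed in through the MQTT_SOURCE__DEFAULT__SERVERS environment variable when Kuiper was deployed.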
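The + in devices/+/messages is the MQTT single-level wildcard, so the demo stream receives messages from every per-device topic such as devices/device_001/messages. The Python sketch below implements the MQTT 3.1.1 topic-filter rules to show which topics the stream would see; topic_matches is a hypothetical helper for illustration, not part of Kuiper.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter.

    Implements the MQTT 3.1.1 wildcard rules: '+' matches exactly one
    topic level, '#' (only valid as the last level) matches the remaining
    levels, including none. Topics starting with '$' are not matched by
    filters that begin with a wildcard.
    """
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, f in enumerate(f_levels):
        if f == "#":
            return True  # matches this level and everything below it
        if i >= len(t_levels):
            return False  # filter is longer than the topic
        if f != "+" and f != t_levels[i]:
            return False  # literal level mismatch
    return len(f_levels) == len(t_levels)


if __name__ == "__main__":
    print(topic_matches("devices/+/messages", "devices/device_001/messages"))  # True
    print(topic_matches("devices/+/messages", "devices/a/b/messages"))        # False
```

Note that + never spans more than one level, so a topic such as devices/a/b/messages would not reach the demo stream.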

3. Enter the Kuiper interactive shell

/kuiper # bin/kuiper query
Connecting to 127.0.0.1:20498... 
kuiper > select * from demo WHERE temperature > 30;
Query was submit successfully.
kuiper > 

This SQL query keeps only records whose temperature is greater than 30; records at 30 or below are filtered out.

4. Generate some data on the edge with an MQTT client

mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 50, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 20, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 10, "humidity" : 20}' -t devices/device_001/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 50, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 20, "humidity" : 20}' -t devices/device_002/messages
mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 10, "humidity" : 20}' -t devices/device_002/messages

-h points to the KubeEdge MQTT broker.

5. If everything works, the Kuiper interactive shell prints the filtered data:

kuiper > [{"humidity":20,"temperature":50}][{"humidity":20,"temperature":40}][{"humidity":20,"temperature":50}][{"humidity":20,"temperature":40}]
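As a sanity check of that output, the Python snippet below replays the ten payloads published in step 4 through the same temperature > 30 predicate. It models only the WHERE clause, not Kuiper itself, and it yields the same four records in the same order as the query printed.

```python
import json

# The ten (topic, payload) pairs published above with mosquitto_pub.
published = [
    (f"devices/{dev}/messages", json.dumps({"temperature": t, "humidity": 20}))
    for dev in ("device_001", "device_002")
    for t in (50, 40, 30, 20, 10)
]

# WHERE temperature > 30 is a strict comparison, so 30 itself is dropped.
passed = [json.loads(p) for _, p in published if json.loads(p)["temperature"] > 30]
print([row["temperature"] for row in passed])  # [50, 40, 50, 40]
```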

Streams, plugins, and so on can also be created from the Kuiper console.

Installing EMQX in the cloud


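Cloud-edge collaboration is an important concept in edge computing, and the channel between cloud and edge is its hub. Kuiper can subscribe to KubeEdge's MQTT topic ($hw/events/device/+/twin/update), and it can also push rule-processed data to an MQTT broker in the cloud.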
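Because the + in the twin-update topic stands for exactly one level, the device ID is simply the fourth level of the topic. A minimal Python sketch, assuming device IDs contain no "/" (the MQTT level separator); twin_update_device_id is a hypothetical helper, and the device name temp-sensor-01 in the usage line is made up for illustration.

```python
from typing import Optional


def twin_update_device_id(topic: str) -> Optional[str]:
    """Return the device ID from a KubeEdge twin-update topic, else None.

    Matches the filter $hw/events/device/+/twin/update, where '+' stands
    for exactly one topic level (the device ID).
    """
    levels = topic.split("/")
    if len(levels) != 6:
        return None
    if levels[:3] != ["$hw", "events", "device"] or levels[4:] != ["twin", "update"]:
        return None
    return levels[3] or None  # an empty level is not a usable device ID


if __name__ == "__main__":
    print(twin_update_device_id("$hw/events/device/temp-sensor-01/twin/update"))
```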

Deploy EMQX with Helm

# Add the emqx repository
helm repo add emqx https://repos.emqx.io/charts
# Update the repository index
helm repo update
# List the available EMQX charts and versions
helm search repo emqx
NAME            CHART VERSION   APP VERSION     DESCRIPTION                             
emqx/emqx       4.2.7           4.2.7           A Helm chart for EMQ X                  
emqx/emqx-ee    4.2.3           4.2.3           A Helm chart for EMQ X Enterprise       
emqx/kuiper     0.9.0           0.9.0           A lightweight IoT edge analytic software

Because this is a combined cloud-edge environment, deploy with helm template so that the cloud-side EMQX always runs on cloud nodes.

# Render the chart into a YAML installation manifest
helm template  test emqx/emqx  > emqx.yaml

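Edit the YAML to add a node affinity rule with the NotIn operator, which keeps EMQX off the edge node (in effect, node anti-affinity):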

      affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: node-role.kubernetes.io/edge
                      operator: NotIn
                      values:
                      - ""
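To see why NotIn keeps EMQX on cloud nodes while the In rule in the Kuiper Deployments pins them to the edge, here is a small Python model of how a single matchExpression is evaluated. The node labels are an assumption inferred from the ROLES column of the kubectl get no output earlier, not read from the cluster, and only the In and NotIn operators are modelled.

```python
EDGE_KEY = "node-role.kubernetes.io/edge"


def expression_matches(node_labels: dict[str, str], key: str,
                       operator: str, values: list[str]) -> bool:
    """Evaluate one nodeSelector matchExpression against a node's labels.

    In:    the label must exist and its value must be in `values`.
    NotIn: the label is absent, or its value is not in `values`.
    """
    if operator == "In":
        return key in node_labels and node_labels[key] in values
    if operator == "NotIn":
        return key not in node_labels or node_labels[key] not in values
    raise ValueError(f"operator not modelled in this sketch: {operator}")


# Assumed labels, inferred from the ROLES column shown earlier.
nodes = {
    "k8s-test-master01": {"node-role.kubernetes.io/master": ""},
    "k8s-test-node01": {"node-role.kubernetes.io/node": ""},
    "k8s-test-node02": {"node-role.kubernetes.io/node": ""},
    "kubeedge-raspberrypi01": {"node-role.kubernetes.io/agent": "", EDGE_KEY: ""},
}

cloud = [n for n, l in nodes.items() if expression_matches(l, EDGE_KEY, "NotIn", [""])]
edge = [n for n, l in nodes.items() if expression_matches(l, EDGE_KEY, "In", [""])]
print(cloud)  # ['k8s-test-master01', 'k8s-test-node01', 'k8s-test-node02']
print(edge)   # ['kubeedge-raspberrypi01']
```

A node without the edge label at all satisfies NotIn, which is what lets the master and both worker nodes host EMQX.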

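Expose the EMQX Service externally as type NodePort. Note that the nodePort values below (1883, 8883, and so on) fall outside Kubernetes' default NodePort range of 30000-32767, so kube-apiserver must run with a --service-node-port-range that includes them; otherwise the API server rejects the Service.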

---
# Source: emqx/templates/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: test-emqx-external
  namespace: default
  labels:
    app.kubernetes.io/name: emqx
    helm.sh/chart: emqx-4.2.7
    app.kubernetes.io/instance: test
    app.kubernetes.io/managed-by: Helm
spec:
  type: NodePort
  ports:
  - name: mqtt
    port: 1883
    protocol: TCP
    targetPort: mqtt
    nodePort: 1883
  - name: mqttssl
    port: 8883
    protocol: TCP
    targetPort: mqttssl
    nodePort: 8883
  - name: mgmt
    port: 8081
    protocol: TCP
    targetPort: mgmt
    nodePort: 8081
  - name: ws
    port: 8083
    protocol: TCP
    targetPort: ws
    nodePort: 8083
  - name: wss
    port: 8084
    protocol: TCP
    targetPort: wss
    nodePort: 8084
  - name: dashboard
    port: 18083
    protocol: TCP
    targetPort: dashboard
    nodePort: 18083
  selector:
    app.kubernetes.io/name: emqx
    app.kubernetes.io/instance: test
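The Python snippet below checks each nodePort in the Service against Kubernetes' default NodePort range (30000-32767). The port table is copied from the manifest above; out_of_range is a hypothetical helper, and the widened range 1-32767 in the second call is purely an illustrative assumption, not a recommended setting.

```python
DEFAULT_NODEPORT_RANGE = range(30000, 32768)  # kube-apiserver default: 30000-32767

# nodePort values copied from the test-emqx-external Service above.
service_node_ports = {
    "mqtt": 1883, "mqttssl": 8883, "mgmt": 8081,
    "ws": 8083, "wss": 8084, "dashboard": 18083,
}


def out_of_range(ports: dict[str, int], allowed: range = DEFAULT_NODEPORT_RANGE) -> list[str]:
    """Return the names of ports the API server would reject under `allowed`."""
    return [name for name, port in ports.items() if port not in allowed]


print(out_of_range(service_node_ports))
# ['mqtt', 'mqttssl', 'mgmt', 'ws', 'wss', 'dashboard']
print(out_of_range(service_node_ports, range(1, 32768)))  # []
```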

Check the EMQX cluster status

[root@k8s-test-master01 ~]# kubectl get po -o wide --selector=app.kubernetes.io/name=emqx
NAME          READY   STATUS    RESTARTS   AGE     IP               NODE                NOMINATED NODE   READINESS GATES
test-emqx-0   1/1     Running   0          8m10s   192.168.57.195   k8s-test-master01   <none>           <none>
test-emqx-1   1/1     Running   0          7m44s   192.168.116.68   k8s-test-node02     <none>           <none>
test-emqx-2   1/1     Running   0          7m21s   192.168.85.196   k8s-test-node01     <none>           <none>
[root@k8s-test-master01 ~]# kubectl exec -it test-emqx-0 -- emqx_ctl cluster status
Cluster status: #{running_nodes =>
                      ['test@test-emqx-0.test-emqx-headless.default.svc.cluster.local',
                       'test@test-emqx-1.test-emqx-headless.default.svc.cluster.local',
                       'test@test-emqx-2.test-emqx-headless.default.svc.cluster.local'],
                  stopped_nodes => []}

EMQX enables its Dashboard by default. Open it at a cloud node's IP on port 18083 and log in with admin/public.


Kuiper rules


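Create a Kuiper rule that uploads rule-processed data to the cloud MQTT broker.

1. Subscribe to the demoSink topic on the cloud MQTT broker; this is used to verify that Kuiper uploads data to the cloud.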

# 47.242.xxx.xxx is the address of the cloud MQTT broker
# mosquitto_sub -i test_sub -h 47.242.xxx.xxx  -p 1883 -d -t demoSink
Client test_sub sending CONNECT
Client test_sub received CONNACK (0)
Client test_sub sending SUBSCRIBE (Mid: 1, Topic: demoSink, QoS: 0, Options: 0x00)
Client test_sub received SUBACK
Subscribed (mid: 1): 0

2. Create the rule in the edge-side Kuiper instance

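# Define the rule. Set CLOUD_IP to the cloud MQTT address first
# (e.g. export CLOUD_IP=47.242.xxx.xxx); the unquoted heredoc expands it.
cat > /tmp/rule.yaml <<EOF
{
  "id": "rule1",
  "sql": "select * from demo WHERE temperature > 30",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "tcp://${CLOUD_IP}:1883",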
        "topic": "demoSink"
      }
    }
  ]
}
EOF
# Create the rule
/kuiper # bin/kuiper create rule rule1 -f /tmp/rule.yaml
Connecting to 127.0.0.1:20498... 
Creating a new rule from file /tmp/rule.yaml.
Rule rule1 was created successfully, please use 'bin/kuiper getstatus rule rule1' command to get rule status.
# Check the rule status
/kuiper # bin/kuiper getstatus rule rule1
Connecting to 127.0.0.1:20498... 
{
  "source_demo_0_records_in_total": 0,
  "source_demo_0_records_out_total": 0,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_us": 0,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": 0,
  "op_1_preprocessor_demo_0_records_in_total": 0,
  "op_1_preprocessor_demo_0_records_out_total": 0,
  "op_1_preprocessor_demo_0_exceptions_total": 0,
  "op_1_preprocessor_demo_0_process_latency_us": 0,
  "op_1_preprocessor_demo_0_buffer_length": 0,
  "op_1_preprocessor_demo_0_last_invocation": 0,
  "op_2_filter_0_records_in_total": 0,
  "op_2_filter_0_records_out_total": 0,
  "op_2_filter_0_exceptions_total": 0,
  "op_2_filter_0_process_latency_us": 0,
  "op_2_filter_0_buffer_length": 0,
  "op_2_filter_0_last_invocation": 0,
  "op_3_project_0_records_in_total": 0,
  "op_3_project_0_records_out_total": 0,
  "op_3_project_0_exceptions_total": 0,
  "op_3_project_0_process_latency_us": 0,
  "op_3_project_0_buffer_length": 0,
  "op_3_project_0_last_invocation": 0,
  "sink_log_0_0_records_in_total": 0,
  "sink_log_0_0_records_out_total": 0,
  "sink_log_0_0_exceptions_total": 0,
  "sink_log_0_0_process_latency_us": 0,
  "sink_log_0_0_buffer_length": 0,
  "sink_log_0_0_last_invocation": 0,
  "sink_mqtt_1_0_records_in_total": 0,
  "sink_mqtt_1_0_records_out_total": 0,
  "sink_mqtt_1_0_exceptions_total": 0,
  "sink_mqtt_1_0_process_latency_us": 0,
  "sink_mqtt_1_0_buffer_length": 0,
  "sink_mqtt_1_0_last_invocation": 0
}
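Generating the rule file programmatically sidesteps shell quoting and heredoc-expansion pitfalls when the broker address differs per site. The Python sketch below emits the same structure as the rule defined above; build_rule and the CLOUD_HOST placeholder are hypothetical, and the printed JSON could be saved as /tmp/rule.yaml and passed to bin/kuiper create rule rule1 -f as before (JSON is valid YAML, so the file name still works).

```python
import json


def build_rule(rule_id: str, sql: str, cloud_host: str,
               topic: str = "demoSink", port: int = 1883) -> dict:
    """Assemble a Kuiper rule with a log action and an MQTT sink action."""
    return {
        "id": rule_id,
        "sql": sql,
        "actions": [
            {"log": {}},
            {"mqtt": {"server": f"tcp://{cloud_host}:{port}", "topic": topic}},
        ],
    }


if __name__ == "__main__":
    # CLOUD_HOST is a placeholder; substitute the real cloud MQTT address.
    rule = build_rule("rule1", "select * from demo WHERE temperature > 30", "CLOUD_HOST")
    print(json.dumps(rule, indent=2))
```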

3. Send some data with the mosquitto_pub client

/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 30, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 31, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 21, "humidity" : 20}' -t devices/device_002/messages
/ # mosquitto_pub -i kubeedge-kuiper-test -h 127.0.0.1  -p 1883 -m '{"temperature": 40, "humidity" : 20}' -t devices/device_001/messages

4. Check the rule status

/kuiper # bin/kuiper getstatus rule rule1
Connecting to 127.0.0.1:20498... 
{
  "source_demo_0_records_in_total": 13,
  "source_demo_0_records_out_total": 13,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_us": 4,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": "2021-02-17T09:15:25.79156",
  "op_1_preprocessor_demo_0_records_in_total": 13,
  "op_1_preprocessor_demo_0_records_out_total": 13,
  "op_1_preprocessor_demo_0_exceptions_total": 0,
  "op_1_preprocessor_demo_0_process_latency_us": 9,
  "op_1_preprocessor_demo_0_buffer_length": 0,
  "op_1_preprocessor_demo_0_last_invocation": "2021-02-17T09:15:25.791597",
  "op_2_filter_0_records_in_total": 13,
  "op_2_filter_0_records_out_total": 6,
  "op_2_filter_0_exceptions_total": 0,
  "op_2_filter_0_process_latency_us": 13,
  "op_2_filter_0_buffer_length": 0,
  "op_2_filter_0_last_invocation": "2021-02-17T09:15:25.791622",
  "op_3_project_0_records_in_total": 6,
  "op_3_project_0_records_out_total": 6,
  "op_3_project_0_exceptions_total": 0,
  "op_3_project_0_process_latency_us": 43,
  "op_3_project_0_buffer_length": 0,
  "op_3_project_0_last_invocation": "2021-02-17T09:15:25.791651",
  "sink_log_0_0_records_in_total": 6,
  "sink_log_0_0_records_out_total": 6,
  "sink_log_0_0_exceptions_total": 0,
  "sink_log_0_0_process_latency_us": 287,
  "sink_log_0_0_buffer_length": 0,
  "sink_log_0_0_last_invocation": "2021-02-17T09:15:25.791736",
  "sink_mqtt_1_0_records_in_total": 6,
  "sink_mqtt_1_0_records_out_total": 6,
  "sink_mqtt_1_0_exceptions_total": 0,
  "sink_mqtt_1_0_process_latency_us": 212,
  "sink_mqtt_1_0_buffer_length": 0,
  "sink_mqtt_1_0_last_invocation": "2021-02-17T09:15:25.79171"
}
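The status counters are easiest to read per stage: compare records_in_total with records_out_total at each source, operator, and sink. The Python sketch below does that for a subset of the output above. The key convention it relies on (a stage prefix followed by _records_in_total or _records_out_total) is inferred from this output rather than taken from Kuiper documentation, and operator_summary is a hypothetical helper.

```python
import json


def operator_summary(status: dict) -> dict[str, tuple[int, int]]:
    """Group rule-status counters as {stage: (records_in, records_out)}.

    Keys look like '<stage>_records_in_total'; the prefix before that
    suffix identifies the source, operator, or sink instance.
    """
    summary: dict[str, tuple[int, int]] = {}
    for key, value in status.items():
        if key.endswith("_records_in_total"):
            stage = key[: -len("_records_in_total")]
            summary[stage] = (value, status.get(stage + "_records_out_total", 0))
    return summary


# A subset of the getstatus output above.
status = json.loads("""{
  "source_demo_0_records_in_total": 13,
  "source_demo_0_records_out_total": 13,
  "op_2_filter_0_records_in_total": 13,
  "op_2_filter_0_records_out_total": 6,
  "sink_mqtt_1_0_records_in_total": 6,
  "sink_mqtt_1_0_records_out_total": 6
}""")

for stage, (n_in, n_out) in operator_summary(status).items():
    print(f"{stage}: in={n_in} out={n_out} dropped={n_in - n_out}")
# source_demo_0: in=13 out=13 dropped=0
# op_2_filter_0: in=13 out=6 dropped=7
# sink_mqtt_1_0: in=6 out=6 dropped=0
```

Reading the output this way shows the filter stage is the only place records are dropped: 7 of 13 failed the temperature > 30 condition, and the MQTT sink forwarded all 6 that passed.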

5. Check whether the topic subscribed to in step 1 received the data.

/ # mosquitto_sub -i test_sub -h 47.242.xxx.xxx -p 1883 -d -t demoSink
Client test_sub sending CONNECT
Client test_sub received CONNACK (0)
Client test_sub sending SUBSCRIBE (Mid: 1, Topic: demoSink, QoS: 0, Options: 0x00)
Client test_sub received SUBACK
Subscribed (mid: 1): 0
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":40}
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":31}
Client test_sub received PUBLISH (d0, q0, r0, m0, 'demoSink', ... (32 bytes))
{"humidity":20,"temperature":40}

Every record with temperature greater than 30 was pushed to the cloud.

Summary


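Kuiper gives KubeEdge stream processing at the edge as computation moves down from the cloud. The Kuiper project is also adding features designed to fit KubeEdge (such as integration with the KubeEdge device model), so KubeEdge and Kuiper make a genuinely powerful combination. Kuiper is highly extensible and has many interesting plugins; for example, the influx plugin can store data in InfluxDB.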

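This article only gives a brief introduction to Kuiper. We are still researching and trialing it, including integration with big-data services such as Apache Flink, and will cover further uses of Kuiper in later updates.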

