超轻量物联网边缘流处理 - Kuiper 插件开发教程

CodeCipherPro
• 阅读 1473

EMQ X Kuiper 是一款基于 SQL 的轻量级物联网流式数据处理软件,提供了一套插件机制用于实现自定义源(source),目标(sink)以及 SQL 函数(function)以扩展流处理功能。本教程详细介绍了 Kuiper 插件的开发编译和部署过程。

概览

Kuiper 插件基于 Go 语言的插件机制,用户可以构建松散耦合的插件程序,在运行时动态加载和绑定。但是,由于 GO 语言插件系统的限制, Kuiper 插件的编译和使用也有相应的限制:

  • 插件不支持 windows 系统
  • 插件编译环境要求跟 Kuiper 编译环境尽量一致,包括但不限于

    • 相同的 GO 版本
    • 插件与 Kuiper 自身依赖的相同包版本必须完全一致,包括 Kuiper 自身
    • 插件与 Kuiper 编译环境的 GOPATH 必须完全一致

这些限制较为苛刻,几乎要求插件和 Kuiper 在同一台机器编译运行,经常导致开发环境编译出的插件无法在生产 Kuiper 上使用。本文详细介绍了一种切实可用的插件开发环境设置和流程,推荐给 Kuiper 插件开发者使用。插件的开发和使用一般有如下流程:

  • 开发

    • 创建并开发插件项目
    • 编译调试插件
  • 部署

    • 编译生产环境可用插件
    • 部署插件到生产环境

插件开发

插件开发一般在开发环境中进行。在开发环境调试运行通过后再部署到生产环境中。

创建并开发插件项目

Kuiper 项目源代码的 plugins 目录下有一些插件范例。用户自定义的插件也可以在 Kuiper 项目中开发。但是为了便于代码管理,一般应当在 Kuiper 项目之外另建项目开发自定义插件。插件项目建议使用 Go module,典型的项目目录如下图所示:

plugin_project
  sources         //源(source)插件源代码目录
    mysource.go
  sinks           //目标(sink)插件源代码目录
    mysink.go
  functions       //函数(function)插件源代码目录
    myfunction.go
  target          //编译结果目录     
  go.mod          //go module文件

插件开发需要扩展 Kuiper 内的接口,因此必须依赖于 Kuiper 项目。最简单的 go.mod 也需要包含对 Kuiper 的依赖。典型的 go.mod 如下:

module samplePlugin

go 1.13

require (
    github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
)

Kuiper 插件有三种类型,源代码可放入对应的目录中。插件开发的详细方法请参看 EMQ X Kuiper 扩展。本文以目标(sink)为例,介绍插件的开发部署过程。我们将开发一个最基本的 MySql 目标,用于将流输出写入到 MySql 数据库中。

  • 新建名为 samplePlugin 的插件项目,采用上文的目录结构
  • 在 sinks 目录下,新建 mysql.go 文件
  • 编辑 mysql.go 文件以实现插件

    • 实现 api.Sink接口
    • 导出 Symbol:Mysql
  • 编辑 go.mod, 添加 mysql 驱动模块

mysql.go 完整代码如下

package main

import (
    "database/sql"
    "fmt"
    "github.com/emqx/kuiper/xstream/api"
    _ "github.com/go-sql-driver/mysql"
)

type mysqlSink struct {
    url       string
    table     string

    db        *sql.DB
}

func (m *mysqlSink) Configure(props map[string]interface{}) error {
    if i, ok := props["url"]; ok {
        if i, ok := i.(string); ok {
            m.url = i
        }
    }
    if i, ok := props["table"]; ok {
        if i, ok := i.(string); ok {
            m.table = i
        }
    }
    return nil
}

func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
    logger := ctx.GetLogger()
    logger.Debug("Opening mysql sink")
    m.db, err = sql.Open("mysql", m.url)
    return
}

func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
    logger := ctx.GetLogger()
    if v, ok := item.([]byte); ok {
        logger.Debugf("mysql sink receive %s", item)
        sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.table, v)
        logger.Debugf(sql)
        insert, err := m.db.Query(sql)
        if err != nil {
            return err
        }
        defer insert.Close()
    } else {
        logger.Debug("mysql sink receive non byte data")
    }
    return nil
}

func (m *mysqlSink) Close(ctx api.StreamContext) error {
    if m.db != nil{
        m.db.Close()
    }
    return nil
}

var Mysql mysqlSink

go.mod 完整代码如下

module samplePlugin

go 1.13

require (
   github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
   github.com/go-sql-driver/mysql v1.5.0
)

编译调试插件

编译插件应当与编译 Kuiper 的环境一致。在开发环境中,典型的用法是在本地下载并编译 Kuiper 和插件,然后在本地 Kuiper 上调试插件功能;也可以在 Kuiper 的 docker 容器中编译插件,并用 Kuiper 容器运行调试。

本地编译

开发者可以在本地自行编译 Kuiper 和插件进行调试。其步骤如下:

  1. 下载 Kuiper 源代码 git clone https://github.com/emqx/kuiper.git
  2. 编译 Kuiper:在 Kuiper 目录下,运行 make
  3. 编译插件:

    1. 在插件项目下,运行go mod edit -replace github.com/emqx/kuiper=$kuiperPath,使得 Kuiper 依赖指向本地 Kuiper,请替换$kuiperPath 到步骤1下载目录,下同。
    2. 编译插件 so 到 Kuiper 插件目录下
     go build --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

Docker 编译

从0.3.0版本开始,Kuiper 提供了开发版本 docker 镜像( kuiper:x.x.x-dev )。与运行版本相比,开发版提供了 go 开发环境,使得用户可以在编译出在 Kuiper 正式发布版本中完全兼容的插件。Docker 中编译步骤如下:

  1. 运行 Kuiper 开发版本 docker。需要把本地插件目录 mount 到 docker 里的目录中,这样才能在 docker 中访问插件项目并编译。笔者的插件项目位于本地/var/git目录。下面的命令中,我们把本地的/var/git目录映射到docker内的/home目录中。

    docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
  2. 在 docker 环境中编译插件,其原理与本地编译一致。编译出的插件置于插件项目的 target 目录中

    -- In host
    # docker exec -it kuiper-dev /bin/sh
    
    -- In docker instance
    # cd /home/samplePlugin
    # go mod edit -replace github.com/emqx/kuiper=/go/kuiper
    # go build --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

调试运行插件

在本地或 Docker 中启动 Kuiper,创建流和规则,规则的 action 设置为 mysql 即可对自定义的 mysql sink 插件进行测试。创建流和规则的步骤请参考 Kuiper 文档。以下提供一个使用了 mysql 插件的规则供参考。

{
  "id": "ruleTest",
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {},
      "mysql":{
        "url": "user:test@tcp(localhost:3307)/user",
        "table": "test"
      }
    }
  ]
}

需要注意的是,插件重新编译后需要重启 Kuiper 才能载入新的版本。

插件部署

如果生产环境和开发环境如果不同,开发的插件需要重新编译并部署到生产环境。假设生产环境采用 Kuiper docker 进行部署,本节将描述如何部署插件到生产环境中。

插件编译

插件原则上应该与生产环境 Kuiper 采用相同环境进行编译。假设生产环境为 Kuiper docker,则应当采用与生产环境相同版本的 dev docker 环境编译插件。例如,生产环境采用 emqx/kuiper:0.3.0 的 docker 镜像,则插件需要在emqx/kuiper:0.3.0-dev 的环境中进行编译。

编译过程请参考 Docker 编译

插件部署

可以采用 REST API 或者 CLI 进行插件管理。下文以 REST API 为例,将上一节编译的插件部署到生产环境中。

  1. 插件打包并放到 http 服务器。将上一节编译好的插件 .so 文件及默认配置文件(只有 source 需要) .yaml 文件一起打包到一个 .zip 文件中,假设为 mysqlSink.zip。把该文件放置到生产环境也可访问的 http 服务器中。
  2. 使用 REST API 创建插件:

    POST http://{$production_kuiper_ip}:9081/plugins/sinks
    Content-Type: application/json
    
    {"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"}
  3. 验证插件是否创建成功

    GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql

    返回

    {
       "name": "mysql",
       "version": "1.0.0"
    }

至此,插件部署成功。可以创建带有 mysql sink 的规则进行验证。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/super-lightweight-iot-edge-stream-processing-kuiper-plugin-development-tutorial

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Prodan Labs Prodan Labs
4年前
IoT基础架构的演进 — Kuiper
EMQXKuiper是映云科技开源的轻量级物联网边缘数据分析和流式处理软件,Kuiper设计的一个主要目标就是将在云端运行的实时流式计算框架(如ApacheSpark,ApacheStorm和ApacheFlink等)迁移到边缘端。Kuiper参考了云端流式处理项目的架构与实现,结合边缘流式数据处理的特点,采用了编写基于源(Sou
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Wesley13 Wesley13
3年前
FD
!(https://oscimg.oschina.net/oscnet/27b94bba9f88488dba7d397aa3cc4f00.gif)
Stella981 Stella981
3年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Eclipse插件开发_学习_00_资源帖
一、官方资料 1.eclipseapi(https://www.oschina.net/action/GoToLink?urlhttp%3A%2F%2Fhelp.eclipse.org%2Fmars%2Findex.jsp%3Ftopic%3D%252Forg.eclipse.platform.doc.isv%252Fguide%2
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这