Dapr源码解析-五分钟快速添加并构建你的自定义组件

安全建
• 阅读 1333

今天跟大家分享一个小教程,如何自己动手写一个新组件,并加载到Dapr runtime。

前期准备

  1. 关于Dapr runtime的启动过程,如何加载和实例化组件可以参考之前写的文章Dapr runtime 启动过程
  2. Dapr开发环境配置,可以参考Dapr 开发环境配置

开始编码

克隆Dapr和 component-contrib

cd $GOPATH/src

# Clone dapr
mkdir -p github.com/dapr/dapr
git clone https://github.com/dapr/dapr.git github.com/dapr/dapr

# Clone component-contrib
mkdir -p github.com/dapr/components-contrib
git clone https://github.com/dapr/components-contrib.git github.com/dapr/components-contrib

编写组件

假设,我们组件名字叫:Mytest
首先在component-contrib项目下,创建mytest包。

  1. 在mytest文件夹创建接口约束文件:github.com/dapr/components-contrib/mytest/mytest.go
    mytest组件提供两个方法,Init和Info
package mytest

type Mytest interface {
    // Init this component.
    Init(metadata Metadata)

    // Info show method
    Info(info string)
}
  1. metadata.go:接收组件配置文件YAML中定义的Metadata字段
package mytest

type Metadata struct {
    Properties map[string]string `json:"properties"`
}

3.为mytest组件提供两种实现方式:demof和demos
在mytest包里创建demof包和demos包,分别在demof.go和demos.go实现Mytest组件
demof.go

package demof

import (
    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/kit/logger"
)

type Demof struct {
    logger logger.Logger
}

func NewDemof(logger logger.Logger) *Demof {
    d := &Demof{
        logger: logger,
    }

    return d
}

func (d *Demof) Init(metadata mytest.Metadata) {
    d.logger.Info(metadata)
}

func (d *Demof) Info(info string) {
    d.logger.Info("this is Demof, I received %s", info)
}

demos.go

package demos

import (
    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/kit/logger"
)

type Demos struct {
    logger logger.Logger
}

func NewDemos(logger logger.Logger) *Demos {
    d := &Demos{
        logger: logger,
    }

    return d
}

func (d *Demos) Init(metadata mytest.Metadata) {
    d.logger.Info(metadata)
}

func (d *Demos) Info(info string) {
    d.logger.Info("this is Demos, I received %s", info)
}

至此Mytest组件已经具备了可以执行的所有要素,目录解构如下

component-contrib
    |_mytest
        |_demof
            |_demof.go
        |_demos
            |_demos.go
        metadata.go
        mytest.go

将Mytest组件注册到Dapr runtime

  1. 实现Mytest组件注册接口github.com/dapr/dapr/pkg/components/mytest/registry.go
package mytest

import (
    "strings"

    "github.com/pkg/errors"

    "github.com/dapr/components-contrib/mytest"
    "github.com/dapr/dapr/pkg/components"
)

type Mytest struct {
    Name          string
    FactoryMethod func() mytest.Mytest
}

func New(name string, factoryMethod func() mytest.Mytest) Mytest {
    return Mytest{
        Name:          name,
        FactoryMethod: factoryMethod,
    }
}

type Registry interface {
    Register(components ...Mytest)
    Create(name, version string) (mytest.Mytest, error)
}

type mytestRegistry struct {
    mytests map[string]func() mytest.Mytest
}

func NewRegistry() Registry {
    return &mytestRegistry{
        mytests: map[string]func() mytest.Mytest{},
    }
}

func (t *mytestRegistry) Register(components ...Mytest) {
    for _, component := range components {
        t.mytests[createFullName(component.Name)] = component.FactoryMethod
    }
}

func (t *mytestRegistry) Create(name, version string) (mytest.Mytest, error) {
    if method, ok := t.getMytest(name, version); ok {
        return method(), nil
    }
    return nil, errors.Errorf("couldn't find Mytest %s/%s", name, version)
}

func (t *mytestRegistry) getMytest(name, version string) (func() mytest.Mytest, bool) {
    nameLower := strings.ToLower(name)
    versionLower := strings.ToLower(version)
    mytestFn, ok := t.mytests[nameLower+"/"+versionLower]
    if ok {
        return mytestFn, true
    }
    if components.IsInitialVersion(versionLower) {
        mytestFn, ok = t.mytests[nameLower]
    }
    return mytestFn, ok
}

func createFullName(name string) string {
    return strings.ToLower("mytest." + name)
}

2.更新runtime 组件发现和注册机制
github.com/dapr/dapr/pkg/runtime/options.go
扩展runtimeOpts

import(
"github.com/dapr/dapr/pkg/components/mytest"
)
runtimeOpts struct {
    ...
    mytests     []mytest.Mytest
}

添加runtime组件注册函数

func WithMytests(mytests ...mytest.Mytest) Option {
    return func(o *runtimeOpts) {
        o.mytests = append(o.mytests, mytests...)
    }
}

更新runtime启动流程,注册Mytest组件
github.com/dapr/dapr/pkg/runtime/runtime.go

import(
....
"github.com/dapr/components-contrib/mytest"
    mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)

...
//更新变量声明
const (
    mytestComponent             ComponentCategory = "mytest"
)
var componentCategoriesNeedProcess = []ComponentCategory{
    ...
    mytestComponent,
}
...
// 扩展 DaprRuntime  components of the runtime.
type DaprRuntime struct {
    ...
    mytestRegistry mytest_loader.Registry
    mytests        map[string]mytest.Mytest
}

// NewDaprRuntime returns a new runtime with the given runtime config and global config.
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList, resiliencyProvider resiliency.Provider) *DaprRuntime {
    ctx, cancel := context.WithCancel(context.Background())
    return &DaprRuntime{
        ...
        mytestRegistry: mytest_loader.NewRegistry(),
        mytests:        map[string]mytest.Mytest{},
    }
}

...
//初始化runtime
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ...
    a.mytestRegistry.Register(opts.mytests...)
    ...
}

//本示例仅以http api接口为例
func (a *DaprRuntime) startHTTPServer(port int, publicPort *int, profilePort int, allowedOrigins string, pipeline http_middleware.Pipeline) error {
    a.daprHTTPAPI = http.NewAPI(a.runtimeConfig.ID,
        a.appChannel,
        a.directMessaging,
        a.getComponents,
        a.resiliency,
        a.stateStores,
        a.lockStores,
        a.secretStores,
        a.secretsConfiguration,
        a.configurationStores,
        a.getPublishAdapter(),
        a.actor,
        a.sendToOutputBinding,
        a.globalConfig.Spec.TracingSpec,
        a.ShutdownWithWait,
        a.getComponentsCapabilitesMap,
        a.mytests,
    )
...
}

//runtime初始化过程从components目录 发现并初始化组件
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
    switch category {
    ...
    case mytestComponent:
        return a.initMytest(comp)
    }
    return nil
}

func (a *DaprRuntime) initMytest(c components_v1alpha1.Component) error {
    mytestIns, err := a.mytestRegistry.Create(c.Spec.Type, c.Spec.Version)
    if err != nil {
        log.Errorf("error create component %s: %s", c.ObjectMeta.Name, err)
    }
    a.mytests[c.ObjectMeta.Name] = mytestIns
    properties := a.convertMetadataItemsToProperties(c.Spec.Metadata)
    log.Debug("properties is ", properties)
    mytestIns.Init(mytest.Metadata{
        Properties: properties,
    })
    return err
}

实现Dapr http api调用接口

github.com/dapr/dapr/pkg/http/api.go

import (
"github.com/dapr/components-contrib/mytest"
)

type api struct {
    ....
    mytests              map[string]mytest.Mytest
}

const (
    ...
    mytestParam          = "mytestName"
)

// NewAPI returns a new API.
func NewAPI(
    ...
    mytests map[string]mytest.Mytest,
) API {
    ...
    api := &api{
        ...
        mytests:              mytests,
    }
    ...
    api.endpoints = append(api.endpoints, api.constructMytestEndpoints()...)

    return api
}
/**
 * regist Mytest component api
 */
func (a *api) constructMytestEndpoints() []Endpoint {
    return []Endpoint{
        {
            Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost},
            Route:   "mytest/{mytestName}/info",
            Version: apiVersionV1,
            Handler: a.onMytestInfo,
        },
    }
}

/**
 * switch Mytest component instance
 */
func (a *api) getMytestWithRequestValidation(reqCtx *fasthttp.RequestCtx) (mytest.Mytest, string, error) {
    if a.mytests == nil || len(a.mytests) == 0 {
        msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", messages.ErrMytestNotFound)
        respond(reqCtx, withError(fasthttp.StatusInternalServerError, msg))
        log.Debug(msg)
        return nil, "", errors.New(msg.Message)
    }

    mytestName := a.getMytestName(reqCtx)
    if a.mytests[mytestName] == nil {
        msg := NewErrorResponse("ERR_MYTEST_NOT_CONFIGURED", fmt.Sprintf(messages.ErrMytestNotFound, mytestName))
        respond(reqCtx, withError(fasthttp.StatusBadRequest, msg))
        log.Debug(msg)
        return nil, "", errors.New(msg.Message)
    }
    return a.mytests[mytestName], mytestName, nil
}

func (a *api) getMytestName(reqCtx *fasthttp.RequestCtx) string {
    return reqCtx.UserValue(mytestParam).(string)
}

func (a *api) onMytestInfo(reqCtx *fasthttp.RequestCtx) {
    log.Debug("calling mytest components")
    mytestInstance, _, err := a.getMytestWithRequestValidation(reqCtx)
    if err != nil {
        log.Debug(err)
        return
    }

    mytestInstance.Info()
    respond(reqCtx, withEmpty())
}

Dapr runtime初始化入口

github.com/dapr/dapr/cmd/daprd/main.go

import (
    //mytest demo
    "github.com/dapr/components-contrib/mytest"
    mytest_demof "github.com/dapr/components-contrib/mytest/demof"
    mytest_demos "github.com/dapr/components-contrib/mytest/demos"
    mytest_loader "github.com/dapr/dapr/pkg/components/mytest"
)
...

runtime.WithSecretStores(
...
        runtime.WithMytests(
            mytest_loader.New("demof", func() mytest.Mytest {
                return mytest_demof.NewDemof(logContrib)
            }),
            mytest_loader.New("demos", func() mytest.Mytest {
                return mytest_demos.NewDemos(logContrib)
            }),
        ),
...
)
....

在component目录中添加mytest.yaml 配置文件

默认是在.dapr/components目录

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: demof
spec:
  type: mytest.demof
  version: v1
  metadata:
  - name: mName
    value: mValue

编译并运行Dapr

go mod edit -replace github.com/dapr/components-contrib=../components-contrib

go mod tidy

make DEBUG=1 build

# Back up the current daprd
cp ~/.dapr/bin/daprd ~/.dapr/bin/daprd.bak
cp ./dist/darwin_amd64/debug/daprd ~/.dapr/bin

dapr run --app-id myapp --dapr-http-port 3500  --log-level debug

观察Dapr启动日志

DEBU[0000] loading component. name: demof, type: mytest.demof/v1  app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] component loaded. name: demof, type: mytest.demof/v1  app_id=myapp instance=MacBook-Pro-3.local scope=dapr.runtime type=log ver=edge
INFO[0000] {map[mName:mValue]}                           app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

demof组件已经加载成功,接下来使用http api接口尝试调用demof info接口

curl http://localhost:3500/v1.0/mytest/demof/info

日志记录到了我们在组件中打印的信息

INFO[0126] this is Demof, I'm working                    app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

修改一下mytest.yaml 配置文件,实例化demos

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: demos
spec:
  type: mytest.demos
  version: v1
  metadata:
  - name: mName
    value: mValue

尝试调用demos info接口

curl http://localhost:3500/v1.0/mytest/demos/info
INFO[0058] this is Demos, I'm working                    app_id=myapp instance=MacBook-Pro-3.local scope=dapr.contrib type=log ver=edge

组件正常工作。

以上示例代码已经放在github上
https://github.com/RcXu/dapr.git
https://github.com/RcXu/components-contrib.git
分支名为:dev-demo

多谢阅读,敬请斧正!

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
徐小夕 徐小夕
4年前
《精通react/vue组件设计》之5分钟教你实现一个极具创意的加载(Loading)组件
前言本文是笔者写组件设计的第八篇文章,今天带大家用5分钟实现一个极具创意的加载(loading)组件.涉及的核心知识点主要是css3相关特性,如果大家非常熟悉,可直接跳过介绍直接看正文.时刻问自己:是否具备创造力?笔记前端组件的一般分类:通用型组件:比如Button,Icon等.布局型组件:比如Grid,Layout布
Easter79 Easter79
3年前
Taro小程序自定义顶部导航栏
微信自带的顶部导航栏是无法支持自定义icon和增加元素的,在开发小程序的时候自带的根本满足不了需求,分享一个封装好的组件,支持自定义icon、扩展dom,适配安卓、ios、h5,全面屏。我用的是京东的Taro多端编译框架写的小程序,原生的也可以适用,用到的微信/taro的api做调整就行,实现效果如下。!在这里插入图片描述(https://i
Wesley13 Wesley13
3年前
VBox 启动虚拟机失败
在Vbox(5.0.8版本)启动Ubuntu的虚拟机时,遇到错误信息:NtCreateFile(\\Device\\VBoxDrvStub)failed:0xc000000034STATUS\_OBJECT\_NAME\_NOT\_FOUND(0retries) (rc101)Makesurethekern
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
如来佛祖 如来佛祖
1年前
李术铜-从0手写自己的Linux X86操作系统|完结无密
李术铜从0手写自己的LinuxX86操作系统|完结无密Linuxx86操作系统启动流程详解概述Linuxx86操作系统的启动流程是一个复杂的过程,涉及硬件自检、BIOS引导、加载引导装载程序、内核加载、文件系统挂载等一系列步骤。这个过程对于理解Linux操