InfluxDB集群 -- 创建database源码分析

公祖
• 阅读 1861

创建database,可以通过CLI命令,也可以通过HTTP,两种方式走的是同一套逻辑流程。

本文以HTTP为例,分析集群模式下创建database的源码流程。

curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"

database是集群的元信息,需要Raft强一致;
create database的request若被发送给follower节点,则返回NotLeader的redirect响应(带leader的url),client重新向Leader节点发送request。

整体流程:
InfluxDB集群 -- 创建database源码分析

HTTP handler

8086是httpd的监听端口,其hander:

// services/httpd/handler.go
func NewHandler(c Config) *Handler {
    ......
    h.AddRoutes([]Route{
        {
            "query", // Query serving route.
            "POST", "/query", true, true, h.serveQuery,
        },    
    })
}

继续向下走:

// services/httpd/handler.go
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User) {
    ...
    // Execute query.
    results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
    ...
}
// query/executor.go
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
    ......
    // Send any other statements to the underlying statement executor.
    err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
    ......
}

识别到Create database语句:

// cluster/statement_executor.go
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error {
    ...
    switch stmt := stmt.(type) {
    case *influxql.CreateDatabaseStatement:
        if ctx.ReadOnly {
            messages = append(messages, query.ReadOnlyWarning(stmt.String()))
        }
        err = e.executeCreateDatabaseStatement(stmt)
    ......
    }
}

最终是通过metaClient.CreateDatabase()完成调用:

// cluster/statement_executor.go
func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.CreateDatabaseStatement) error {
    if !meta.ValidName(stmt.Name) {
        // TODO This should probably be in `(*meta.Data).CreateDatabase`
        // but can't go there until 1.1 is used everywhere
        return meta.ErrInvalidName
    }

    if !stmt.RetentionPolicyCreate {
        _, err := e.MetaClient.CreateDatabase(stmt.Name)
        return err
    }
    .....
}

metaClient

metaClient在执行CreateDatabase时:

  • 首先将CreateDatabase封装成1个command;
  • 然后将这个command通过http(8091) POST /execute发送到其它节点;

    • 若节点返回errRedirect,则提取Leader的url,重新尝试向Leader发送请求;
    • 重试maxRetries次,直到成功为止;

构造command并发送给集群执行:

// services/meta/client.go
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
    if db := c.Database(name); db != nil {
        return db, nil
    }
    cmd := &internal.CreateDatabaseCommand{
        Name: proto.String(name),
    }
    err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd)
    if err != nil {
        return nil, err
    }
    return c.Database(name), nil
}

发送POST /execute,重试maxRetries次,直到成功:

func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
    var index uint64
    tries := 0
    currentServer := 0
    var redirectServer string

    for {
        ...
        // build the url to hit the redirect server or the next metaserver
        var url string
        if redirectServer != "" {
            url = redirectServer
            redirectServer = ""
        } else {            
            server := c.metaServers[currentServer]
            url = fmt.Sprintf("http://%s/execute", server)            
        }

        index, err = c.exec(url, typ, desc, value)
        tries++
        currentServer++

        if err == nil {
            c.waitForIndex(index)
            return nil
        }
        if tries > maxRetries {
            return err
        }
        //如果返回redirect的response
        if e, ok := err.(errRedirect); ok {
            redirectServer = e.host
            continue
        }
        time.Sleep(errSleep)
    }
}

节点处理CreateDatabase的Command

节点监听在8091端口,处理POST /execute请求,request body是protobuf序列化的command:

// services/meta/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "POST":
        h.WrapHandler("execute", h.serveExec).ServeHTTP(w, r)
    }
}

handler处理请求时,首先将body apply到store,若返回ErrNotLeader,则返回client redirect响应:

// services/meta/handler.go
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
    .......
    var resp *internal.Response
    if err := h.store.apply(body); err != nil {
        // If we aren't the leader, redirect client to the leader.
        if err == raft.ErrNotLeader {
            l := h.store.leaderHTTP()            
            scheme := "http://"            
            l = scheme + l + "/execute"
            http.Redirect(w, r, l, http.StatusTemporaryRedirect)
            return
        }
        // Error wasn't a leadership error so pass it back to client.
        resp = &internal.Response{
            OK:    proto.Bool(false),
            Error: proto.String(err.Error()),
        }
    }
    ......
}

具体看下store.apply()做了什么事情:

// services/meta/store_fsm.go
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {
        panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }
    err := func() interface{} {
        switch cmd.GetType() {
        case internal.Command_CreateDatabaseCommand:
            return fsm.applyCreateDatabaseCommand(&cmd)
        }
    }()
    ....
}

跟添加节点类似,把新创建的database的信息,更新到fsm.data中:

// services/meta/store_fsm.go
func (fsm *storeFSM) applyCreateDatabaseCommand(cmd *internal.Command) interface{} {
    ext, _ := proto.GetExtension(cmd, internal.E_CreateDatabaseCommand_Command)
    v := ext.(*internal.CreateDatabaseCommand)
    other := fsm.data.Clone()
    if err := other.CreateDatabase(v.GetName()); err != nil {
        return err
    }
    ....
    fsm.data = other
    ....
}
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
android SQLite数据库使用实例
创建数据库帮助类DbHelper.java:packagecom.example.db;importandroid.content.Context;importandroid.database.sqlite.SQLiteDatabase;importandroid.database.sqlite.S
Chase620 Chase620
4年前
Dubbo 源码分析 - 集群容错之Directory
注:本系列文章已捐赠给Dubbo社区,你也可以在Dubbo中阅读本系列文章。1\.简介前面文章分析了服务的导出与引用过程,从本篇文章开始,我将开始分析Dubbo集群容错方面的源码。这部分源码包含四个部分,分别是服务目录Directory、服务路由Router、集群Cluster和负载均衡LoadBalance。这几个部分的源码逻辑比
Chase620 Chase620
4年前
Dubbo 源码分析 - 集群容错之 LoadBalance
Dubbo源码分析集群容错之LoadBalance注:本系列文章已捐赠给Dubbo社区,你也可以在Dubbo中阅读本系列文章。1.简介LoadBalance中文意思为负载均衡,它的职责是将网络
Wesley13 Wesley13
3年前
3. InfluxDB使用HTTP的API编写数据
InfluxDB前篇介绍Centos7下InfluxDB从安装开始到入门InfluxDB关键概念经过前面两个篇章的探讨,基本已经了解了InfluxDB的操作,下面再来继续看看使用HTTPAPI编写数据。使用HTTP的API请求创建数据库首先查看InfluxDB当前有哪些数据库了。root@server81
Stella981 Stella981
3年前
Scrapy使用入门及爬虫代理配置
本文通过一个简单的项目实现Scrapy采集流程。希望通过该项目对Scrapy的使用方法和框架能够有帮助。1\.工作流程重点流程如下:创建一个Scrapy项目。创建一个爬虫来抓取网站和处理数据。通过命令行将采集的内容进行分析。将分析的数据保存到MongoDB数据库。2\.准备环境安装
Wesley13 Wesley13
3年前
Java SQL基础
SQL基础1、DataBase创建:CREATEDATABASEDATABASE\_NAME ;使用:USEDATABASE\_NAME;删除:DROPDATABASEDATABASE\_NAME;2、Table创建CREATET
Wesley13 Wesley13
3年前
MySQL表的几个简单查询语句
1. 创建数据库CREATE DATABASE databasename2. 删除数据库drop database dbname3\.创建新表create table tabname(col1 type1 \not null\ \primary key\,col2 type2 \not   null\,..)    
Wesley13 Wesley13
3年前
Oracle12Dataguard启停
 检查selectname,open\_mode,database\_role,cdbfromv$database;selectname,open\_mode,con\_id,dbidfromv$containers;!psef|grepmrpshowcon\_nameshowparametersdb
Stella981 Stella981
3年前
InfluxDB学习之InfluxDB的基本概念
一、与传统数据库中的名词做比较influxDB中的名词传统数据库中的概念database数据库measurement数据库中的表points表里面的一行数据二、InfluxDB中独有的概念1)PointPoint由时间戳(time)、数据(field)、标签(tags)组成。Po
Wesley13 Wesley13
3年前
mongoDB的安全相关
开启认证:在配置文件里新增一行authtrue创建用户:1.创建语法:createUser2.{user:"<name",pwd:"<cleartextpassword",customData:{<anyinformation},roles:\{role:"<role",db:"<database"}\
Stella981 Stella981
3年前
Mybatis拦截器分析
【基本思路】拦截器在哪里拦截?什么情况下才会拦截代理?怎么代理呢?只要搞清楚这些,基本的拦截器功能也3拦截器实现原理mybatis支持拦截器,实现的原理就是利用JDK的动态代理。拦截器在哪里呢?mybatis到底提供几处可以拦截呢?请看下图,通过分析源码可知基本查询流程如下:!mybatis的interceptor拦截器流程图(http:
公祖
公祖
Lv1
抽刀断水水更流,举杯销愁愁更愁。
文章
4
粉丝
0
获赞
0