主从同步原理
所有数据库同步原理几乎一样,MongoDB解析oplog,Mysql解析bin.log,今天实现了MongoDB同步机制,请关注小编下次更新Mysql同步机制。
- intial sync:初始化所有数据。
- replication:根据oplog实现增量同步。
初始化所有数据这个不说了,以下代码根据oplog实时读取并同步数据。
Change Stream
MongoDB官网提供操作流,通过watch机制监听oplog变更并反向通知程序。
MongoDB官网给出oplog操作类型:
- insert:添加数据
- delete:删除数据
- replace:替换数据
- update:更新数据
- drop:删除表
- rename:修改表名
- dropDatabase:删除数据库
- invalidate:drop/rename/dropDatabase 将导致invalidate被触发,并关闭 change stream
还可以提供了监听条件、开始监听时间、Resume Tokens断点恢复等功能。
注:断点恢复也属于监听条件,只支持一个监听条件
Change Events解析
watch监听返回信息
{
   
    
        _id : {
   
     // 存储元信息
            "_data" : <BinData|hex string> // resumeToken,用于断点恢复
        },
        "operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate,部分仅支持4.0后的版本,详情见下
        "fullDocument" : {
   
     <document> }, // 修改后的数据,出现在insert, replace, delete, update的事件中
        "ns" : {
   
     // namespace
            "db" : "<database>", // 操作库名
            "coll" : "<collection" // 操作表名
        },
        "to" : {
   
     // 只在operationType为rename的时候有效,表示改名以后的namespace
            "db" : "<database>",
            "coll" : "<collection"
        },
        "documentKey" : {
   
     "_id" : <value> }, // 相当于o2字段。出现在insert, replace, delete, update事件中。正常只包含_id,对于sharded collection,还包括shard key
        "updateDescription" : {
   
     // 只在operationType为update的时候出现,相当于是增量的修改,而replace是替换
            "updatedFields" : {
   
     <document> }, // 更新的field的值
            "removedFields" : [ "<field>", ... ] // 删除的field列表
        },
        "clusterTime" : <Timestamp>, // 相当于ts字段
        "txnNumber" : <NumberLong>, // 相当于oplog里面的txnNumber,只在事务里面出现。事务号在一个事务里面单调递增
        "lsid" : {
   
     // 相当于lsid字段,只在事务里面出现。logic session id,请求所在的session的id
            "id" : <UUID>,
            "uid" : <BinData>
        }
    }
go 代码
新建go mod工程,目录如下: 
  
utils可以忽略,小编自己写的映射。
- 获取mongo连接包: go get go.mongodb.org/mongo-driver/mongo
initMongo.go代码:
package mongo
import (
    "context"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "log"
    "time"
)
func initMasterDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }
    //选择数据库
    return client.Database("数据库")
}
func initLiveDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }
    //选择数据库
    return client.Database("数据库")
}
func initSlaveDBClient() *mongo.Database {
   
    
    var err error
    clientOptions := options.Client().ApplyURI("mongodb://ip:端口/?connect=direct").SetConnectTimeout(5 * time.Second)
    // 连接到MongoDB
    client, err := mongo.Connect(context.TODO(), clientOptions)
    if err != nil {
   
    
        log.Fatal(err)
    }
    //选择数据库
    return client.Database("数据库")
}
sync.go代码:
package mongo
import (
    "context"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "log"
    "time"
)
type StreamObject struct {
   
    
    Id                *WatchId `bson:"_id"`
    OperationType     string
    FullDocument      map[string]interface{
   
    }
    Ns                NS
    UpdateDescription map[string]interface{
   
    }
    DocumentKey       map[string]interface{
   
    }
}
type NS struct {
   
    
    Database   string `bson:"db"`
    Collection string `bson:"coll"`
}
type WatchId struct {
   
    
    Data string `bson:"_data"`
}
const (
    OperationTypeInsert  = "insert"
    OperationTypeDelete  = "delete"
    OperationTypeUpdate  = "update"
    OperationTypeReplace = "replace"
)
var resumeToken bson.Raw
func Sync() {
   
    
    go syncMaster()
    for {
   
    
        time.Sleep(2 * time.Second)
    }
}
func syncMaster() {
   
    
    for {
   
    
        //获得主库数据连接
        client := initMasterDBClient()
        watch(client)
    }
}
func watch(client *mongo.Database) {
   
    
    defer func() {
   
    
        err := recover()
        if err != nil {
   
    
            log.Printf("同步出现异常: %+v \n", err)
        }
    }()
    //设置过滤条件
    pipeline := mongo.Pipeline{
   
    
        bson.D{
   
    {
   
    "$match",
            bson.M{
   
    "operationType": bson.M{
   
    "$in": bson.A{
   
    "insert", "delete", "replace", "update"}}},
        }},
    }
    //当前时间前一小时
    now := time.Now()
    m, _ := time.ParseDuration("-1h")
    now = now.Add(m)
    timestamp := &primitive.Timestamp{
   
    
        T: uint32(now.Unix()),
        I: 0,
    }
    //设置监听option
    opt := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetStartAtOperationTime(timestamp)
    if resumeToken != nil {
   
    
        opt.SetResumeAfter(resumeToken)
        opt.SetStartAtOperationTime(nil)
    }
    //获得watch监听
    watch, err := client.Watch(context.TODO(), pipeline, opt)
    if err != nil {
   
    
        log.Fatal("watch监听失败:", err)
    }
    //获得从库连接
    slaveClient := initSlaveDBClient()
    for watch.Next(context.TODO()) {
   
    
        var stream StreamObject
        err = watch.Decode(&stream)
        if err != nil {
   
    
            log.Println("watch数据失败:", err)
        }
        log.Println("=============", stream.FullDocument["_id"])
        //保存现在resumeToken
        resumeToken = watch.ResumeToken()
        switch stream.OperationType {
   
    
        case OperationTypeInsert:
            syncInsert(slaveClient, stream)
        case OperationTypeDelete:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            _, err := slaveClient.Collection(stream.Ns.Collection).DeleteOne(context.TODO(), filter)
            if err != nil {
   
    
                log.Println("删除失败:", err)
            }
        case OperationTypeUpdate:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            update := bson.M{
   
    "$set": stream.FullDocument}
            _, err := slaveClient.Collection(stream.Ns.Collection).UpdateOne(context.TODO(), filter, update)
            if err != nil {
   
    
                log.Println("更新失败:", err)
            }
        case OperationTypeReplace:
            filter := bson.M{
   
    "_id": stream.FullDocument["_id"]}
            _, err := slaveClient.Collection(stream.Ns.Collection).ReplaceOne(context.TODO(), filter, stream.FullDocument)
            if err != nil {
   
    
                log.Println("替换失败:", err)
            }
        }
    }
}
func syncInsert(slaveClient *mongo.Database, stream StreamObject) {
   
    
    defer func() {
   
    
        _ = recover()
    }()
    _, err := slaveClient.Collection(stream.Ns.Collection).InsertOne(context.TODO(), stream.FullDocument)
    if err != nil {
   
    
        log.Println("插入失败:", err)
    }
}
main.go代码:
package main
import (
    "moneky-data-sync/mongo"
)
func main() {
   
    
    mongo.Sync()
}
 
  
  
  
 
 
  
 
 
  
 