go micro registry

蔡阳
• 阅读 2759

micro.newService()中newOptions

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {
        o(&opt)
    }

    return opt
}

初始化了一堆基础设置,来看看Registry
registry.DefaultRegistry,
在registry/registry.go中的
DefaultRegistry = NewRegistry()

// NewRegistry returns a new default registry which is mdns
func NewRegistry(opts ...Option) Registry {
    return newRegistry(opts...)
}

func newRegistry(opts ...Option) Registry {
    options := Options{
        Context: context.Background(),
        Timeout: time.Millisecond * 100,
    }

    for _, o := range opts {
        o(&options)
    }

    // set the domain
    defaultDomain := DefaultDomain

    d, ok := options.Context.Value("mdns.domain").(string)
    if ok {
        defaultDomain = d
    }

    return &mdnsRegistry{
        defaultDomain: defaultDomain,
        globalDomain:  globalDomain,
        opts:          options,
        domains:       make(map[string]services),
        watchers:      make(map[string]*mdnsWatcher),
    }
}

这里做了以下事情:

  1. 初始化并设置Options
  2. 设置defaultDomain,默认micro,如果options.Context中定义了mdns.domain,则使用这里定义的
  3. 返回mdnsRegistry{}实例

在micro server篇中介绍了service的启动过程
service.Run()中调用了s.Start()
s.Start()中调用了s.opts.Server.Start(),这里的s.opts.Server就是micro/defaults.go中定义的server.DefaultServer = gsrv.NewServer()

那我们去看server/grpc/grpc.go中的Start()

func (g *grpcServer) Start() error {
    g.RLock()
    if g.started {
        g.RUnlock()
        return nil
    }
    g.RUnlock()

    config := g.Options()

    // micro: config.Transport.Listen(config.Address)
    var ts net.Listener

    if l := g.getListener(); l != nil {
        ts = l
    } else {
        var err error

        // check the tls config for secure connect
        if tc := config.TLSConfig; tc != nil {
            ts, err = tls.Listen("tcp", config.Address, tc)
            // otherwise just plain tcp listener
        } else {
            ts, err = net.Listen("tcp", config.Address)
        }
        if err != nil {
            return err
        }
    }

    if g.opts.Context != nil {
        if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {
            ts = netutil.LimitListener(ts, c)
        }
    }

    if logger.V(logger.InfoLevel, logger.DefaultLogger) {
        logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
    }
    g.Lock()
    g.opts.Address = ts.Addr().String()
    g.Unlock()

    // only connect if we're subscribed
    if len(g.subscribers) > 0 {
        // connect to the broker
        if err := config.Broker.Connect(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
            }
            return err
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
        }
    }

    // announce self to the world
    if err := g.Register(); err != nil {
        if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
            logger.Errorf("Server register error: %v", err)
        }
    }

    // micro: go ts.Accept(s.accept)
    go func() {
        if err := g.srv.Serve(ts); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("gRPC Server start error: %v", err)
            }
        }
    }()

    go func() {
        t := new(time.Ticker)

        // only process if it exists
        if g.opts.RegisterInterval > time.Duration(0) {
            // new ticker
            t = time.NewTicker(g.opts.RegisterInterval)
        }

        // return error chan
        var ch chan error

    Loop:
        for {
            select {
            // register self on interval
            case <-t.C:
                if err := g.Register(); err != nil {
                    if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                        logger.Error("Server register error: ", err)
                    }
                }
            // wait for exit
            case ch = <-g.exit:
                break Loop
            }
        }

        // deregister self
        if err := g.Deregister(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Error("Server deregister error: ", err)
            }
        }

        // wait for waitgroup
        if g.wg != nil {
            g.wg.Wait()
        }

        // stop the grpc server
        exit := make(chan bool)

        go func() {
            g.srv.GracefulStop()
            close(exit)
        }()

        select {
        case <-exit:
        case <-time.After(time.Second):
            g.srv.Stop()
        }

        // close transport
        ch <- nil

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
        }
        // disconnect broker
        if err := config.Broker.Disconnect(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
            }
        }
    }()

    // mark the server as started
    g.Lock()
    g.started = true
    g.Unlock()

    return nil
}

这个过程在micro server篇中有介绍,现在只看registry部分,注册后开一个协程定时注册

g.Register()注册到服务发现

func (g *grpcServer) Register() error {
    g.RLock()
    rsvc := g.rsvc
    config := g.opts
    g.RUnlock()

    regFunc := func(service *registry.Service) error {
        var regErr error

        for i := 0; i < 3; i++ {
            // set the ttl and namespace
            rOpts := []registry.RegisterOption{
                registry.RegisterTTL(config.RegisterTTL),
                registry.RegisterDomain(g.opts.Namespace),
            }

            // attempt to register
            if err := config.Registry.Register(service, rOpts...); err != nil {
                // set the error
                regErr = err
                // backoff then retry
                time.Sleep(backoff.Do(i + 1))
                continue
            }
            // success so nil error
            regErr = nil
            break
        }

        return regErr
    }

    // if service already filled, reuse it and return early
    if rsvc != nil {
        if err := regFunc(rsvc); err != nil {
            return err
        }
        return nil
    }

    var err error
    var advt, host, port string
    var cacheService bool

    // check the advertise address first
    // if it exists then use it, otherwise
    // use the address
    if len(config.Advertise) > 0 {
        advt = config.Advertise
    } else {
        advt = config.Address
    }

    if cnt := strings.Count(advt, ":"); cnt >= 1 {
        // ipv6 address in format [host]:port or ipv4 host:port
        host, port, err = net.SplitHostPort(advt)
        if err != nil {
            return err
        }
    } else {
        host = advt
    }

    if ip := net.ParseIP(host); ip != nil {
        cacheService = true
    }

    addr, err := addr.Extract(host)
    if err != nil {
        return err
    }

    // make copy of metadata
    md := meta.Copy(config.Metadata)

    // register service
    node := &registry.Node{
        Id:       config.Name + "-" + config.Id,
        Address:  mnet.HostPort(addr, port),
        Metadata: md,
    }

    node.Metadata["broker"] = config.Broker.String()
    node.Metadata["registry"] = config.Registry.String()
    node.Metadata["server"] = g.String()
    node.Metadata["transport"] = g.String()
    node.Metadata["protocol"] = "grpc"

    g.RLock()
    // Maps are ordered randomly, sort the keys for consistency
    var handlerList []string
    for n, e := range g.handlers {
        // Only advertise non internal handlers
        if !e.Options().Internal {
            handlerList = append(handlerList, n)
        }
    }
    sort.Strings(handlerList)

    var subscriberList []*subscriber
    for e := range g.subscribers {
        // Only advertise non internal subscribers
        if !e.Options().Internal {
            subscriberList = append(subscriberList, e)
        }
    }
    sort.Slice(subscriberList, func(i, j int) bool {
        return subscriberList[i].topic > subscriberList[j].topic
    })

    endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
    for _, n := range handlerList {
        endpoints = append(endpoints, g.handlers[n].Endpoints()...)
    }
    for _, e := range subscriberList {
        endpoints = append(endpoints, e.Endpoints()...)
    }
    g.RUnlock()

    service := &registry.Service{
        Name:      config.Name,
        Version:   config.Version,
        Nodes:     []*registry.Node{node},
        Endpoints: endpoints,
    }

    g.RLock()
    registered := g.registered
    g.RUnlock()

    if !registered {
        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
        }
    }

    // register the service
    if err := regFunc(service); err != nil {
        return err
    }

    // already registered? don't need to register subscribers
    if registered {
        return nil
    }

    g.Lock()
    defer g.Unlock()

    for sb := range g.subscribers {
        handler := g.createSubHandler(sb, g.opts)
        var opts []broker.SubscribeOption
        if queue := sb.Options().Queue; len(queue) > 0 {
            opts = append(opts, broker.Queue(queue))
        }

        if cx := sb.Options().Context; cx != nil {
            opts = append(opts, broker.SubscribeContext(cx))
        }

        if !sb.Options().AutoAck {
            opts = append(opts, broker.DisableAutoAck())
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Subscribing to topic: %s", sb.Topic())
        }
        sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
        if err != nil {
            return err
        }
        g.subscribers[sb] = []broker.Subscriber{sub}
    }

    g.registered = true
    if cacheService {
        g.rsvc = service
    }

    return nil
}

这里做了以下事情:

  1. rsvc := g.rsvc,定义regFunc()

    1. 定义[]registry.RegisterOption{}
    2. 调用config.Registry.Register()注册,失败会重试3次
  2. rsvc不为空就调用regFunc()注册了并返回了,空就往下走,继续注册
  3. 验证host,port,复制metadata,定义registry.Node{},在metadata中增加broker,registry,server,transport,protocol
  4. g.handlers放到handlerList中(非内部handle),排个序,保持一致性。g.subscribers也放到subscriberList,按topic排序。最后都放入endpoints
  5. 定义registry.Service{},调用regFunc()注册,如果没有错误,也没有订阅需要处理就返回
  6. 处理订阅

到registry/mdns_registry.go中看看Register()

func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
    m.Lock()

    // parse the options
    var options RegisterOptions
    for _, o := range opts {
        o(&options)
    }
    if len(options.Domain) == 0 {
        options.Domain = m.defaultDomain
    }

    // create the domain in the memory store if it doesn't yet exist
    if _, ok := m.domains[options.Domain]; !ok {
        m.domains[options.Domain] = make(services)
    }

    // create the wildcard entry used for list queries in this domain
    entries, ok := m.domains[options.Domain][service.Name]
    if !ok {
        entry, err := createServiceMDNSEntry(service.Name, options.Domain)
        if err != nil {
            m.Unlock()
            return err
        }
        entries = append(entries, entry)
    }

    var gerr error
    for _, node := range service.Nodes {
        var seen bool

        for _, entry := range entries {
            if node.Id == entry.id {
                seen = true
                break
            }
        }

        // this node has already been registered, continue
        if seen {
            continue
        }

        txt, err := encode(&mdnsTxt{
            Service:   service.Name,
            Version:   service.Version,
            Endpoints: service.Endpoints,
            Metadata:  node.Metadata,
        })

        if err != nil {
            gerr = err
            continue
        }

        host, pt, err := net.SplitHostPort(node.Address)
        if err != nil {
            gerr = err
            continue
        }
        port, _ := strconv.Atoi(pt)

        if logger.V(logger.DebugLevel, logger.DefaultLogger) {
            logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
        }
        // we got here, new node
        s, err := mdns.NewMDNSService(
            node.Id,
            service.Name,
            options.Domain+".",
            "",
            port,
            []net.IP{net.ParseIP(host)},
            txt,
        )
        if err != nil {
            gerr = err
            continue
        }

        srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
        if err != nil {
            gerr = err
            continue
        }

        entries = append(entries, &mdnsEntry{id: node.Id, node: srv})
    }

    // save the mdns entry
    m.domains[options.Domain][service.Name] = entries
    m.Unlock()

    // register in the global Domain so it can be queried as one
    if options.Domain != m.globalDomain {
        srv := *service
        srv.Nodes = nil

        for _, n := range service.Nodes {
            node := n

            // set the original domain in node metadata
            if node.Metadata == nil {
                node.Metadata = map[string]string{"domain": options.Domain}
            } else {
                node.Metadata["domain"] = options.Domain
            }

            srv.Nodes = append(srv.Nodes, node)
        }

        if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil {
            gerr = err
        }
    }

    return gerr
}

这里做了以下事情:

  1. 设置optionsentries
  2. 创建m.domains[options.Domain],并赋值entries
  3. 循环每个service.Nodes,entries看有没有注册过,有就跳过
  4. 编码mdnsTxt{},调用mdns.NewMDNSService()得到一个新node,在调用mdns.NewServer()得到mdns.Server,包装到mdnsEntry{},放入entries,在存入m.domainsoptions.Domain
  5. 如果options.Domain != m.globalDomain,设置service.node中的Metadata["domain"]为options.Domain,注册到global Domain中

这里是默认的mdns实现,实际使用中可以指定consul,etcd等,具体的流程请见各自的Register()

go micro 分析系列文章
go micro server 启动分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 网关鉴权
go micro 链路追踪
go micro 熔断与限流
go micro wrapper 中间件
go micro metrics 接入Prometheus、Grafana

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Easter79 Easter79
3年前
typeScript数据类型
//布尔类型letisDone:booleanfalse;//数字类型所有数字都是浮点数numberletdecLiteral:number6;lethexLiteral:number0xf00d;letbinaryLiteral:number0b101
Wesley13 Wesley13
3年前
VBox 启动虚拟机失败
在Vbox(5.0.8版本)启动Ubuntu的虚拟机时,遇到错误信息:NtCreateFile(\\Device\\VBoxDrvStub)failed:0xc000000034STATUS\_OBJECT\_NAME\_NOT\_FOUND(0retries) (rc101)Makesurethekern
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
3年前
Java日期时间API系列36
  十二时辰,古代劳动人民把一昼夜划分成十二个时段,每一个时段叫一个时辰。二十四小时和十二时辰对照表:时辰时间24时制子时深夜11:00凌晨01:0023:0001:00丑时上午01:00上午03:0001:0003:00寅时上午03:00上午0
Wesley13 Wesley13
3年前
MBR笔记
<bochs:100000000000e\WGUI\Simclientsize(0,0)!stretchedsize(640,480)!<bochs:2b0x7c00<bochs:3c00000003740i\BIOS\$Revision:1.166$$Date:2006/08/1117
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
蔡阳
蔡阳
Lv1
如果有来世,我一定活出你喜欢的样子。
文章
3
粉丝
0
获赞
0