Flynn的服务发现组件(discoverd)部署在所有的Flynn集群节点上,该组件使用了Raft协议保证数据的一致性,目前采用的是hashicorp的实现(https://github.com/hashicorp/raft),我们知道在Raft协议中,如果参与选举的节点太多,会导致性能下降,那是不是说Flynn不支持大规模的节点呢?
Flynn是能够支持大规模节点的,虽然discoverd组件部署在所有的节点上,但并不是所有的节点都参与选举,只有部分节点作为Raft集群的节点,其余节点作为代理节点(proxying),Flynn通过以下逻辑判断是否是代理节点,启动discoverd时指定的-peers参数起了关键作用。
    // We act as a proxying node (rather than a raft peer) unless our own
    // advertise address appears in the configured -peers list.
    proxying := true
    for _, peer := range m.peers {
        if peer == m.advertiseAddr {
            proxying = false
            break
        }
    }
discoverd组件本身依赖Raft协议保证数据的一致性,这里提到的数据,在discoverd里指的是服务,我们可以把我们的服务注册到discoverd上,我们的这些服务可能也需要一个Leader节点,例如Flynn的调度组件(scheduler)就只能在Leader节点上执行调度任务。那是否这些服务也是依赖Raft协议来选择Leader节点呢?
注册到discoverd组件上的服务不依赖Raft协议选择Leader,discoverd组件根据注册时间的长短来选择Leader,存活时间最长的服务节点被选为Leader。所有注册到discoverd组件上的服务必须向discoverd组件发送心跳信息,discoverd组件如果检测到某个服务节点没有了心跳信息,就会把该节点移除,如果该节点恰好是Leader节点,那么就会触发重新选择Leader的动作。
以下代码是心跳检查相关代码
// Open starts the raft consensus and opens the store.
//
// The expirer goroutine calls s.wg.Done() when it exits, so the WaitGroup
// counter must be incremented here, before the goroutine is spawned;
// otherwise Done() would panic on a zero counter and Close could not wait
// for the expirer to finish.
func (s *Store) Open() error {
	s.wg.Add(1)
	go s.expirer()
	return nil
}
// expirer runs in a separate goroutine and checks for instance expiration.
func (s *Store) expirer() {
    defer s.wg.Done()
    ticker := time.NewTicker(s.ExpiryCheckInterval)
    defer ticker.Stop()
    for {
        // Wait for next check or for close signal.
        select {
        case <-s.closing:
            return
        case <-ticker.C:
        }
        // Check all instances for expiration.
        if err := s.EnforceExpiry(); err != nil && err != raft.ErrNotLeader {
            s.logger.Printf("enforce expiry: %s", err)
        }
    }
}
// 以下代码有删减,仅保留主要逻辑
// EnforceExpiry checks all instances for expiration and issues an expiration command, if necessary.
// This function returns raft.ErrNotLeader if this store is not the current leader.
// EnforceExpiry checks all instances for expiration and issues an expiration
// command, if necessary.
// This function returns raft.ErrNotLeader if this store is not the current leader.
func (s *Store) EnforceExpiry() error {
	// Ignore if this store is not the leader and hasn't been for at least 2 TTL intervals.
	if !s.IsLeader() {
		return raft.ErrNotLeader
	} else if s.leaderTime.IsZero() || time.Since(s.leaderTime) < (2*s.InstanceTTL) {
		return ErrLeaderWait
	}

	// Iterate over services and then instances, collecting the expired ones.
	var instances []expireInstance
	for service, m := range s.data.Instances {
		for _, inst := range m {
			// Ignore instances that have heartbeated within the TTL.
			if t := s.heartbeats[instanceKey{service, inst.ID}]; time.Since(t) <= s.InstanceTTL {
				continue
			}

			// Add to list of instances to expire.
			// The current expiry time is added to prevent a race condition of
			// instances updating their expiry date while this command is applying.
			instances = append(instances, expireInstance{
				Service:    service,
				InstanceID: inst.ID,
			})
		}
	}

	// Create command to expire instances. The original excerpt declared cmd
	// separately and never checked this error, which also fails to compile
	// ("err declared and not used"); check it before applying.
	cmd, err := json.Marshal(&expireInstancesCommand{
		Instances: instances,
	})
	if err != nil {
		return err
	}

	// Apply command to raft.
	if _, err := s.raftApply(expireInstancesCommandType, cmd); err != nil {
		return err
	}
	return nil
}
以下是触发重新选举Leader的代码:
// Apply implements the raft FSM interface: it decodes a committed log entry
// and dispatches it to the matching command handler.
// The first byte of l.Data encodes the command type; the remainder is the
// JSON-encoded command payload.
func (s *Store) Apply(l *raft.Log) interface{} {
	// Guard against an empty entry so the index below cannot panic.
	if len(l.Data) == 0 {
		return fmt.Errorf("invalid command: empty log data")
	}

	// Extract the command type and data.
	typ, cmd := l.Data[0], l.Data[1:]

	// Determine the command type by the first byte.
	switch typ {
	case expireInstancesCommandType:
		return s.applyExpireInstancesCommand(cmd)
	default:
		return fmt.Errorf("invalid command type: %d", typ)
	}
}
// applyExpireInstancesCommand removes the instances listed in the command
// from the store, broadcasts a "down" event for each, and re-elects the
// leader of every service that lost an instance.
//
// NOTE(review): this excerpt is abridged — `m` (presumably the service's
// instance map) and `inst` (presumably the removed instance) are defined in
// lines omitted from this listing; confirm against the full source.
func (s *Store) applyExpireInstancesCommand(cmd []byte) error {
    var c expireInstancesCommand
    if err := json.Unmarshal(cmd, &c); err != nil {
        return err
    }
    // Iterate over instances and remove ones with matching expiry times.
    services := make(map[string]struct{})
    for _, expireInstance := range c.Instances {
        // Remove instance.
        delete(m, expireInstance.InstanceID)
        // Broadcast down event.
        s.broadcast(&discoverd.Event{
            Service:  expireInstance.Service,
            Kind:     discoverd.EventKindDown,
            Instance: inst,
        })
        // Keep track of services invalidated.
        services[expireInstance.Service] = struct{}{}
    }
    // Invalidate all services that had expirations: each one may have lost
    // its leader, so recompute leadership once per affected service.
    for service := range services {
        s.invalidateServiceLeader(service)
    }
    return nil
}
// invalidateServiceLeader updates the current leader of service.
func (s *Store) invalidateServiceLeader(service string) {
    // Retrieve service config.
    c := s.data.Services[service]
    // Ignore if there is no config or the leader is manually elected.
    if c == nil || c.LeaderType == discoverd.LeaderTypeManual {
        return
    }
    // Retrieve current leader ID.
    prevLeaderID := s.data.Leaders[service]
    // Find the oldest, non-expired instance.
    var leader *discoverd.Instance
    for _, inst := range s.data.Instances[service] {
        if leader == nil || inst.Index < leader.Index {
            leader = inst
        }
    }
    // Retrieve the leader ID.
    var leaderID string
    if leader != nil {
        leaderID = leader.ID
    }
    // Set leader.
    s.data.Leaders[service] = leaderID
    // Broadcast event.
    if prevLeaderID != leaderID {
        var inst *discoverd.Instance
        if s.data.Instances[service] != nil {
            inst = s.data.Instances[service][leaderID]
        }
        s.broadcast(&discoverd.Event{
            Service:  service,
            Kind:     discoverd.EventKindLeader,
            Instance: inst,
        })
    }
}
心跳信息是在哪里呢,估计都想到了,一定是在注册服务的那里。discoverd提供了对应的客户端,代码在discoverd/client里面,有个heartbeat.go就是专门来发心跳的。我们可以通过discoverd客户端的AddServiceAndRegister方法来完成服务注册功能
// AddServiceAndRegister ensures the service exists, then registers addr as
// an instance of it, returning the instance's Heartbeater.
func (c *Client) AddServiceAndRegister(service, addr string) (Heartbeater, error) {
	err := c.maybeAddService(service)
	if err != nil {
		return nil, err
	}
	return c.Register(service, addr)
}
// Register registers addr as an instance of service and starts heartbeating.
func (c *Client) Register(service, addr string) (Heartbeater, error) {
	inst := &Instance{Addr: addr}
	return c.RegisterInstance(service, inst)
}
// RegisterInstance registers inst with service, retrying the initial
// registration until it succeeds or the attempt strategy gives up.
func (c *Client) RegisterInstance(service string, inst *Instance) (Heartbeater, error) {
	hb := newHeartbeater(c, service, inst)
	// Each attempt spawns the heartbeat loop and waits for its first
	// registration result before deciding whether to retry.
	attempt := func() error {
		firstErr := make(chan error)
		go hb.run(firstErr)
		return <-firstErr
	}
	if err := runAttempts.Run(attempt); err != nil {
		return nil, err
	}
	return hb, nil
}
// run is the heartbeat loop: it registers the instance, reports the result
// of the initial registration on firstErr, and then re-registers on every
// heartbeat interval until stopped.
func (h *heartbeater) run(firstErr chan<- error) {
	path := fmt.Sprintf("/services/%s/instances/%s", h.service, h.inst.ID)
	register := func() error {
		h.Lock()
		defer h.Unlock()
		return h.client().Put(path, h.inst, nil)
	}

	// Perform the initial registration and report its outcome. Without this
	// send, RegisterInstance would block forever on <-firstErr. On failure
	// we return so the caller's retry logic can start a fresh run.
	err := register()
	firstErr <- err
	if err != nil {
		return
	}

	timer := time.NewTimer(nextHeartbeat())
	for {
		select {
		case <-timer.C:
			if err := register(); err != nil {
				// Back off to the failure interval and retry.
				timer.Reset(nextHeartbeatFailing())
				break
			}
			timer.Reset(nextHeartbeat())
		case <-h.stop:
			// Best-effort deregistration on shutdown.
			h.client().Delete(path)
			close(h.done)
			return
		}
	}
}
discoverd也是以Http协议提供服务,比如通过GET /services/abc/leader来获取abc服务的Leader节点,当然,也可以使用SSE协议来监听abc服务的Leader变化事件。Flynn的调度组件(scheduler)就是采用SSE协议监听Leader节点的变化。
    // HTTP endpoints for a service's leader: PUT sets it (manual election),
    // GET returns the current leader.
    r.PUT("/services/:service/leader", h.servePutLeader)
    r.GET("/services/:service/leader", h.serveGetLeader)
 
  
  
 