sonic orch调度系统(1)----select

ByteWaltzX
• 阅读 3173

​ 常见的服务器模型有多进程模型,多线程,IO多路复用,协程等模型。sonic的核心守护进程orchagent采用的是IO多路复用模型,早期的sonic采用的是select实现多路复用,后面的版本采用的是epoll。使用select(跟多路复用的select名字一样)类对底层进行了封装,屏蔽了差异。

class Selectable

事件基类,描述了epoll事件,可以是读,写,异常等事件。该结构对通用epoll事件进行了封装,真实事件通过该类派生出来,比如redis数据库事件:class RedisSelect : public Selectable;netlink事件:class NetLink : public Selectable;通知:class NotificationConsumer : public Selectable,orch执行单元:class Executor : public Selectable,定时器:class SelectableTimer : public Selectable等。

class Selectable
{
public:
    Selectable(int pri = 0) : m_priority(pri),
                              m_last_used_time(std::chrono::steady_clock::now()) {
                              lastusedsequence = g_lastusedsequence++;}

    virtual ~Selectable() = default;

    /* return file handler for the Selectable */
    virtual int getFd() = 0;

    /* Read all data from the fd assicaited with Selectable */
    virtual void readData() = 0;

    /* true if Selectable has data in its cache */
    // 判断是否还有数据,如果有放入就绪事件set
    virtual bool hasCachedData()
    {
        return false;
    }

    /* true if Selectable was initialized with data */
    // 判断是否需要读取初始数据
    virtual bool initializedWithData()
    {
        return false;
    }

    /* run this function after every read */
    // 更新事件数
    virtual void updateAfterRead()
    {
    }

    int getPri() const
    {
        return m_priority;
    }

private:

    friend class Select;//友元类为Select

    // only Select class can access and update m_last_used_time

    std::chrono::time_point<std::chrono::steady_clock> getLastUsedTime() const
    {
        return m_last_used_time;
    }
    // 最后使用序列号
    unsigned long getLastUsedsequence() const
    {
        return lastusedsequence;
    }
    // 跟新最后使用序列号
    void updateLastUsedTime()
    {
        m_last_used_time = std::chrono::steady_clock::now();
        lastusedsequence = g_lastusedsequence++;
    }
    // 优先级,实现基于优先级调度
    int m_priority; // defines priority of Selectable inside Select
                    // higher value is higher priority
    std::chrono::time_point<std::chrono::steady_clock> m_last_used_time;
    unsigned long lastusedsequence;//上次使用序列号
    static unsigned long g_lastusedsequence;//全局基准序列号,用于对同优先级业务进行公平调度
};

class Select

class Select
{
public:
    Select();
    ~Select();

    /* Add object for select 给epoll添加一个事件 */
    void addSelectable(Selectable *selectable);

    /* Remove object from select 删除一个epoll事件 */
    void removeSelectable(Selectable *selectable);

    /* Add multiple objects for select 添加多个epoll事件 */
    void addSelectables(std::vector<Selectable *> selectables);

    enum {//返回的事件类型
        OBJECT = 0,
        ERROR = 1,
        TIMEOUT = 2,
    };
    //执行epoll 
    int select(Selectable **c, unsigned int timeout = std::numeric_limits<unsigned int>::max());
    int select(std::vector<Selectable *> &vc, unsigned int timeout = std::numeric_limits<unsigned int>::max());

private:
    //epoll事件比较函数,通过该函数实现事件的优先级
    struct cmp
    {
        bool operator()(const Selectable* a, const Selectable* b) const
        {
            /* Choose Selectable with highest priority first */
            if (a->getPri() > b->getPri())
                return true;
            else if (a->getPri() < b->getPri())
                return false;

            /* if the priorities are equal */
            /* use Selectable which was selected later */
            if (a->getLastUsedsequence() < b->getLastUsedsequence())
                return true;
            else if (a->getLastUsedsequence() > b->getLastUsedsequence())
                return false;

            /* when a == b */
            return false;
        }
    };
    //epoll轮询函数
    int poll_descriptors(Selectable **c, unsigned int timeout);
    int poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout);

    int m_epoll_fd;//epoll句柄
    std::unordered_map<int, Selectable *> m_objects;//监听的事件句柄与其对应的selectable之间的关系
    std::set<Selectable *, Select::cmp> m_ready;//已经就绪的事件集合,提供了比较函数,从而实现优先级调度
};

Select::Select()

Select::Select()
{
    m_epoll_fd = ::epoll_create1(0);//创建epoll句柄
    if (m_epoll_fd == -1)
    {
        std::string error = std::string("Select::constructor:epoll_create1: error=("
                          + std::to_string(errno) + "}:"
                          + strerror(errno));
        throw std::runtime_error(error);
    }
}

Select::~Select()

Select::~Select()
{
    (void)::close(m_epoll_fd);
}

void Select::addSelectable(Selectable *selectable)

void Select::addSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    if(m_objects.find(fd) != m_objects.end())//已经添加了该事件,退出
    {
        SWSS_LOG_WARN("Selectable is already added to the list, ignoring.");
        return;
    }

    m_objects[fd] = selectable;

    if (selectable->initializedWithData())//是否已经有数据可读,读出已有的数据
    {
        m_ready.insert(selectable);
    }
    //添加可读事件
    struct epoll_event ev = {
        .events = EPOLLIN,
        .data = { .fd = fd, },
    };

    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
    if (res == -1)
    {
        std::string error = std::string("Select::add_fd:epoll_ctl: error=("
                          + std::to_string(errno) + "}:"
                          + strerror(errno));
        throw std::runtime_error(error);
    }
}

void Select::removeSelectable(Selectable *selectable)

void Select::removeSelectable(Selectable *selectable)
{
    const int fd = selectable->getFd();

    m_objects.erase(fd);
    m_ready.erase(selectable);
    //从epoll中删除事件
    int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    if (res == -1)
    {
        std::string error = std::string("Select::del_fd:epoll_ctl: error=("
                          + std::to_string(errno) + "}:"
                          + strerror(errno));
        throw std::runtime_error(error);
    }
}

void Select::addSelectables(vector<Selectable *> selectables)

void Select::addSelectables(vector<Selectable *> selectables)
{
    for(auto it : selectables)//添加多个事件
    {
        addSelectable(it);
    }
}

int Select::poll_descriptors(......)

提取一个就绪事件

int Select::poll_descriptors(Selectable **c, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;
    //阻塞等待事件发生,发生错误或者被中断打断则继续监听,发生事件则执行事件
    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal

    if (ret < 0)
        return Select::ERROR;
    //遍历每一个发生的事件
    for (int i = 0; i < ret; ++i)
    {
        int fd = events[i].data.fd;
        Selectable* sel = m_objects[fd];//获取事件描述符
        sel->readData();//读取数据
        m_ready.insert(sel);//插入就绪集合
    }
    //存在就绪事件
    if (!m_ready.empty())
    {
        auto sel = *m_ready.begin();

        *c = sel;

        m_ready.erase(sel);
        // we must update clock only when the selector out of the m_ready
        // otherwise we break invariant of the m_ready
        // 更新该事件的使用时间,使用事件会作为事件优先级进行使用,越频繁的优先级越低,从而避免同优先级的事件
        // 饿死
        sel->updateLastUsedTime();
        // 有数据,依然放入已经就绪集合
        if (sel->hasCachedData())
        {
            // reinsert Selectable back to the m_ready set, when there're more messages in the cache
            m_ready.insert(sel);
        }
        // 对数据进行刷新,如果该句柄只发生了一次事件,那么这里会进行减1,下次m_ready中将不会存在该sel。
        // 仔细分析了sonic的selectable的实现,这里是有bug的,会造成大量的空转。
        sel->updateAfterRead();

        return Select::OBJECT;
    }

    return Select::TIMEOUT;
}

int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)

提取多个就绪事件,该函数是在上面的函数的基础上的改进。只提取一个事件将会造成"饿死和胀死"的问题。由于m_ready是有序队列,对于高优先的事件总是会被优先提取,如果高优先级的事件依赖于低优先级事件的话,会造成高优先级的业务一直被调度,但是缺少依赖条件而不能执行业务,低优先级业务总是得不到调度,形成死锁问题。同时提取所有就绪事件可以解决高低优先级死锁问题。

int Select::poll_descriptors(std::vector<Selectable *> &vc, unsigned int timeout)
{
    int sz_selectables = static_cast<int>(m_objects.size());
    std::vector<struct epoll_event> events(sz_selectables);
    int ret;

    do
    {
        ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout);
    }
    while(ret == -1 && errno == EINTR); // Retry the select if the process was interrupted by a signal

    if (ret < 0)
        return Select::ERROR;

    for (int i = 0; i < ret; ++i)
    {
        int fd = events[i].data.fd;
        Selectable* sel = m_objects[fd];
        sel->readData();
        m_ready.insert(sel);
    }
    
    auto iter = m_ready.begin();
    while(iter !=m_ready.end())
    {
        auto sel = *iter;
        vc.push_back(sel);
        iter = m_ready.erase(iter);
        sel->updateLastUsedTime();
    }

    for(auto se:vc)
    {
        if (se->hasCachedData())
        {
            m_ready.insert(se);
        }
        se->updateAfterRead();
    }

    if(!vc.empty())
    {
        return Select::OBJECT;
    }
    
    return Select::TIMEOUT;
}

int Select::select(Selectable **c, unsigned int timeout)

int Select::select(Selectable **c, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    int ret;

    *c = NULL;
    if (timeout == numeric_limits<unsigned int>::max())
        timeout = -1;

    /* check if we have some data */
    ret = poll_descriptors(c, 0);

    /* return if we have data, we have an error or desired timeout was 0 */
    if (ret != Select::TIMEOUT || timeout == 0)
        return ret;

    /* wait for data */
    ret = poll_descriptors(c, timeout);

    return ret;
}

int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)

int Select::select(std::vector<Selectable *> &vc, unsigned int timeout)
{
    SWSS_LOG_ENTER();

    int ret;

    if (timeout == numeric_limits<unsigned int>::max())
        timeout = -1;

    /* check if we have some data */
    ret = poll_descriptors(vc, 0);

    /* return if we have data, we have an error or desired timeout was 0 */
    if (ret != Select::TIMEOUT || timeout == 0)
        return ret;

    /* wait for data */
    ret = poll_descriptors(vc, timeout);

    return ret;

}
点赞
收藏
评论区
推荐文章
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
3年前
java并发编程实践 笔记 2017
\TOC\javaIO模型BIO:JDK1.4之前的IO,阻塞IONIO:linux多路复用技术(select模式)实现IO事件的轮询方式:同步非阻塞的模式,这种方式目前是主流的网络通信模式Mina,netty网络通信框架AIO:jdk1.7
说透IO多路复用模型
在说IO多路复用模型之前,我们先来大致了解下Linux文件系统。在Linux系统中,不论是你的鼠标,键盘,还是打印机,甚至于连接到本机的socketclient端,都是以文件描述符的形式存在于系统中,诸如此类,等等等等,所以可以这么说,一切皆文件。
Stella981 Stella981
3年前
Redis 单线程如何处理那么多的并发客户端连接?
为什么Redis是单线程的1.官方答案因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了。IO多路复用技术redis采用网络IO多路复用技术来保证在多连接的
Stella981 Stella981
3年前
Redis网络模型的源码分析
Redis的网络模型是基于I/O多路复用程序来实现的。源码中包含四种多路复用函数库epoll、select、evport、kqueue。在程序编译时会根据系统自动选择这四种库其中之一。下面以epoll为例,来分析Redis的I/O模块的源码。epoll系统调用方法Redis网络事件处理模块的代码都是围绕epoll那三个系统方法
Wesley13 Wesley13
3年前
Java核心(五)深入理解BIO、NIO、AIO
导读:本文你将获取到:同/异步阻/非阻塞的性能区别;BIO、NIO、AIO的区别;理解和实现NIO操作Socket时的多路复用;同时掌握IO最底层最核心的操作技巧。BIO、NIO、AIO的区别是什么?同/异步、阻/非阻塞的区别是什么?文件读写最优雅的实现方式是什么?NIO如何实现多路复用功能
Stella981 Stella981
3年前
Redis和多路复用模型
作者:Rico原文:hogwartsrico.github.io/2020/06/24/RedisandMultiplexing/几种I/O模型为什么Redis中要使用I/O多路复用这种技术呢?首先,Redis是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的
Stella981 Stella981
3年前
Redis线程模型
Redis是单线程模型,它内部采用了文件事件处理器filtereventhandler,而这个处理器是单线程的。文件事件处理器包含:多个socket、IO多路复用程序、事件分派器、事件处理器(连接应答处理器、事件请求处理器、事件回复处理器)。流程:IO多路复用程序会
Stella981 Stella981
3年前
Bypass ngx_lua_waf SQL注入防御(多姿势)
0x00前言ngx\_lua\_waf是一款基于ngx\_lua的web应用防火墙,使用简单,高性能、轻量级。默认防御规则在wafconf目录中,摘录几条核心的SQL注入防御规则:select.(from|limit)(?:(union(.?)select))(?:from\Winformation_schema\W)这边
Stella981 Stella981
3年前
Nginx
!(https://imagestatic.segmentfault.com/255/117/25511790966008dc5b00fd8)Nginx进程模型分析在介绍Nginx的进程模型之前我们先来给大家解释下一些常见的名词,这能辅助我们更好的了解Nginx的进程模型。作为Web服务器,设计的初衷就是为了能够处理更多的客户端的请
IO模型介绍(select、poll、epoll)
什么是IO?IO中的I就是input,O就是output,IO模型即输入输出模型,而比较常听说的便是磁盘IO,网络IO。什么是操作系统的IO?我们如果需要对磁盘进行读取或者写入数据的时候必须得有主体去操作,这个主体就是应用程序。应用程序是不能直接进行一些读
ByteWaltzX
ByteWaltzX
Lv1
未曾青梅,青梅枯萎,芬芳满地。
文章
5
粉丝
0
获赞
0