sonic消息传递机制与架构(1)

逻辑拓云人
• 阅读 7469

sonic是一个网络操作系统,采用了大量的相互独立的第三方开源组件,这些组件在依赖,编译环境,库,配置方式都有很大的不同。为了让这些组件在sonic中相互协作,互不干扰,同时尽量不修改第三方组件的代码,sonic采用容器技术为各个组件提供独立的运行环境,通过容器间共享网络命名空间进行通信。各个第三组件有各自的配置文件格式和消息格式,如何让这些组件互通信息了。sonic采用redis数据库作为消息传递平台,通过纯字符消息方式屏蔽各个组件的插件,通过胶水代码将其粘起来。

sonic消息框架图

sonic消息传递机制与架构(1)

实现

sonic通过redis数据库的发布-订阅机制和键空间事件机制实现了整个消息传递机制。

基类

class TableBase {
public:
    TableBase(int dbId, const std::string &tableName)
        : m_tableName(tableName), m_dbId(dbId)
    {
        /* Look up table separator for the provided DB */
        auto it = tableNameSeparatorMap.find(dbId);

        if (it != tableNameSeparatorMap.end())
        {
            m_tableSeparator = it->second;
        }
        else
        {
            SWSS_LOG_NOTICE("Unrecognized database ID. Using default table name separator ('%s')", TABLE_NAME_SEPARATOR_VBAR.c_str());
            m_tableSeparator = TABLE_NAME_SEPARATOR_VBAR;
        }
    }

    std::string getTableName() const { return m_tableName; }
    int getDbId() const { return m_dbId; }

    /* Return the actual key name as a combination of tableName<table_separator>key */
    std::string getKeyName(const std::string &key)
    {
        if (key == "") return m_tableName;
        else return m_tableName + m_tableSeparator + key;
    }

    /* Return the table name separator being used */
    std::string getTableNameSeparator() const
    {
        return m_tableSeparator;
    }

    std::string getChannelName() { return m_tableName + "_CHANNEL"; }
private:
    static const std::string TABLE_NAME_SEPARATOR_COLON;
    static const std::string TABLE_NAME_SEPARATOR_VBAR;
    static const TableNameSeparatorMap tableNameSeparatorMap;

    std::string m_tableName;
    std::string m_tableSeparator;
    int m_dbId;
};
class TableEntryWritable {
public:
    virtual ~TableEntryWritable() = default;

    /* Set an entry in the table */
    virtual void set(const std::string &key,
                     const std::vector<FieldValueTuple> &values,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX) = 0;
    /* Delete an entry in the table */
    virtual void del(const std::string &key,
                     const std::string &op = "",
                     const std::string &prefix = EMPTY_PREFIX) = 0;

};

消费者基类

 消费者响应生产者的事件,可以采用阻塞或者轮询的方式处理。sonic采用了异步事件通知机制(poll)进行处理。消费者类必须实现事件通知机制相关的接口。
RedisSelect

该类对异步通知机制Selectable(select,poll等)进行了封装,集成该类的派生类可以加入异步事件机制,通过集成该类,消费者可以持续监听事件。

class RedisSelect : public Selectable
{
public:
    /* The database is already alive and kicking, no need for more than a second */
    static constexpr unsigned int SUBSCRIBE_TIMEOUT = 1000;

    RedisSelect(int pri = 0);//调度优先级

    int getFd() override;
    void readData() override;
    bool hasCachedData() override;
    bool initializedWithData() override;
    void updateAfterRead() override;

    /* Create a new redisContext, SELECT DB and SUBSCRIBE */
    void subscribe(DBConnector* db, const std::string &channelName);

    /* PSUBSCRIBE */
    void psubscribe(DBConnector* db, const std::string &channelName);

    void setQueueLength(long long int queueLength);

protected:
    std::unique_ptr<DBConnector> m_subscribe;
    long long int m_queueLength;//接收的应答的个数,一个请求一个应答。
};
getFd
int RedisSelect::getFd()
{
    return m_subscribe->getContext()->fd;
}
readData
void RedisSelect::readData()
{
    redisReply *reply = nullptr;

    if (redisGetReply(m_subscribe->getContext(), reinterpret_cast<void**>(&reply)) != REDIS_OK)
        throw std::runtime_error("Unable to read redis reply");

    freeReplyObject(reply);
    m_queueLength++;//事件加一次,

    reply = nullptr;
    int status;
    do
    {
        status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
        if(reply != nullptr && status == REDIS_OK)
        {//一个响应加一次,该值会大于最终处理的循环次数,造成空转,但是不加的话,极端情况下会造成丢失信息问题
            m_queueLength++;
            freeReplyObject(reply);
        }
    }
    while(reply != nullptr && status == REDIS_OK);

    if (status != REDIS_OK)
    {
        throw std::runtime_error("Unable to read redis reply");
    }
}
hasCachedData
bool RedisSelect::hasCachedData()
{//判断是否还有消息,存在消息的话,加入m_ready,保证已经读出来的消息能被处理
    return m_queueLength > 1;
}
updateAfterRead
void RedisSelect::updateAfterRead()
{
    m_queueLength--;//假设一次处理一个应答,这里减去1,即使一次处理了多个消息,依然只减掉1,造成空转的根本原因
}
setQueueLength
void RedisSelect::setQueueLength(long long int queueLength)
{
    m_queueLength = queueLength;//设置消息个数,用于构造函数
}
subscribe and psubscribe
/* Create a new redisContext, SELECT DB and SUBSCRIBE */
void RedisSelect::subscribe(DBConnector* db, const std::string &channelName)
{
    m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));

    /* Send SUBSCRIBE #channel command */
    std::string s("SUBSCRIBE ");
    s += channelName;
    RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY);
}

/* PSUBSCRIBE */
void RedisSelect::psubscribe(DBConnector* db, const std::string &channelName)
{
    m_subscribe.reset(db->newConnector(SUBSCRIBE_TIMEOUT));

    /*
     * Send PSUBSCRIBE #channel command on the
     * non-blocking subscriber DBConnector
     */
    std::string s("PSUBSCRIBE ");
    s += channelName;
    RedisReply r(m_subscribe.get(), s, REDIS_REPLY_ARRAY);
}

消费者进一步封装基类

class TableEntryPoppable {
public:
    virtual ~TableEntryPoppable() = default;

    /* Pop an action (set or del) on the table */
    virtual void pop(KeyOpFieldsValuesTuple &kco, const std::string &prefix = EMPTY_PREFIX) = 0;

    /* Get multiple pop elements */
    virtual void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX) = 0;
};

class TableConsumable : public TableBase, public TableEntryPoppable, public RedisSelect {
public:
    /* The default value of pop batch size is 128 */
    static constexpr int DEFAULT_POP_BATCH_SIZE = 128;//一次消费128条消息

    TableConsumable(int dbId, const std::string &tableName, int pri) : TableBase(dbId, tableName), RedisSelect(pri) { }
};

redis lua执行脚本

EVAL script numkeys key [key ...] arg [arg ...]
首先大家一定要知道eval的语法格式,其中:
   <1> script:     你的lua脚本
   <2> numkeys:  key的个数
   <3> key:         redis中各种数据结构的替代符号
   <4> arg:         你的自定义参数
ok,可能乍一看模板不是特别清楚,下面我可以用官网的小案例演示一下:
eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20

上面这一串代码大概是什么意思呢? 第一个参数的字符串就是script,也就是lua脚本。2表示keys的个数,KEYS[1] 就是 username的占位符, KEYS[2]就是age的占位符,ARGV[1]就是jack的占位符,ARGV[2]就是20的占位符,,以此类推,,,所以最后的结果应该就是:{return username age jack 20} 是不是有点像C#中的占位符:{0}呢?下面我在Redis中给大家演示一下:

admin@admin:~$ redis-cli
127.0.0.1:6379> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 username age jack 20
1) "username"
2) "age"
3) "jack"
4) "20"
127.0.0.1:6379>

然后我们通过下面命令执行,这种方式和前面介绍的不一样,参数 --eval script key1 key2 , arg1 age2 这种模式,key和value用一个逗号隔开就好了,最后我们也看到了,数据都出来了,对吧.

admin@admin:~$  redis-cli --eval t.lua username age , jack 20                 
1) "username"
2) "age"
3) "jack"
4) "20"
admin@admin:~$
注意上面的逗号左右两边都有空格
  • 脚本也可以在REPL模式上执行,不过需要先加载脚本:
admin@admin:~$ redis-cli script load "$(cat t.lua)"                          
"a42059b356c875f0717db19a51f6aaca9ae659ea"
admin@admin:~$
admin@admin:~$ redis-cli
127.0.0.1:6379> EVALSHA a42059b356c875f0717db19a51f6aaca9ae659ea 2 username age jack 20
1) "username"
2) "age"
3) "jack"
4) "20"
127.0.0.1:6379>
  • lua脚本比较大小,需要使用函数tonumber将字符转换成数字,然后比较大小
admin@admin:~$ cat flashsale.lua
local buyNum = ARGV[1]                        -- 本次购买的数量
local goodsKey = KEYS[1]                      -- 本次购买的商品名
local goodsNum = redis.call('get',goodsKey)   -- 获取商品的库存个数
if tonumber(goodsNum) >= tonumber(buyNum)     -- 库存足够,那么出货
    then redis.call('decrby',goodsKey,buyNum) -- 减少本次买的量
    return buyNum                             -- 返回购买的量
else
    return '0'                                -- 数量不够,直接返回0
end
admin@admin:~$
admin@admin:~$ redis-cli --eval flashsale.lua "pets" , 8
"8"
admin@admin:~$
上面脚本实现的是一个简单的秒杀程序
点赞
收藏
评论区
推荐文章
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
徐小夕 徐小夕
3年前
lerna + dumi + eslint多包管理实践
背景在开发大型项目时,我们通常会遇到同一工程依赖不同组件包,同时不同的组件包之间还会相互依赖的问题,那么如何管理组织这些依赖包就是一个迫在眉睫的问题.我们目前已有的方案有:Multirepo(多个依赖包独立进行git管理)和Monorepo(所有依赖库完全放入一个项目工程).Multirepo的缺点在于每个库变更之后,需要发布到线上,然后在项目
Easter79 Easter79
3年前
tomcat系列之五:Tomcat各个组件生命周期
一,Tomcat中各个组件的关系1,组件有大有小,大组件管理小组件。比如Server管理Service,Service管理连接器和容器2,组件有内有外,外层组件控制内层组件。比如外层组件连接器负责对外交流,外层组件调用内层组件完成业务功能二,创建组件的顺序先创建子组件,再创建父组件,然后把子组件注入到父组件中先创建内层组件,再创建外层
科林-Colin 科林-Colin
4年前
Vue 组件通信方式及其应用场景总结
前言相信实际项目中用过vue的同学,一定对vue中父子组件之间的通信并不陌生,vue中采用良好的数据通讯方式,避免组件通信带来的困扰。今天笔者和大家一起分享vue父子组件之间的通信方式,优缺点,及其实际工作中的应用场景首先我们带着这些问题去思考1vue中到底有多少种父子组件通信方式?2vue中那种父子组件最佳通信方式是什么?3
徐小夕 徐小夕
4年前
《精通react/vue组件设计》之实现一个健壮的警告提示(Alert)组件
前言本文是笔者写组件设计的第七篇文章,今天带大家实现一个自带主题且可关闭的Alert组件,该组件在诸如Antd或者elementUI等第三方组件库中都会出现,主要用
Stella981 Stella981
3年前
Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus
背景对于Android系统来说,消息传递是最基本的组件,每一个App内的不同页面,不同组件都在进行消息传递。消息传递既可以用于Android四大组件之间的通信,也可用于异步线程和主线程之间的通信。对于Android开发者来说,经常使用的消息传递方式有很多种,从最早使用的Handler、BroadcastReceiver、接口回调,到近几年流行的通
Stella981 Stella981
3年前
Flutter中日期组件DatePicker及组件汉化
Flutter提供了DatePicker组件进行时间选择。日期组件及时间组件代码示例:import'package:flutter/material.dart';//第三方插件,需要提前配置import'package:date_format/date_format.dart';classDat
Stella981 Stella981
3年前
Nginx + lua +[memcached,redis]
精品案例1、Nginxluamemcached,redis实现网站灰度发布2、分库分表/基于Leaf组件实现的全球唯一ID(非UUID)3、Redis独立数据监控,实现订单超时操作/MQ死信操作SelectPollEpollReactor模型4、分布式任务调试Quartz应用
Stella981 Stella981
3年前
30.Docker 控制组
Docker控制组控制组是Linux容器机制的另外一个关键组件,负责实现资源的审计和限制。它提供了很多有用的特性;以及确保各个容器可以公平地分享主机的内存、CPU、磁盘IO等资源;当然,更重要的是,控制组确保了当容器内的资源使用产生压力时不会连累主机系统。尽管控制组不负责隔离容器之间相互访问、处理数据和进程,它在防
Stella981 Stella981
3年前
AutoFac简介
在.NET上现在存在许多的依赖注入容器,如:CastleWindsor、StructureMap、Autofac、Unity。这里主要介绍一下Autofac,Autofac和其他容器的不同之处是它和C语言的结合非常紧密,在使用过程中对你的应用的侵入性几乎为零,更容易与第三方的组件集成。Autofac的主要特性如下:1)灵活的组件实例化:
提升前端开发效率的五种实用技术
组件化开发是一种将页面拆分成独立的可重用组件的开发方式。通过组件化开发,我们能够将复杂的界面逻辑拆分成独立的模块,提高代码的复用性和维护性。常用的组件化框架如React、Vue和Angular,它们提供了强大的组件化开发能力,使我们能够轻松构建复杂的用户界面,并提供了组件的生命周期管理和状态管理机制。