Skynet 服务创建流程

Stella981
• 阅读 827

Skynet 服务创建流程

根据设计综述 Skynet 是为了让服务器充分利用多核优势,将不同的业务放在独立的执行环境中处理。
Skynet 核心功能是加载一个 C 模块(动态库),模块用数字 id 标识,作为其 handle ,模块被称为服务 service 。服务间可以自由发送消息。每个模块可以向 Skynet 框架注册一个 callback 函数,用来接收发给它的消息。
每个服务是被一个个消息驱动,当无消息时,服务处于挂起状态。每个服务拥有一个属于自己的消息队列,框架中存在一个全局队列负责调度处理服务所接收到的消息。

代码层面,Skynet 服务对应于数据结构 struct skynet_context ,其中重要字段如下。

struct skynet_context {
    void * instance;             // 模块自定义数据
    struct skynet_module * mod;      // 框架模块数据
    void * cb_ud;                 // 传给回调函数的自定义数据
    skynet_cb cb;                  // 回调函数
    struct message_queue * queue;  // 消息队列,用于接收发送给服务的消息
};

函数 skynet_context_new 用于创建服务,返回值表示此服务。

// 参数 name: 服务名
// 参数 param: 传递给服务的参数
struct skynet_context * skynet_context_new(const char * name, const char *param);

服务调用函数 skynet_callback 用于向框架注册回调函数,处理接收到的消息。

// 参数 context: 表示服务
// 参数 ud: user data 表示自定义数据
// 参数 cb: 表示回调函数
void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb);

// 参数 context: 表示服务
// 参数 ud: user data 由 skynet_callback 指定
// 参数 type: 消息类型
// 参数 session: 由发送方指定,标识发送的消息
// 参数 source: 表示发送方服务的地址
// 参数 msg sz: 数据
typedef int (*skynet_cb)(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz);

函数 skynet_send 用于向服务发送消息。向一个服务发送消息就是向这个服务的消息队列中添加消息。

// 参数 context: 表示服务
// 参数 source: 表示发送方服务的地址,可为 0
// 参数 destination: 表示接收方服务的地址
// 参数 type: 消息类型
// 参数 session: 用于发送方标识发送的消息,可为 0
// 参数 data sz: 数据
// 返回值 : session
int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz);

C 服务

// 参数 name: 服务名
// 参数 param: 传递给服务的 c-style 字符串,以空白字符分割
struct skynet_context * skynet_context_new(const char * name, const char *param)
{
    struct skynet_module * mod = skynet_module_query(name);
    // 执行 create 传递参数无
    void *inst = skynet_module_instance_create(mod);
    // 执行 init ,参数 param 是传递给服务的参数
    int r = skynet_module_instance_init(mod, inst, ctx, param);
}

函数 skynet_context_new 完成 C 服务的创建,一次函数调用即可完成。函数执行成功,返回的 context 便是创建的服务。创建过程中的初始化包含 createinit 流程,在 init 流程中调用 skynet_callback 注册回调函数。此后,服务创建完成,便可接收消息。
Lua 函数 skynet.launch 用于在 Lua 中创建 C 服务。


Lua 服务

Lua 服务本质上也是 C 服务,只是将 Lua 回调函数注册到 C 服务中,将对服务接收消息的处理移动到 Lua 中。完成此功能的 C 服务是 snlua 服务。

因此,创建 Lua 服务需要先创建 snlua 服务,然后在 Lua 中调用 skynet.start 将 Lua 回调函数注册到 C 服务中,skynet.start 函数执行完后,Lua 服务创建完成。

skynet.newservice 用于创建 Lua 服务。参数 name 是服务名,参数 ... 是传给服务的参数,需要是 Lua 中能被转换成字符串的值。

function skynet.newservice(name, ...)
    return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
end

skynet.newservice 流程如下。

  1. 当前服务向 launcher 服务发送请求,请求创建服务 name
  2. launcher 服务创建新服务。
  3. 新服务创建完成后,skynet.newservice 返回新服务的地址。

具体的创建过程如下所示。
逻辑来到 launcher 服务 command.LAUNCH 函数(主要代码片段)。

local function launch_service(service, ...)
    local param = table.concat({...}, " ")
    local inst = skynet.launch(service, param)
    local response = skynet.response()
    if inst then -- launch 成功
        services[inst] = service .. " " .. param
        instance[inst] = response
    else -- launch 失败
        response(false)
        return
    end
    return inst
end

function command.LAUNCH(_, service, ...)
    launch_service(service, ...)
    return NORET
end

command.LAUNCH 调用 launch_service 传递的参数依次是 ("snlua", name, ...)skynet.newservice 传递的参数对应。 在 launch_service 函数中 service 变量是 "snlua"param 变量是要创建的服务名及其参数。调用 skynet.launch 创建 C 服务 snlua 且将 param 传递给 snlua ,返回的 inst 是创建的地址。 创建成功记录数据到 servicesinstance 变量中,services 存储通过 launcher 服务创建的服务,instance 存储 skynet.response 用于回复请求方创建结果。
创建失败调用 response(false) 回复请求方创建失败。
新服务创建完成后,发送 LAUNCHOK 到 launcher 服务。launcher 服务回复请求方新服务的地址。

逻辑来到 C 服务 snlua ,模块是 service_snlua.c 。
snlua 服务的核心工作就是将消息处理回调函数对接到 Lua 中指定的回调函数。
snlua 服务首先执行 create 函数,调用 lua_newstate 创建 Lua 虚拟机。然后在 init 函数调用 skynet_callback 向框架注册回调函数,并向自身发送第一条消息,用于进行后续初始化。注意 args 参数是要创建的服务信息。

// 参数 args: 字符串,包含由空白字符分割的多个字符串,第一个字符串是要创建的 Lua 服务名,参考 skynet.launch 的 param 参数
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args) {
    int sz = strlen(args);
    char * tmp = skynet_malloc(sz);
    memcpy(tmp, args, sz);
    // 1. 注册回调函数 launch_cb
    skynet_callback(ctx, l , launch_cb);
    // 2. 在第一条消息中进行后续初始化
    const char * self = skynet_command(ctx, "REG", NULL);
    uint32_t handle_id = strtoul(self+1, NULL, 16);
    // it must be first message
    skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
    return 0;
}

struct snlua *
snlua_create(void) {
    struct snlua * l = skynet_malloc(sizeof(*l));
    memset(l,0,sizeof(*l));
    l->L = lua_newstate(lalloc, l); // 创建 Lua 虚拟机
    return l;
}

问题:为何没有在 snlua_init 中直接调用 init_cb
乍一看,后续初始化逻辑 init_cb 可在 snlua_init 函数中完成,但这里设计成延迟到第一条消息中执行,好处是简化 snlua_init 函数逻辑,虽然增加了流程,但由于是在第一条消息中处理,整个过程是连续的,从框架整体来看,可认为此过程是“原子”的。对于这种复杂初始化流程,我也很认可这种设计,学习了。

在第一条消息中处理后续初始化。如果初始化失败,snlua 服务退出。

// 参数 msg sz: 要创建的服务信息,参考 snlua_init 中 args 参数
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
    assert(type == 0 && session == 0);
    struct snlua *l = ud;
    skynet_callback(context, NULL, NULL); // 细节:先清空回调字段
    int err = init_cb(l, context, msg, sz);
    if (err) {
        skynet_command(context, "EXIT", NULL);
    }
    return 0;
}

Lua 服务具体的初始化逻辑如下。

// 参数 args sz: 对应 launch_cb 中 msg sz
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
    lua_State *L = l->L;
    l->ctx = ctx;
    lua_gc(L, LUA_GCSTOP, 0); // 关闭 GC
    lua_pushboolean(L, 1);  /* signal for libraries to ignore env. vars. */
    lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
    luaL_openlibs(L); // 打开标准库
    lua_pushlightuserdata(L, ctx); // 设置 struct skynet_context
    lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
    luaL_requiref(L, "skynet.codecache", codecache , 0);
    lua_pop(L,1);

    // 保存一些路径到如下全局变量中
    const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
    lua_pushstring(L, path);
    lua_setglobal(L, "LUA_PATH");
    const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
    lua_pushstring(L, cpath);
    lua_setglobal(L, "LUA_CPATH");
    const char *service = optstring(ctx, "luaservice", "./service/?.lua");
    lua_pushstring(L, service);
    lua_setglobal(L, "LUA_SERVICE");
    const char *preload = skynet_command(ctx, "GETENV", "preload");
    lua_pushstring(L, preload);
    lua_setglobal(L, "LUA_PRELOAD");

    // 设置 traceback 函数
    lua_pushcfunction(L, traceback);
    assert(lua_gettop(L) == 1);

    // 加载 Lua 服务入口脚本
    const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
    int r = luaL_loadfile(L,loader);
    if (r != LUA_OK) {
        skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
        report_launcher_error(ctx);
        return 1;
    }
    lua_pushlstring(L, args, sz);
    r = lua_pcall(L,1,0,1);
    if (r != LUA_OK) {
        skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
        report_launcher_error(ctx);
        return 1;
    }
    lua_settop(L,0);
    
    // 处理 skynet.memlimit 设置的内存限制
    if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
        size_t limit = lua_tointeger(L, -1);
        l->mem_limit = limit;
        skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
        lua_pushnil(L);
        lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
    }
    lua_pop(L, 1);

    lua_gc(L, LUA_GCRESTART, 0); // 重启 GC
    return 0;
}

函数 init_cb 的核心作用就是调用 lua_pcall 函数执行 Lua 服务入口脚本,脚本文件由 args sz 指定,具体实现有一些细节。

  • 上述整个加载过程是关闭 Lua GC 的,完成后才重新启动 GC 。猜测是为了加快加载速度。
  • skynet.codecache 用于在 Lua 虚拟机之间共享代码。
  • 调用 lua_pcall 前设置了 traceback 函数。
  • 通过 lualoader 加载 lualoader 脚本,并传递 args sz 参数
  • snlua 服务中调用 skynet_callback(context, NULL, NULL); 删除回调函数后,未再发现注册回调函数的 C 代码,此注册是在 Lua 中完成的。

执行 lua_pcall 函数,逻辑来到框架提供的 lualoader 脚本,位于 ./lualib/loader.lua

-- ... 就是 C 函数 init_cb 中的 args sz
local args = {} -- Lua 服务名及参数
for word in string.gmatch(..., "%S+") do
    table.insert(args, word)
end
SERVICE_NAME = args[1] -- 服务名

-- 定位并加载 Lua 服务脚本文件
local main, pattern -- 分别表示加载后的 Lua chunk 和 Lua 服务脚本文件路径
local err = {}
for pat in string.gmatch(LUA_SERVICE, "([^;]+);*") do
    local filename = string.gsub(pat, "?", SERVICE_NAME)
    local f, msg = loadfile(filename)
    if not f then
        table.insert(err, msg)
    else
        pattern = pat
        main = f
        break
    end
end

-- 预处理,可选
if LUA_PRELOAD then
    local f = assert(loadfile(LUA_PRELOAD))
    f(table.unpack(args))
    LUA_PRELOAD = nil
end
-- 执行 Lua 服务入口脚本
main(select(2, table.unpack(args)))

前面提到,需要在脚本调用 skynet.start 向框架注册 Lua 回调函数,完成创建 Lua 服务。lualoader 中调用 main 函数,执行 Lua 服务入口脚本,于是 skynet.start 函数被调用,Lua 服务创建完成。
skynet.start 函数中 c.callback(skynet.dispatch_message) 完成 Lua 回调函数的注册
如下代码,skynet.start 执行完毕后,注册一个 0 秒定时器回调,那时调用 start_func 执行上层业务初始化,并根据初始化结果发送消息到 launcher 服务,告知创建成功与否。

function skynet.start(start_func)
    c.callback(skynet.dispatch_message) -- 注册 Lua 回调函数
    -- 注册定时器回调处理上层业务初始化
    init_thread = skynet.timeout(0, function()
        skynet.init_service(start_func)
        init_thread = nil
    end)
end

function skynet.init_service(start)
    -- 初始化上层业务,并告知 launcher 创建结果
    local ok, err = skynet.pcall(start)
    if not ok then
        skynet.error("init service failed: " .. tostring(err))
        skynet.send(".launcher","lua", "ERROR")
        skynet.exit()
    else
        skynet.send(".launcher","lua", "LAUNCHOK")
    end
end

注意,skynet.start 函数执行完后,在 Skynet 框架层面 Lua 服务已创建完成,可对外提供服务。
skynet.init_service 是在业务层面完成初始化,然后才通知 launcher 服务。

问题:为何没有在 skynet.start 中直接调用 skynet.init_service ?
先看调用链 snlua - init_cb() -> lua_pcall -> lualoader - main() -> skynet.start()skynet.start 调用堆栈是从 init_cb 函数触发,而 skynet.init_service 会调用上层业务初始化 start_func 函数,start_func 函数可能会很复杂,比如有 RPC ,在 skynet.start 函数中将 skynet.init_service 放到定时器回调中执行,可以将业务层初始化和框架层初始化分离,简化 init_cb 函数触发到脚本中的逻辑。学习了。


理解:在 Skynet 框架层面,创建服务的“原子”性。
对于 C 服务,一次 C 函数 skynet_context_new 调用完成创建,框架内部处理具体过程中的多线程临界区,但从框架层面来看,给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。
对于 Lua 服务,需要一次 C 函数 skynet_context_new 调用和第一条消息完成创建,虽然带有流程,但第一条消息保证了连续性,从框架层面来看,也满足给这个服务发送消息,要么未查询到服务,要么查询到服务且此时服务是可接收消息的。但 Lua 服务中,skynet.init_service 是在定时器回调中调用的,假设在此函数执行前,消息队列中已经存在其它消息,此时在 Lua raw_dispatch_message 函数中,如下代码片段,若 p == nil 则进行如下处理。

local p = proto[prototype]
if p == nil then
    if session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, prototype)
    end
    return
end

理解:函数 skynet.register_protocolskynet.dispatch 调用时机。
此函数指定用于业务的消息处理函数。而 skynet.start 的参数 start_func 调用之前,可能已经接收到了消息。
因此,如果某类消息依赖于 start_func 进行初始化,则应该在 start_func 中才指定用于业务的消息处理函数。
如果某消息不依赖于 start_func ,则可和 skynet.start 的调用时机一样,指定消息处理函数。

理解:服务与 worker 线程。
Skynet 在启动 worker 线程之前,就创建了服务,并且可向此服务发送消息,只是 worker 线程开始工作后,才开始调度执行服务接收到的消息。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
Skynet 简单的服务编写 (2)
          前面我们简单的用Skynet写了个lua服务,实现了简单的发送消息并调用输出“HelloWorld!”  而由于Skynet是网络框架,所以我们这次用Skynet写一个简单的服务来监听80端口。  具体流程如下:      1、在main文件入口内监ip为:192.168.2.5,端口为:80    
Wesley13 Wesley13
2年前
1. 容器化部署一套云服务 第一讲 Jenkins(Docker + Jenkins + Yii2 + 云服务器))
容器化部署一套云服务系列1\.容器化部署一套云服务之Jenkins(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fjackson0714%2Fp%2Fdeploy1.html)一、购买服务器服务器!caeef00
Stella981 Stella981
2年前
Skynet 设计综述
1.多线程模式,可以使得状态共享、数据交换更加高效。而多线程模型的诸多弊端,比如复杂的线程锁、线程调度问题等,都可以通过减小底层的规模,精简设计,最终把危害限制在很小的范围内。2.做为核心功能,Skynet仅解决一个问题:把一个符合规范的C模块,从动态库(so文件)中启动起来,绑定一个永不重复(即使模块退出)的数字id做为其handl
Stella981 Stella981
2年前
Skynet 简单的服务编写 (1)
Skynet通过内部注册的协议进行消息传输。看以下简单例子:  1、首先我们新建一个服务network;  2、然后对其进行发送一条lua消息,让其调用start函数进行输出;main.lualocalskynetrequire"skynet"localfunctionmain()
Stella981 Stella981
2年前
Skynet 代码(1)
服务端:mainlocalskynetrequire"skynet"localfunctionaction()skynet.uniqueservice("debug_console",8000)skynet.uniqueservice"watchdog"skynet.e
Stella981 Stella981
2年前
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法参考文章:(1)Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.codeprj.com%2Fblo
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这