Skynet Cluster 简介

Stella981
• 阅读 1190

Skynet Cluster 简介

Cluster 模块负责 Skynet 节点之间的通信。

Cluster 概述

      | --- --- ---> |
node1 |              | node2
      | <-- --- ---  |

两个节点之间通信最多会创建 2 条 TCP 链路。如上图所示。

  • 节点 node1 主动向 node2 发起通信和 node2 的回应是一条 TCP 链接
  • 节点 node2 主动向 node1 发起通信和 node1 的回应是另一条 TCP 链接

Cluster 模块包含三个服务:clusterd clusteragentclustersender ,通信使用 gate 服务。

  • clusterd 服务提供管理功能,初始化 gate 服务监听网络连接。每个节点只存在一个 clusterd 服务。
  • gate 服务接收到新的 socket 连接时,通知 clusterd 服务,此时创建 clusteragent 服务接收数据。每个 clusteragent 服务对应一个 socket 连接。
  • 主动与其它节点通信时,clusterd 服务创建 clustersender 服务建立连接并发送数据。每个 clustersender 服务对应一个 socket 连接。

举个例子。假设存在如下节点配置:

center = "192.168.1.1:10000"
login1 = "192.168.1.2:10011"
login2 = "192.168.1.3:10012"
  • 若 login1 主动与 center 节点通信,则建立一条 TCP 连接,login1 创建一个 clustersender 服务与 center 创建的一个 clusteragent 服务对应。
  • 若 login2 主动与 center 节点通信,则建立一条 TCP 连接,login2 创建一个 clustersender 服务与 center 创建的另一个 clusteragent 服务对应。
  • 若 center 主动与 login1 节点通信,则建立一条 TCP 连接,center 创建一个 clustersender 服务与 login1 创建的一个 clusteragent 服务对应。
  • 若 center 主动与 login2 节点通信,则建立一条 TCP 连接,center 创建另一个 clustersender 服务与 login2 创建的一个 clusteragent 服务对应。

上述一共会创建 4 条 TCP 连接,4 个 clusteragent 服务和 4 个 clustersender 服务。

Cluster 接口

在使用 Skynet Cluster 之前,需要先设置配置信息,指定节点名称对应的 IP 地址及端口号。配置格式如下:db 是节点名称而 127.0.0.1:2528 是该节点对应的 IP 地址及端口号。

db = "127.0.0.1:2528"
db2 = "127.0.0.1:2529"

配置信息可通过配置文件给出,在 Skynet Config 中把文件名赋值给 cluster 即可。
此外也可以直接通过字符串给出配置信息。

调用 cluster.reload(config) 重载配置信息,config 可以是文件名或者字符串。

使用过程中通常涉及如下接口。接口位于库文件 lualib/skynet/cluster.lua 中。

调用 require "skynet.cluster" 创建 clusterd 服务。require 时调用 skynet.uniqueservice("clusterd") 创建唯一的 clusterd 服务。
调用 cluster.open(port) 监听网络连接。
调用 cluster.call(node, address, ...)node 节点发起请求。
调用 cluster.send(node, address, ...)node 节点推送数据。
调用 cluster.register(name, addr) 注册字符串 name 对应的地址。此时,请求方只需使用字符串 name 就可访问到对应的服务。 调用 cluster.query(node, name) 查询字符串 name 对应的服务地址。

通常,服务器调用 cluster.open 监听网络连接。客户端调用 cluster.call 或者 cluster.send 指定节点名称服务地址发起访问。

Cluster 加载配置流程

clusterd 服务在启动时会调用 loadconfig 函数加载配置,此外也可以通过 cluster.reload(config) 重载配置。

clusterd.lua:72 loadconfig 函数用于加载配置。配置格式就是前面提到的节点名称和对应的 IP 地址及端口号。加载完配置后,便可获取到节点对应的地址信息。具体流程如下。

  • 加载选项到 config 中。目前支持在配置中设置 __nowaiting = true ,若向节点发起连接请求时,未找到连接对应的地址,则直接报错而不是挂起协程等待该节点的地址信息被设置。

  • 加载配置数据到 node_address 中。

  • ct.namequerynot config.nowaitingtrue ,则唤醒等待节点地址信息的协程。

    local ct = connecting[name] if ct and ct.namequery and not config.nowaiting then skynet.error(string.format("Cluster node [%s] resloved : %s", name, address)) skynet.wakeup(ct.namequery) end

  • config.nowaitingtrue 则表示不等待节点的配置信息,此时唤醒所有的协程。

    if config.nowaiting then -- wakeup all connecting request for name, ct in pairs(connecting) do if ct.namequery then skynet.wakeup(ct.namequery) end end end

  • 若节点的地址发生变更,则向新地址重新发起连接请求。

    for _, name in ipairs(reload) do -- open_channel would block skynet.fork(open_channel, node_channel, name) end

Cluster 监听网络连接

cluster.open 函数用于监听网络连接。clusterd.lua124 command.listen 处理具体的监听逻辑,网络模块采用 gate 服务。处理来自 gate 服务的消息来处理网络请求。

function command.listen(source, addr, port)
    local gate = skynet.newservice("gate")
    if port == nil then
        local address = assert(node_address[addr], addr .. " is down")
        addr, port = string.match(address, "([^:]+):(.*)$")
    end
    skynet.call(gate, "lua", "open", { address = addr, port = port })
    skynet.ret(skynet.pack(nil))
end

Cluster 发起网络连接请求

调用 cluster.sendcluster.call 向节点发送数据,若还未建立连接,则会先尝试建立连接。概要流程如下:

  • cluster.lua:8 request_sender 函数中调用 pcall(skynet.call, clusterd, "lua", "sender", node) 通过 clusterd 服务创建 clustersender 服务,创建完服务后,则连接建立完成。
  • clustersender 服务发送数据,即可向目标节点发送数据。

获取 clustersender 服务

对于 cluster.call 函数,通过 cluster.lua41 get_sender 函数获取 clustersender 服务,若处于创建过程中,则调用 skynet.wait(task) 等待。
对于 cluster.send 函数,由于不能阻塞调用,所以通过 get_queue 函数获取 clustersender 服务。若处于创建 clustersender 服务的过程中,则把请求插入到队列中然后返回:table.insert(task_queue[node], table.pack(address, ...)) 。通过对 task_queue 设置元表 setmetatable(task_queue, { __index = get_queue } ) ,来触发首次调用 request_sender 函数。

函数 request_sender 的注意点在于需要按顺序逐个唤醒等待的协程。由于在 ipairs(q) 的过程中,可能存在对 q 的修改是 table.insert 操作,因此是允许的。最后设置 sender[node] = c 表示服务创建完毕,后续通过 sender 获取即可。

for _, task in ipairs(q) do
    if type(task) == "table" then
        if c then
            skynet.send(c, "lua", "push", task[1], skynet.pack(table.unpack(task,2,task.n)))
        end
    else
        skynet.wakeup(task)
        skynet.wait(confirm)
    end
end
task_queue[node] = nil
sender[node] = c

创建 clustersender 服务,建立网络连接

函数 clusterd.lua:14 open_channel 完成连接建立过程。流程如下:

  • 若当前正处于连接建立的过程中,则挂起当前协程,等待。

    local ct = connecting[key] if ct then local co = coroutine.running() table.insert(ct, co) skynet.wait(co) return assert(ct.channel) end

  • 获取节点的 IP 地址数据:local address = node_address[key] 。若 not config.nowaitingtrue 则需要等待节点的地址数据,此时也会被挂起,等待在 loadconfig 中被唤醒。

  • 创建 clustersender 服务,调用 pcall(skynet.call, c, "lua", "changenode", host, port)clustersender 发送 changenode 消息,由 clustersender 服务 connect 目标节点。

  • 完成连接建立,唤起 connecting 中之前被挂起的连接请求。

    connecting[key] = nil for _, co in ipairs(ct) do skynet.wakeup(co) end

  • 检查 node_address[key] 节点地址是否发生变更,若变更则再次调用 open_channel 函数。

    if node_address[key] ~= address then return open_channel(t,key) end

于是,连接建立完成,可与目标节点通信。

Cluster 接收网络连接数据

clusterd 服务创建 gate 服务,接收来自 gate 服务的消息处理网络请求。当有新的网络连接时 clusterd.lua195 command.socket 函数处理新的连接,此时 subcmd == "open" 创建 clusteragent 服务接收网络数据。
subcmd == "closer" or subcmd == "error" 表示网络连接已经断开,此时处理连接断开逻辑。

function command.socket(source, subcmd, fd, msg)
    if subcmd == "open" then
        skynet.error(string.format("socket accept from %s", msg))
        -- new cluster agent
        cluster_agent[fd] = false
        local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
        local closed = cluster_agent[fd]
        cluster_agent[fd] = agent
        if closed then
            skynet.send(agent, "lua", "exit")
            cluster_agent[fd] = nil
        end
    else
        if subcmd == "close" or subcmd == "error" then
            -- close cluster agent
            local agent = cluster_agent[fd]
            if type(agent) == "boolean" then
                cluster_agent[fd] = true
            elseif agent then
                skynet.send(agent, "lua", "exit")
                cluster_agent[fd] = nil
            end
        else
            skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
        end
    end
end

Cluster 具体的发送和接收数据逻辑

发送数据的细节见 clustersender 服务,接收数据的细节见 clusteragent 服务。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这