Go Chanel 使用与原理 二

智码旅人
• 阅读 3065

有了上篇的基本了解,可以翻阅源码了

涉及的数据结构

// Go/src/runtime/chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}


// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

Go Chanel 使用与原理 二

makechan()

  • make(chan int,3)带有设置缓存大小的参数,则会分配一段连续空间,buf 指向这段内存空间
  • c.buf = add(unsafe.Pointer(c), hchanSize) 分配 hchanSize 大小的空间,其中常量 hchanSize hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    ···

    var c *hchan // 返回的是指针
    switch {
    case size == 0 || elem.size == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()  
    case elem.kind&kindNoPointers != 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)   // <============
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)   // <==============
    }
    ···
    return c
}

chansend()

我们截取不同情况的代码段。

如果 chanel 为 nil

如果 chanel 为空,即没有用 make 分配内存,那么会调用 gopark 方法

if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

而 gopark 方法会将当前的 goroutine 休眠,然后回调通过参数传来的 unlockf 方法。注意看回上面的代码,调用 gopark 方法时传递的 unlockf 参数为 nil,所以会一直休眠。

// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// Reason explains why the goroutine has been parked.
// It is displayed in stack traces and heap dumps.
// Reasons should be unique and descriptive.
// Do not re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}

这时, Go 语言启动的时候会有一个 goroutine sysmon 一直检测系统的运行情况,其中有一个方法 checkdead(),当检测到所有 goroutine 都处于休眠,即死锁,便抛出错误。

//  /Go/src/runtime/proc.go

func checkdead() {
    ...
    throw("all goroutines are asleep - deadlock!")  
}

如果 chanel 已经被关闭了

直接引发 panic:

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

发送数据的三种情况

当前 hchan 的 recvq 接收队列上已经有 goroutine 阻塞

    lock(&c.lock)
    ···
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

send 方法判断到接收方 sudog 的 elem 字段存有对应的内存空间地址值的话,调用 sendDirect 方法。这里啰嗦一下,sudog 的 elem 是在 func chanrecv 方法中赋值的,将接收方 goroutine 用来接收数据的栈空间地址赋值给 elem。

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ···
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

sendDirect 方法中 memmove 方法直接拷贝 t.size 个字节到目的内存空间。

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // No need for cgo write barrier checks because dst is always
    // Go memory.
    memmove(dst, src, t.size)
}

当前 hchan.buf 还有可用空间:将数据放到 buffer 里面。


    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

当前 hchan.buf 已满:阻塞当前 goroutine

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

goparkunlock 方法将当前 goroutine 休眠,并且释放锁资源

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}
参考文章
http://legendtkl.com/2017/08/...
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
东方客主 东方客主
4年前
go-map源码简单分析(map遍历为什么时随机的)
GO中map的底层是如何实现的首先Go语言采用的是哈希查找表,并且使用链表解决哈希冲突。GO的内存模型先看这一张map原理图(https://imghelloworld.osscnbeijing.aliyuncs.com/49dfa7b81e19fbab143ddc0a7b3b7fa0.png)map再来看
九路 九路
4年前
理解go语言包导入路径的含义
Go语言是使用包(package)作为基本单元来组织源码的,可以说一个Go程序就是由一些包链接在一起构建而成的。虽然与Java、Python等语言相比这算不上什么创新,但与祖辈C语言的头文件包含机制相比则是“先进”了许多。编译速度快是这种”先进性“的一个突出表现,即便是每次编译都是从零开始。Go语言的这种以包为基本构建单元的构建模型使得依赖分
Peter20 Peter20
4年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
捉虫大师 捉虫大师
3年前
Sentinel-Go 源码系列(一)|开篇
大家好呀,打算写一个Go语言组件源码分析系列,一是为了能学习下Go语言,看下别人是怎么写Go的,二是也掌握一个组件。本次选择了SentinelGo,一是对Java版本的Sentinel算是有一些了解,也在生产上落地过,二是感觉他的代码应该不会太复杂(仅仅是感觉),三是在云原生越来越热的趋势下,用Go实现的限流降级容错应该是比较通用的
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
MySQL数据库InnoDB存储引擎Log漫游(1)
作者:宋利兵来源:MySQL代码研究(mysqlcode)0、导读本文介绍了InnoDB引擎如何利用UndoLog和RedoLog来保证事务的原子性、持久性原理,以及InnoDB引擎实现UndoLog和RedoLog的基本思路。00–UndoLogUndoLog是为了实现事务的原子性,
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
智码旅人
智码旅人
Lv1
其实人世间的挚爱,只有两件事情就可以表达。
文章
5
粉丝
0
获赞
0