go 语言中重要的组件chan,可以简单的理解为一个带锁的消息队列。
0x01 组件元素
1.1 chan的定义
更愿意称chan为golang语言中的组件,而不是语法糖;chan有边界,例如一个抽水机,有和外部交互的输入输出,内部有自己组成元素的交互逻辑。下面是chan的定义,主要的关键元素有data buff,recvq,sendq,三者呈现了生产者,消费者,数据数组的模型。
//sodog 表达了一个g协程在等待列表的情况。 例如在chan中的发送和接受
//sudog 表述的是多对多的关系,一个go协程可以在多个等待列表中。
//所以多个sodog 关联了一个go 协程,可能多个sudogs 在等待同一个同步对象。
type sudog struct {
//关联的go协程
g *g
next *sudog
prev *sudog
//省略....
//发送消息成功的情况下被唤醒,该字段为true
//chan close 发送消息失败,情况下被唤醒,该字段为false
success bool
}
//sodog 等待队列,记录了头部和尾部,环状。
type waitq struct {
first *sudog
last *sudog
}
// 语法中 chan的定义
type hchan struct {
//队列中数据量
qcount uint // total data in the queue
//环形队列的大小
dataqsiz uint // size of the circular queue
//数据数组指针
buf unsafe.Pointer // points to an array of dataqsiz elements
//元素大小
elemsize uint16
//chan组件关闭标识
closed uint32
//元素类型
elemtype *_type // element type
//发送者index
sendx uint // send index
//接受者index
recvx uint // receive index
//接受者,环状队列
recvq waitq // list of recv waiters
//发送者,环状队列
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
//排它锁
lock mutex
}
1.2 和外部的交互
chan 对象实例的生命周期过程包含创建,关闭,输入数据,输出数据。
1.2.1 make
func makechan(t *chantype, size int) *hchan {
elem := t.elem
//元素尺寸大小安全检查
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
//内存对齐检查
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//缓冲buff 内存大小溢出检查,mem 表示要分配的内存大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
//不带缓冲
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
//元素指向的数据载体大小为零
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//初始化chan的属性。
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
1.2.2 close
下面是chan的关闭源码,chan在关闭的时候会唤醒所有在等待的sender go协程 和 receiver 协程。
func closechan(c *hchan) {
//关闭一个空的chan,抛出异常
if c == nil {
panic(plainError("close of nil channel"))
}
//判断关闭标志,如果已经关闭,抛出异常。
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
//关闭
c.closed = 1
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
//type gList struct {
// head guintptr
//}
//调度器使用go协程链表
var glist gList
// release all readers
for {
//接收队列弹出一个sg
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 得到关联的协程
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// Mark gp ready to run.
goready(gp, 3)
}
}
1.2.3 send
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//判空检查,根据在不同场景下对nil chan写入时候的异常返回,这里的block表示协程在调
//用的时候是否需要阻塞。例如:select 使用场景的时候协程不阻塞的,所以select场景
//向nil chan写入数据时会返回false,而 chan <- xx 是阻塞场景,这时候向nil chan
//写入数据会抛异常
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//省略....
//非阻塞场景,chan关闭或者满的情况下返回false
if !block && c.closed == 0 && full(c) {
return false
}
//省略...
//如果接收者协程队列不为空,直接发送数据
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//如果buff还有空间,投递的消息对象入队列
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
//阻塞当前协程
gp := getg()
mysg := acquireSudog()
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//唤醒,释放协程
closed := !mysg.success
releaseSudog(mysg)
if closed { //如果是chan 关闭情况下被唤醒抛异常
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
1.2.4 recev
// chan接收函数,接收值写入到ep 指针
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
//如果非阻塞,没有元素 返回false,false
// Otherwise, if c is closed, zeros *ep and returns (true, false).
//如果 chan关闭,接受元素为空,忽略接收的值返回true,false
// Otherwise, fills in *ep with an element and returns (true, true).
// 如果收到元素,返回true,true
// A non-nil ep must point to the heap or the caller's stack.
//非空的接收数据指针需要在堆上,或者调用者的栈上。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
//阻塞情况下向空chan 收数据抛异常
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//非阻塞、队列为空
//如果chan关闭,直接返回false
if !block && empty(c) {
//省略....
if atomic.Load(&c.closed) == 0 {
//return false,false
return
}
//队列为空
if empty(c) {
return true, false
}
}
if c.closed != 0 && c.qcount == 0 {
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 存在等待中的sender协程,如果buffer 为空,直接从sender处获得值
// 如果buffer不为空,receiver 从缓冲队列的头部读取值,sender的值,存放到
// buffer的队尾。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//队列不为空,从队列中拿
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//非阻塞情况,队列为空
if !block {
unlock(&c.lock)
return false, false
}
//阻塞
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
c.recvq.enqueue(mysg)
//唤醒
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
0x02 使用tips
- 向关闭的chan写入数据会panic
- 由生产者关闭chan
- 多生产者时,引入协调者关闭chan
- chan 可以做FIFO