Golang chan

go 语言中重要的组件chan,可以简单的理解为一个带锁的消息队列。

0x01 组件元素

1.1 chan的定义

更愿意称chan为golang语言中的组件,而不是语法糖;chan有边界,例如一个抽水机,有和外部交互的输入输出,内部有自己组成元素的交互逻辑。下面是chan的定义,主要的关键元素有data buff,recvq,sendq,三者呈现了生产者,消费者,数据数组的模型。

//sodog 表达了一个g协程在等待列表的情况。 例如在chan中的发送和接受
//sudog 表述的是多对多的关系,一个go协程可以在多个等待列表中。
//所以多个sodog 关联了一个go 协程,可能多个sudogs 在等待同一个同步对象。
type sudog struct {
	//关联的go协程
	g *g
	next *sudog
	prev *sudog
	//省略....
	//发送消息成功的情况下被唤醒,该字段为true
	//chan close 发送消息失败,情况下被唤醒,该字段为false
	success bool
}

//sodog 等待队列,记录了头部和尾部,环状。
type waitq struct {
	first *sudog
	last  *sudog
}

// 语法中 chan的定义
type hchan struct {
 //队列中数据量
	qcount   uint           // total data in the queue
	//环形队列的大小
	dataqsiz uint           // size of the circular queue
	//数据数组指针
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	//元素大小
	elemsize uint16
	//chan组件关闭标识
	closed   uint32
	//元素类型
	elemtype *_type // element type
	//发送者index
	sendx    uint   // send index
	//接受者index
	recvx    uint   // receive index
	//接受者,环状队列
	recvq    waitq  // list of recv waiters
	//发送者,环状队列
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	//排它锁
	lock mutex
}

1.2 和外部的交互

chan 对象实例的生命周期过程包含创建,关闭,输入数据,输出数据。

1.2.1 make

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	//元素尺寸大小安全检查
	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	//内存对齐检查
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	//缓冲buff 内存大小溢出检查,mem 表示要分配的内存大小
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	//不带缓冲
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
		//元素指向的数据载体大小为零
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	//初始化chan的属性。
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

1.2.2 close

下面是chan的关闭源码,chan在关闭的时候会唤醒所有在等待的sender go协程 和 receiver 协程。

func closechan(c *hchan) {
//关闭一个空的chan,抛出异常
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	//判断关闭标志,如果已经关闭,抛出异常。
	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	
	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}
	//关闭
	c.closed = 1

	// A gList is a list of Gs linked through g.schedlink. A G can only be
  // on one gQueue or gList at a time.
  //type gList struct {
  //	head guintptr
  //}
  //调度器使用go协程链表
	var glist gList

	// release all readers
	for {
	  //接收队列弹出一个sg
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 得到关联的协程
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// Mark gp ready to run.
		goready(gp, 3)
	}
}

1.2.3 send

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	//判空检查,根据在不同场景下对nil chan写入时候的异常返回,这里的block表示协程在调
	//用的时候是否需要阻塞。例如:select 使用场景的时候协程不阻塞的,所以select场景
	//向nil chan写入数据时会返回false,而 chan <- xx 是阻塞场景,这时候向nil chan
	//写入数据会抛异常
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	//省略....
	//非阻塞场景,chan关闭或者满的情况下返回false
	if !block && c.closed == 0 && full(c) {
		return false
	}
	//省略...
 //如果接收者协程队列不为空,直接发送数据
  if sg := c.recvq.dequeue(); sg != nil {
  // Found a waiting receiver. We pass the value we want to send
  // directly to the receiver, bypassing the channel buffer (if any).
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
	}
	//如果buff还有空间,投递的消息对象入队列
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	//阻塞当前协程
	gp := getg()
	mysg := acquireSudog()
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	
	//唤醒,释放协程
	closed := !mysg.success
	releaseSudog(mysg)
	if closed { //如果是chan 关闭情况下被唤醒抛异常
		if c.closed == 0 { 
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}

1.2.4 recev

// chan接收函数,接收值写入到ep 指针
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
//如果非阻塞,没有元素 返回false,false
// Otherwise, if c is closed, zeros *ep and returns (true, false).
//如果 chan关闭,接受元素为空,忽略接收的值返回true,false
// Otherwise, fills in *ep with an element and returns (true, true).
// 如果收到元素,返回true,true
// A non-nil ep must point to the heap or the caller's stack.
//非空的接收数据指针需要在堆上,或者调用者的栈上。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	//阻塞情况下向空chan 收数据抛异常
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	//非阻塞、队列为空
	//如果chan关闭,直接返回false
		if !block && empty(c) {
      //省略....
      if atomic.Load(&c.closed) == 0 {
        //return false,false
        return 
      }
      //队列为空
      if empty(c) {
        return true, false
      }
		}
	
    if c.closed != 0 && c.qcount == 0 {
      return true, false
    }
	

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		
		// 存在等待中的sender协程,如果buffer 为空,直接从sender处获得值
		// 如果buffer不为空,receiver 从缓冲队列的头部读取值,sender的值,存放到
		// buffer的队尾。
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	//队列不为空,从队列中拿
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	//非阻塞情况,队列为空
	if !block {
		unlock(&c.lock)
		return false, false
	}
	
	//阻塞
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	c.recvq.enqueue(mysg)
	//唤醒
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success

0x02 使用tips

  1. 向关闭的chan写入数据会panic
  2. 由生产者关闭chan
  3. 多生产者时,引入协调者关闭chan
  4. chan 可以做FIFO