发送与接收流程

环境:

$ go version
go version go1.12.7 linux/amd64
$ uname -a
18.04.1-Ubuntu SMP x86_64 x86_64 x86_64 GNU/Linux

大致流程


Go runtime包里的chan.go实现里channel的大部分功能。其中chansend()chanrecv()两个函数完成了channel的发送与接收功能。

值得注意的是,源码中没有特意的将buffered channel与unbuffered channel分开处理。二者的发送与接收仍然是通过chansend()chanrecv()两个函数完成。二者处理逻辑大致相同。

func chansend()

c <- *ep

func chanrecv()

*ep <- c

两个函数的代码逻辑并不复杂,两个函数中都出现了g的’泄漏’,值得理解一下,我不知道使用’泄漏’一词是否准确。之所以称之为’泄漏’,且看下面的代码:

if c == nil {
	if !block {
        ...
	}
	gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
	throw("unreachable")
}

当channel为nil时,直接将当前g park,重新调度。这会让我们再也无法重新调度到当前g。与之类似的操作:

gp := getg()
mysg := acquireSudog()
...
mysg.g = gp
...
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

以这种方式park一个g,在以后的某个时刻,可以通过hchan.sendq拿到g的指针,让其重新进入调度。

与Kernel设计再次”撞脸”


对channel的整个流程分析下来,发现其在发送时的优化与Kernel中System V 消息队列的发送优化类似:

if (!pipelined_send(msq, msg)) { /* 如果有进程在等待消息,直接给它 */
		/* no one is waiting for this message, enqueue it */
		list_add_tail(&msg->m_list, &msq->q_messages);
		msq->q_cbytes += msgsz;
		msq->q_qnum++;
		atomic_add(msgsz, &ns->msg_bytes);
		atomic_inc(&ns->msg_hdrs);
	}

详见Kernel源码

未解疑惑


最后还有一点关于创建channel的疑惑:

/*
	hchan.buf为空: unbuffered channel
*/
case mem == 0:
	// Queue or element size is zero.
	c = (*hchan)(mallocgc(hchanSize, nil, true))
	// Race detector uses this location for synchronization.
	c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
	// Elements do not contain pointers.
	// Allocate hchan and buf in one call.
	c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
	c.buf = add(unsafe.Pointer(c), hchanSize)
default:
	// Elements contain pointers.
	c = new(hchan)
	c.buf = mallocgc(mem, elem, true)
}

如果hchan.buf中包含指针,则不将两段内存(hchan与hchan.buf)连续分配,为什么?TODO

参考资料: