如何创建m,m的3种状态切换

环境:

$ go version
go version go1.12.7 linux/amd64
$ uname -a
18.04.1-Ubuntu SMP x86_64 x86_64 x86_64 GNU/Linux

go调度的实现还是挺复杂的,从最简单的m切入再好不过了。

创建一个全新的m


m是一个结构体,在runtime中分配一个m通常使用new关键字new(m)。整个runtime中唯一调用new(m)的地方,就是allocm函数:

// Allocate a new m unassociated with any thread.
// Can use p for allocation context if needed.
// fn is recorded as the new m's m.mstartfn.
//
// This function is allowed to have write barriers even if the caller
// isn't because it borrows _p_.
//
//go:yeswritebarrierrec
func allocm(_p_ *p, fn func()) *m {
    // 通过tls得到当前线程正在运行的g
    // g还是g0,还不确定TODO
    // 但是看到获取_g_的目的都是在使用_g_.m(当前线程),所以无论是g还是g0,_g_.m结果都一样
	_g_ := getg() 
	_g_.m.locks++ // disable GC because it can be called from sysmon
	if _g_.m.p == 0 {
        // 如果当前线程没有p,就把为allocm函数准备的_p_
        // 暂时借给当前线程_g_.m
		acquirep(_p_) // temporarily borrow p for mallocs in this function
	}

	// Release the free M list. We need to do this somewhere and
	// this may free up a stack we can use.
	if sched.freem != nil {
		lock(&sched.lock)
		var newList *m
		for freem := sched.freem; freem != nil; {
			if freem.freeWait != 0 {
				next := freem.freelink
				freem.freelink = newList
				newList = freem
				freem = next
				continue
            }
            // 正如freeWait的注释所说
            // if == 0, safe to free g0 and delete m (atomic)
            // 释放sched.freem中sched.freem.freeWait=0的m的系统栈(g0)
			stackfree(freem.g0.stack)
			freem = freem.freelink
		}
		sched.freem = newList
		unlock(&sched.lock)
	}

	mp := new(m) // 在堆上分配一个m
    mp.mstartfn = fn 
    // (1)给mp分配信号栈gsignal
    // (2)将mp链入全局allm
	mcommoninit(mp)

	// In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    // 正如注释所说,如果是下面几种系统,g0栈通过创建线程分配
	if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier) // 分配g0的栈,大小为8K
	}
	mp.g0.m = mp

	if _p_ == _g_.m.p.ptr() {
		releasep() // 这里与前面的acquirep(_p_)相呼应。
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt // TODO
    }
    
	return mp
}

注意这个函数的注释:创建一个没有与thread相关联的m,为什么?因为在allocm函数里,仅仅是在用户空间分配一块m的堆内存。在这个函数里没有涉及到线程相关操作。那为什么通常说m对应一个线程呢?原因在后面解释。

在runtime里,调用allocm函数的地方有两处:

+--------+       +----------------+
| newm() |       | oneNewExtraM() |
+-----+--+       +--+-------------+
      +----+    +---+
           |    |
        +--v----v--+
        | allocm() |
        +-----+----+
              |
              v
          +---+----+
          | new(m) |
          +--------+

下面先看newm函数:

// Create a new m. It will start off with a call to fn, or else the scheduler.
// fn needs to be static and not a heap allocated closure.
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrierrec
func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn) // 详细过程见上面
	mp.nextp.set(_p_) // 
	mp.sigmask = initSigmask
    ...
	newm1(mp)
}
func newm1(mp *m) {
    ...
	execLock.rlock() // Prevent process clone.
	newosproc(mp) // 开始创建os线程
	execLock.runlock()
}

在此之前,我们在堆上分配的m还不能称作一个线程。因为还没有进行线程的创建,系统中线程数依旧是原来的数量。

下面,就要开始创建线程了。观察我们创建的m如何与新建的线程相联系:

// located at runtime/os_linux.go
// May run with m.p==nil, so write barriers are not allowed.
//go:nowritebarrier
func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi) // 得到g0栈起始位置,说明新线程的用户栈就是g0的栈
    ...
	// Disable signals during clone, so that the new thread starts
	// with signals disabled. It will enable them in minit.
	var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // 这里就是创建线程的代码了:)
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	...
}
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
	MOVL	flags+0(FP), DI
	MOVQ	stk+8(FP), SI
	MOVQ	$0, DX
	MOVQ	$0, R10

	// Copy mp, gp, fn off parent stack for use by child.
	// Careful: Linux system call clobbers CX and R11.
	MOVQ	mp+16(FP), R8
	MOVQ	gp+24(FP), R9
	MOVQ	fn+32(FP), R12

	MOVL	$SYS_clone, AX // 系统调用号
	SYSCALL

	// In parent, return.
	CMPQ	AX, $0 // 如果返回值为0,是子进程返回了;如果非0,则是父进程返回了
	JEQ	3(PC)
	MOVL	AX, ret+40(FP) // 如果是父进程返回,直接将rax中的返回值(子进程的pid)放进ret变量中
	RET 

    // 注意下面的指令,只有子进程才会执行

	// In child, on new stack.
	MOVQ	SI, SP // 切换到子进程的栈上

    // If g or m are nil, skip Go-related setup.
    // 通过前面的分析可以知道可以知道,m和m.g0都有值
	CMPQ	R8, $0    // m
	JEQ	nog
	CMPQ	R9, $0    // g
	JEQ	nog

    // Initialize m->procid to Linux tid
    // 因为现在已经在子进程中了,所以只有通过系统调用的方式获得子进程的pid
	MOVL	$SYS_gettid, AX
	SYSCALL
	MOVQ	AX, m_procid(R8)

    // Set FS to point at m->tls.
    // 通过SYS_arch_prctl系统调用来设置m.tls为当前进程(线程)的tls
	LEAQ	m_tls(R8), DI
	CALL	runtime·settls(SB)

	// In child, set up new stack
	get_tls(CX)
	MOVQ	R8, g_m(R9) // g0.m = m
	MOVQ	R9, g(CX)   // 将g0设置为当前m正在运行的g
	CALL	runtime·stackcheck(SB)

nog:
	// Call fn
	CALL	R12   // 调用mstart函数,never return
    ...

在上面代码的分析中,在不同的语境下,我将线程与进程混用,这是可以的(参考内核书籍进程相关章节,如「ULK」)。另外,对plan9不熟悉的话,可以使用objdump -d xxx | grep 'runtime.clone'查看x86汇编,对照着看。关于settls函数的内容,参考我上一篇文章Go程序启动源码分析

如果从内核的角度来看,父进程创建子进程后就回到了自己的control flow中。至于子进程什么时候被调度(mstart函数什么时候执行),那就要看内核的调度了,但是子进程迟早是会执行的。如果子进程被内核调度,回到用户空间执行的第一条语句就是CMPQ AX, $0,由于rax(AX)= 0,所以下一条指令是MOVQ SI, SP

这里父进程直接从clone函数中返回了(功成身退),而子进程则转去执行另一个控制流-mstart函数-后面会讲到)。

需要注意的是,runtime/proc.go里面有两个函数名字挺相近的函数,一个是mstart函数;另一个是startm函数。不要将二者弄混了。

现在,我们已经分析完newm函数了。再完善一下调用链:

+-----------------------+   +----------+   +--------+   +-------------------------+
| startTemplateThread() |   | startm() |   | main() |   | startTheWorldWithSema() |
+----------------+------+   ++---------+   +---+----+   +-------+-----------------+
                 +---------+ | +---------------+                |
                           | | |  +-----------------------------+
                           v v v  v
                          ++-+-+--++       +----------------+
                          | newm() |       | oneNewExtraM() |
                          +-----+--+       +--+-------------+
                                |             |
                                +----+    +---+
                                     |    |
                                  +--v----v--+
                                  | allocm() |
                                  +-----+----+
                                        v
                                    +---+----+
                                    | new(m) |
                                    +--------+

开始运行m


在继续讲解之前,先介绍一下关于m的几个状态。与p和g不同的是,m的状态没有在其结构体中与成员的形式体现出来。但是只要稍加仔细阅读源码,会发现m也是有状态的。

在源码文档中将m分为两个状态:

non-spinning状态还可以细分为两个状态(ps:这两个状态的名词是我自己造的,不具有权威性^_^):

在下面的讲解中,将会看到线程m如何在这几个状态间来回的切换。

一个新建的线程,回到用户空间,执行的第一个函数就是mstart函数。注意,这里特别强调第一个函数,第一条指令在前面已经讲过。

// Called to start an M.
//
// This must not split the stack because we may not even have stack
// bounds set up yet.
//
// May run during STW (because it doesn't have a P yet), so write
// barriers are not allowed.
//
//go:nosplit
//go:nowritebarrierrec
func mstart() {
	_g_ := getg()
	...
	// Initialize stack guards so that we can start calling
	// both Go and C functions with stack growth prologues.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	_g_.stackguard1 = _g_.stackguard0
	mstart1()
	...
}

这个函数会在总共两个地方调用(它们都是在g0上开始运行):

func mstart1() {
	_g_ := getg()

	if _g_ != _g_.m.g0 {
		/*更加说明了,mstart函数是运行在g0上的,但是执行当前代码的线程可能是m0,也可能是其他的*/
		throw("bad runtime·mstart") 
	}

	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	save(getcallerpc(), getcallersp()) /*更新当前g0的sched.pc和sched.sp*/
	asminit()                          /*在asm.amd64.s中,没有函数体*/
	minit()                            /*在os_linux.go中,*/

	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		mstartm0() /*只有在程序启动时,才会执行到这里*/
	}

	if fn := _g_.m.mstartfn; fn != nil {
		/*
			执行创建m时传入的参数fn(想要执行的函数),如果mstartfn != nil,只有下面3种情况:
			(1)mspinning(将m的状态设置为spinning)
			(2)templateThread(一直循环)
			(3)sysmon(一直循环)
			其中第一种是最常见的情况,这三种情况运行都不需要p
		*/
		fn()
	}

	/*
		从这里开始,说明m执行完了自己的任务函数m.mstartfn
		下面应该开始去执行任务g了,前提是拿到一个p
	*/

	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr()) /*给m绑定一个p,在创建线程的时候,newm中设置了m.nextp*/
		_g_.m.nextp = 0
	}
	/*执行前,m的状态为spinning*/
	schedule()
}
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()
	...
top:
	...
	var gp *g
	var inheritTime bool
	...
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			/*
				从全局队列sched.runq中获取一个,注意如果globrunqget第二个参数为0的话
				globrunqget函数的效果有一点不同
			*/
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		/*
			当前线程的本地队列_g_.m.p.runq中获取一个q
		*/
		gp, inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		/*
			在findrunnable函数里,就是处于spinning状态
		*/
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	/*
		留意上面的注释⬆⬆⬆
	*/
	if _g_.m.spinning {
		/*
			因为已经找到可运行的goroutine了,将本线程m设置为non-spinning状态
			说明一个处于spinning状态,说明它正在寻找goroutine
		*/
		resetspinning() 
	}
	/*
		至此m开始处于non-spinning状态,也就是executing状态
		这个线程m通过gogo函数,开始执行gp这个goroutine
	*/
	...
	execute(gp, inheritTime)
}

就这样,线程在schedule函数中不断的找可运行的g,并来回的在g与g0中切换。schedule()里面值得注意的是resetspinning函数:

func resetspinning() {
	_g_ := getg()
	if !_g_.m.spinning {
		throw("resetspinning: not a spinning m")
	}
	_g_.m.spinning = false
	nmspinning := atomic.Xadd(&sched.nmspinning, -1)
	if int32(nmspinning) < 0 {
		throw("findrunnable: negative nmspinning")
	}
	// M wakeup policy is deliberately somewhat conservative, so check if we
	// need to wakeup another P here. See "Worker thread parking/unparking"
	// comment at the top of the file for details.
	/*
		见文件开始的注释:
		We unpark an additional thread when we ready a goroutine if
		(1) there is an idle P and there are no "spinning" worker threads.
	*/
	if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
		wakep()
	}
}

在runtime中所有调用wakep()的地方均要满足两个条件:

总结就是,如果m去运行一个g了(前提是能找到一个runnable g),那么这时需要根据上面两个条件对系统中的m进行判断,看是否有必要唤醒一个新的m。

// Tries to add one more P to execute G's.
// Called when a G is made runnable (newproc, ready).
func wakep() {
	// be conservative about spinning threads
	if !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	/*
		(1)第一个参数为nil,说明在startm函数内部会去找空闲的p
		(2)第二个参数为true,说明startm()的caller已经将sched.nmspinning加1了
	*/
	startm(nil, true)
}
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock)
	if _p_ == nil {
		_p_ = pidleget() /*如果没有指定p,就从全局空闲p队列中取一个出来*/
		...
	}
	mp := mget() /*Note:从全局sched中获取一个m,有可能为nil,也即是unpark m*/
	unlock(&sched.lock)
	if mp == nil {
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_) //= 通过newm新建的线程m,从内核回来后执行mstart函数,然后执行fn,将自己设置为spinning状态
		return
	}
	if mp.spinning {
		/*通过mget函数获取的m,是从sched.midle中获得的m,应该处于non-spinning(parking)状态*/
		throw("startm: m is spinning") 
	}
	if mp.nextp != 0 {
		/*处于parking状态的m不应该有p*/
		throw("startm: m has p") 
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park) /*底层通过系统调用SYS_futex唤醒m*/
}

凡是调用了startm函数,说明有p了(要么指定p;要么全局sched.pidle中有空闲p),为了不浪费cpu资源,应该尽快找一个m来。

与startm函数对应的是stopm函数:

func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning") /*说明在调用stopm前,必须处于no-spinning状态*/
	}

	lock(&sched.lock)
	mput(_g_.m) 
	unlock(&sched.lock)
	notesleep(&_g_.m.park) /*通过系统调用SYS_futex让m休眠,os_linux.go*/
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}

这个函数通过mput()将当前线程m放进全局队列sched.midle中。由此线程m变成了non-spinning(parking)状态。

最后用一张图来描述m状态的变迁:

m状态变迁

over:)!