版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

本文基于go1.17.6

1.什么是协程?

协程,英文Coroutines,是一种基于线程之上,但又比线程更加轻量级的存在,这种由程序员自己写程序来管理的轻量级线程叫做『用户空间线程』, 具有对内核来说不可见的特性。

1.1 协程的特点

  • 线程的切换由操作系统负责调度,协程由用户自己进行调度
  • 线程的默认Stack大小是MB级别,而协程更轻量,接近KB级别。
  • 在同一个线程上的多个协程,访问某些资源可以不需要锁
  • 适用于被阻塞的,且需要大量并发的场景。

1.2 Golang的GMP模型

GMP模型

  • CPU驱动线程上的任务执行
  • 线程由操作系统内核进行调度,Goroutine由Golang运行时(runtime)进行调度
  • P的 local runnable queue是无锁的,global runnable queue是有锁的
  • P的 local runnable queue长度限制为256

注意:

    1. M和P是绑定关系
    1. M和G是绑定关系
    1. P只是暂存G,他们之间不是绑定关系

E-R图

E-R图

简化后的E-R图

E-R图简化后

注意: 后面为了书写简单直接将

  • local runnable queue表示为本地队列
  • global runnable queue表示为全局队列

延伸

timer的四叉堆和内存分配器使用的mcache也是每个P一个

Q: 为什么默认情况下P的数量与CPU数量一致?

A:

这样可以避免把CPU时间浪费在上线文切换上

1.3 协程和线程的资源消耗对比

类别栈内存上下文切换备注
Thread1MB1us内存占用使用的是Java线程的默认栈大小
Goroutine4KB0.2us内存占用使用的是Linux+x86下的栈大小

2. 常见的Goroutine 让出/调度/抢占场景

Goroutine状态机

2.1 协程被创建

  • 1)P的本地队列有剩余空间时,放入P的本地队列
  • 2)P的本地队列没有剩余空间时,将本地队列的一部分Goroutine以及待加入的Goroutine添加到全局队列

2.2 主动让出

2.2.1 使用runtime.Gosched

gosched.go

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1)
	for i := 0; i < 100; i++ {
		fmt.Println("Goroutine1:", i)
		//time.Sleep(500 * time.Millisecond)
		if i == 5 {
			go func() {
				for i := 0; i < 100; i++ {
					fmt.Println("Goroutine2:", i)
					//time.Sleep(500 * time.Millisecond)
				}
			}()
			runtime.Gosched()
		}
	}
}

具体执行流程

1.Gosched() -> 2.mcall(fn func(*g)) -> 3.gosched_m(gp *g) ->
4.goschedImpl(gp *g) ->  dropg()
                          globrunqput() 
                          schedule() 

step2 mcall(fn func(*g))

mcall函数是通过汇编实现的, 在 asm_amd64.s

  • 1)保存当前goroutine的状态(PC/SP)到g->sched中,方便下次调度;
  • 2)切换到m->g0的栈;
  • 3)然后g0的堆栈上调用fn;

注意:每个m上都有一个自己g0,仅用于调度,不指向任何可执行的函数

mcall returns to the original goroutine g later, when g has been rescheduled. fn must not return at all; typically it ends by calling schedule, to let the m run other goroutines.

step4 goschedImpl(gp *g)

  • 1)修改Goroutine的状态 _Grunning -> _Grunnable
  • 2)dropg() 将G和M解绑
  • 3)globrunqput(gp) 将G放入全局runnable队列
  • 4)schedule() 进行一轮调度,寻找一个runnable的G,并执行它,函数不会返回

step4-4 schedule()

schedule() –> 1.findrunnable()
              2.execute() –>gogo()
step4-4 schedule() –> findrunnable()
  • 1)从同一个P的本地runnable队列中
  • 2)从全局的runnable队列中
  • 3)从网络轮训器(netpoll)中,是否有事件就绪的G
  • 4)通过runtime.runqsteal从其它P的本地runnable队列偷取一半G放入本地runnable队列,并取出一个用来执行

gogo() 汇编实现,用于恢复现场(PC/SP),运行上一步找到的新的可运行的G

Q:为什么一定要单独设置一个g0来执行goschedImpl(gp *g)

A: schedule()会将G的stack搞乱

另外有些文章说g0的栈会使用操作系统分配的线程栈,但是根据萌叔的研究,这个与操作系统有关的。 macOS是使用的线程栈(8MB),但是linux下g0的栈初始大小只有8KB。 runtime/proc.go

	// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
	// Windows and Plan 9 will layout sched stack on OS stack.
	if iscgo || mStackIsSystemAllocated() {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)
	}
// mStackIsSystemAllocated indicates whether this runtime starts on a
// system-allocated stack.
func mStackIsSystemAllocated() bool {
	switch GOOS {
	case "aix", "darwin", "plan9", "illumos", "ios", "solaris", "windows":
		return true
	case "openbsd":
		switch GOARCH {
		case "386", "amd64", "arm", "arm64":
			return true
		}
	}
	return false
}

2.2.2 任务执行完毕

goexit1() -> mcall(fn func(*g)) -> goexit0(gp *g)
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()
	casgstatus(gp, _Grunning, _Gdead)
	gp.m = nil
	dropg()
	gfput(_g_.m.p.ptr(), gp)
	schedule()
}
  • 1)修改G的状态 _Grunning -> _Gdead
  • 2)解除G和M的绑定关系
  • 3)将G放入P的空闲G的链表(gfree list)
  • 4)触发一轮调度

2.3 抢占式调度

抢占式调度是由守护进程 sysmon() 触发的 sysmon()是一个特殊的m,它不需要和P进行绑定。

func sysmon() {
    for{
        // 1. 运行计时器
        // 2. 检查网络轮询器(netpoll)
        // 3. 触发抢占式调度
        // 4. 触发GC
    }
}

最大时间片是10ms

推荐观看 幼麟实验室的视频

  1. 深度探索Go语言:抢占式调度
  2. 深度探索Go语言:抢占式调度(2)

2.3.1 基于协作的抢占式调度

依赖栈增长监测代码

sysmon() -> retake() -> preemptone()
type g struct {
	stackguard0 uintptr // offset known to liblink
	preempt     bool // preemption signal, duplicates stackguard0 = stackpreempt
}

2.3.2 基于信号的抢占式调度

发出信号

sysmon() -> retake() -> preemptone() -> signalM(mp, sigPreempt)
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a goroutine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}

接收到信号

sighandler() -> doSigPreempt() -> asyncPreempt()->  globalrunqput()

asyncPreempt由汇编实现 preempt_amd64.s

2.4 hand off p

延伸阅读

disk io引起golang线程数暴涨的问题

场景读写磁盘文件

涉及syscall.Syscall()

注意:

  • 由于handeroff机制的存在,读写磁盘文件的过程中,如果IO的压力过大可能会导致大量的系统线程被创建
  • 在Golang中(Linux平台),读写网络连接是非阻塞式系统调用,并且是边缘触发
  • 在Golang中(Linux平台),读写磁盘文件是阻塞式系统调用
File.Read() -> poll.FD.Read() -> syscall.Read() -> syscall.Syscall()

poll/fd_unix.go

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    ...
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) // 执行syscall
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil { // gopark
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}
func read(fd int, p []byte) (n int, err error) {
	var _p0 unsafe.Pointer
	if len(p) > 0 {
		_p0 = unsafe.Pointer(&p[0])
	} else {
		_p0 = unsafe.Pointer(&_zero)
	}
	r0, _, e1 := Syscall(SYS_READ, uintptr(fd), uintptr(_p0), uintptr(len(p)))
	n = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

Syscall和RawSyscall的源码

2.4.1 entersyscall()


1.设置_g_.m.locks++,禁止g被强占
2.设置_g_.stackguard0 = stackPreempt,禁止调用任何会导致栈增长/分裂的函数
3.保存现场,在 syscall 之后会依据这些数据恢复现场
4.更新G的状态为_Gsyscall
5.释放局部调度器P:解绑P与M的关系;
6.更新P状态为_Psyscall
7.g.m.locks–解除禁止强占。

进入系统调用的goroutine会阻塞,导致内核M会阻塞。此时P会被剥离掉, 所以P可以继续去获取其余的空闲M执行其余的goroutine。

2.4.2 阻塞式系统调用长期运行将会导致的流程

sysmon() -> retake() -> handoffp()

如果P的本地队列不为空,handoffp()会尝试获取一个M来运行P M有2种来源:

  • 1)midle: idle m’s waiting for work
  • 2)newm() 创建一个新M

2.4.3 exitsyscall()


1.设置 g.m.locks++ 禁止强占
2.调用 exitsyscallfast() 快速退出系统调用
    2.1. Try to re-acquire the last P,如果成功就直接接return;
    2.2. Try to get any other idle P from allIdleP list;
    2.3. 没有获取到空闲的P
3.如果快速获取到了P:
    3.1. 更新G 的状态是_Grunning
    3.2. 与G绑定的M会在退出系统调用之后继续执行
4. 没有获取到空闲的P:
    4.1. 调用mcall()函数切换到g0的栈空间;
    4.2. 调用exitsyscall0函数:
        4.2.1. 更新G 的状态是_Gwaiting
        4.2.2. 调用dropg():解除当前g与M的绑定关系;
        4.2.3. 调用globrunqput将G插入global queue的队尾,
        4.2.4. 调用stopm()释放M,将M加入全局的idel M列表,这个调用会阻塞,知道获取到可用的P。
        4.2.5. 如果4.2.4中阻塞结束,M获取到了可用的P,会调用schedule()函数,执行一次新的调度。

2.5 系统调用

以netpoll为例,linux操作系统下,netpoll基于epoll实现的

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

2.5.1 让出流程

pollDesc.waitRead() -> runtime.poll_runtime_pollWait() -> runtime.gopark()
-> mcall(fn func(*g)) -> park_m(gp *g)

gopark()主要流程

  • 1)g切换到g0
  • 2)修改G的状态 _Grunning -> _Gwaiting
  • 3)dropg() 解除G和M的绑定关系
  • 4)schedule() 触发一轮调度

2.5.2 放回 主动触发netpoll()

findrunnable()->netpoll()->injectglist()->globrunqputbatch()/runqputbatch()
pollWork()
startTheWorldWithSema
sysmon()

注意: netpoll函数是非阻塞的

2.6 定时器

场景

time.Sleep(1 * time.Second)

2.6.1 让出

time.Sleep() -> runtime.timeSleep() -> mcall() -> park_m()
-> resetForSleep() -> resettimer() -> doaddtimer()

2.6.2 唤醒

checkTimers() -> runtimer() -> runOneTimer() -> goready() -> systemstack() -> ready()

goready()主要流程

  • 1)切换到g0的栈空间(不严谨)
  • 2)修改Goroutine的状态 _Gwaiting -> _Grunnable
  • 3)runqput() 把Goroutine放入P的本地队列的头部
  • 4)如果M的数量不足,尝试创建一些M

注意: mcall()和systemstack()是对应的

2.7 Channel

场景

ch := make(chan int)
ch <- 15

2.7.1 写入channel并阻塞

chansend1() -> chansend() ->  hchan.sendq.enqueue() -> gopark()

2.7.2 就绪

chanrecv1() -> chanrecv() ->  hchan.sendq.dequeue() -> recv() -> goready()

2.8 同步原语

互斥锁与2.6、2.7的情况非常类似,只是G会存储在Treap中

  • Treap是一种平衡二叉查找树
  • semaRoot 是全局唯一的

3.参考资料

1.g0-特殊的goroutine 2.golang syscall原理 3.Linux中的EAGAIN含义 4.Golang-gopark函数和goready函数原理分析 5.幼麟实验室-协程让出、抢占、监控和调度 6.Golang 调度器 GMP 原理与调度全分析 7.time.Sleep(1) 后发生了什么 8.mcall systemstack等汇编函数


请我喝瓶饮料

微信支付码