void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
This function initializes a ucontext_t structure, i.e. what we call a user execution context. The function pointer func designates the context's entry function, and argc gives the number of entry arguments. That count is variable, but every argument must be of type int, and the arguments themselves are passed immediately after argc.
In addition, before calling makecontext you generally need to set up the context's initial stack explicitly (the stack pointer SP and the stack size) as well as the signal mask to use during execution.
You can also set the uc_link field, so that when func returns, execution switches to the context that uc_link points to.
int setcontext(const ucontext_t *ucp)
This function switches the current thread of execution to the context pointed to by ucp. On success it transfers directly into the new execution state and never returns. For example, if we used makecontext (described above) to initialize a new context whose entry points at some function entry(), then a successful setcontext immediately starts running entry().
int getcontext(ucontext_t *ucp)
This function saves the current execution context into a ucontext_t structure. If that state is later restored with setcontext or swapcontext, the program resumes just past the getcontext call site, as though it had just returned from getcontext. This is similar in purpose to setjmp: both save execution state so it can be resumed later. One important difference must be stressed, though: getcontext's return value only indicates whether the call itself succeeded; it cannot distinguish a direct return from getcontext from a return caused by setcontext/swapcontext restoring the saved state. In this respect it differs from setjmp.
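Putting the three calls together: the sketch below is a minimal, self-contained example of the API just described (error checking omitted). It creates a context whose entry function is entry(), gives it an explicit stack and a uc_link, then switches into it with swapcontext, which saves the caller's state the same way getcontext does.

#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>

#define STACK_SIZE (64 * 1024)

static ucontext_t main_ctx, entry_ctx;

/* Entry function for the new context; makecontext arguments must be ints. */
static void entry(int a, int b)
{
    printf("entry: a=%d b=%d\n", a, b);
    /* Returning here switches to entry_ctx.uc_link, i.e. main_ctx. */
}

int main(void)
{
    char *stack = malloc(STACK_SIZE);

    /* getcontext first: makecontext only modifies an initialized context. */
    getcontext(&entry_ctx);
    entry_ctx.uc_stack.ss_sp = stack;        /* explicit stack pointer */
    entry_ctx.uc_stack.ss_size = STACK_SIZE; /* explicit stack size    */
    entry_ctx.uc_link = &main_ctx;           /* where to go when entry() returns */
    makecontext(&entry_ctx, (void (*)(void))entry, 2, 10, 20);

    /* Save the current context in main_ctx, then switch to entry_ctx. */
    swapcontext(&main_ctx, &entry_ctx);

    printf("back in main\n");
    free(stack);
    return 0;
}

Because uc_link was set, control comes back to main after entry() returns; with uc_link left NULL, the process would simply exit when entry() finished.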
// have never exit so we get a pristine stack for our coroutines
static void grow_stack(int n, int num_coros)
{
    if (n == num_coros + 1) {
        longjmp(bufs[0], 1);
        assert(0);
        return;
    }
    if (!setjmp(bufs[n])) {
        char *big_array;
        big_array = alloca(STACK_SIZE);
        asm volatile("" :: "m" (big_array));
        grow_stack(n + 1, num_coros);
    } else {
        if (n == 0) {
            return;  // came from coro_allocate; return back there
        }
        while (1) {
            assert(spawned_fun);
            coro_callback f = spawned_fun;
            spawned_fun = NULL;
            assert(n == coro_pid);
            f(spawned_user_state);
            used_pids[n] = 0;
            coro_yield(0);
        }
    }
}
/*
 * This function invokes the start function of the coroutine when the
 * coroutine is first called. If it was called from coro_new, then it sets
 * up the stack and initializes the saved context.
 */
void _coro_enter(coro c)
{
    if (_save_and_resumed(c->ctxt))
    {   /* start the coroutine; stack is empty at this point. */
        cvalue _return;
        _return.p = _cur;
        _cur->start(_value);
        /* return the exited coroutine to the exit handler */
        coro_call(&_on_exit, _return);
    }
    /* this code executes when _coro_enter is called from coro_new */
INIT_CTXT:
    {
        /* local and new stack pointers at identical relative positions on the stack */
        intptr_t local_sp = (intptr_t)&local_sp;
        /* I don't know what the addition "- sizeof(void *)" is for when
           the stack grows downwards */
        intptr_t new_sp = c->stack_base +
            (_stack_grows_up
                ? _frame_offset
                : c->stack_size - _frame_offset - sizeof(void *));

        /* copy local stack frame to the new stack */
        _coro_cpframe(local_sp, new_sp);

        /* reset any locals in the saved state to point to the new stack */
        _coro_rebase(c, local_sp, new_sp);
    }
}
A note on the code: _save_and_resumed is a macro that maps directly onto setjmp.
It is easy to see that the if branch is the execution path taken after the context is later resumed via longjmp: that branch calls the previously registered entry function, and as soon as the call finishes it switches back to the main coroutine to handle the return value.
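To make the two return paths concrete, here is a minimal, self-contained sketch of the _save_and_resumed pattern (not libconcurrency's actual sources, just the setjmp idiom it wraps): a direct call through the macro returns 0 and falls through, while a longjmp to the saved buffer re-enters the same call site with a nonzero value.

#include <setjmp.h>
#include <stdio.h>

/* Mirrors the macro: saving returns 0, resuming returns nonzero. */
#define _save_and_resumed(ctxt) setjmp(ctxt)

static jmp_buf saved;

int main(void)
{
    if (_save_and_resumed(saved)) {
        /* Taken only after longjmp "resumes" the saved state. */
        printf("resumed\n");
        return 0;
    }
    /* Taken on the initial, direct return from setjmp. */
    printf("saved, now resuming...\n");
    longjmp(saved, 1);
}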
/* This probing code is derived from Douglas Jones' user thread library */

struct _probe_data {
    intptr_t low_bound;    /* below probe on stack */
    intptr_t probe_local;  /* local to probe on stack */
    intptr_t high_bound;   /* above probe on stack */
    intptr_t prior_local;  /* value of probe_local from earlier call */
    jmp_buf probe_env;     /* saved environment of probe */
    jmp_buf probe_sameAR;  /* second environment saved by same call */
    jmp_buf probe_samePC;  /* environment saved on previous call */
    jmp_buf *ref_probe;    /* switches between probes */
};

void boundhigh(struct _probe_data *p)
{
    int c;
    p->high_bound = (intptr_t)&c;
}

void probe(struct _probe_data *p)
{
    int c;
    p->prior_local = p->probe_local;
    p->probe_local = (intptr_t)&c;
__LABEL_0:
    _setjmp( *(p->ref_probe) );
    p->ref_probe = &p->probe_env;
__LABEL_1:
    _setjmp( p->probe_sameAR );
    boundhigh(p);
}

void boundlow(struct _probe_data *p)
{
    int c;
    p->low_bound = (intptr_t)&c;
    probe(p);
}

void fill(struct _probe_data *p)
{
    boundlow(p);
}

static void _infer_jmpbuf_offsets(struct _probe_data *pb)
{
    /* following line views jump buffer as array of long intptr_t */
    unsigned i;
    intptr_t *p = (intptr_t *)pb->probe_env;
    intptr_t *sameAR = (intptr_t *)pb->probe_sameAR;
    intptr_t *samePC = (intptr_t *)pb->probe_samePC;
    intptr_t prior_diff = pb->probe_local - pb->prior_local;
    intptr_t min_frame = pb->probe_local;

    for (i = 0; i < sizeof(jmp_buf) / sizeof(intptr_t); ++i) {
        intptr_t pi = p[i], samePCi = samePC[i];
        if (pi != samePCi) {
            if (pi != sameAR[i]) {
                perror("No Thread Launch\n");
                exit(-1);
            }
            if ((pi - samePCi) == prior_diff) {
                /* the i'th pointer field in jmp_buf needs to be save/restored */
                _offsets[_offsets_len++] = i;
                if ((_stack_grows_up && min_frame > pi) ||
                    (!_stack_grows_up && min_frame < pi)) {
                    min_frame = pi;
                }
            }
        }
    }

    _frame_offset = (_stack_grows_up
        ? pb->probe_local - min_frame
        : min_frame - pb->probe_local);
}

static void _infer_direction_from(int *first_addr)
{
    int second;
    _stack_grows_up = (first_addr < &second);
}

static void _infer_stack_direction()
{
    int first;
    _infer_direction_from(&first);
}

static void _probe_arch()
{
    struct _probe_data p;
    p.ref_probe = &p.probe_samePC;

    _infer_stack_direction();

    /* do a probe with filler on stack */
    fill(&p);
    /* do a probe without filler */
    boundlow(&p);
    _infer_jmpbuf_offsets(&p);
}
NORETURN cilk_fiber_sysdep::run()
{
    // Only fibers created from a pool have a proc method to run and execute.
    CILK_ASSERT(m_start_proc);
    CILK_ASSERT(!this->is_allocated_from_thread());
    CILK_ASSERT(!this->is_resumable());

    // TBD: This setjmp/longjmp pair simply changes the stack pointer.
    // We could probably replace this code with some assembly.
    if (!CILK_SETJMP(m_resume_jmpbuf))
    {
        // Change stack pointer to fiber stack
        JMPBUF_SP(m_resume_jmpbuf) = m_stack_base;
        CILK_LONGJMP(m_resume_jmpbuf);
    }

    // Verify that 1) 'this' is still valid and 2) '*this' has not been
    // corrupted.
    CILK_ASSERT(magic_number == m_magic);

    // If the fiber that switched to me wants to be deallocated, do it now.
    do_post_switch_actions();

    // Now call the user proc on the new stack
    m_start_proc(this);

    // alloca() to force generation of frame pointer. The argument to alloca
    // is contrived to prevent the compiler from optimizing it away. This
    // code should never actually be executed.
    int *dummy = (int*)alloca(sizeof(int) + (std::size_t)m_start_proc & 0x1);
    *dummy = 0xface;

    // User proc should never return.
    __cilkrts_bug("Should not get here");
}
#pragma textflag NOSPLIT
void
·entersyscall(int32 dummy)
{
	// Disable preemption because during this function g is in Gsyscall status,
	// but can have inconsistent g->sched, do not let GC observe it.
	m->locks++;

	// Leave SP around for GC and traceback.
	save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
	g->syscallsp = g->sched.sp;
	g->syscallpc = g->sched.pc;
	g->syscallstack = g->stackbase;
	g->syscallguard = g->stackguard;
	g->status = Gsyscall;
	if(g->syscallsp < g->syscallguard-StackGuard || g->syscallstack < g->syscallsp) {
		// runtime·printf("entersyscall inconsistent %p [%p,%p]\n",
		//	g->syscallsp, g->syscallguard-StackGuard, g->syscallstack);
		runtime·throw("entersyscall");
	}
	if(runtime·atomicload(&runtime·sched.sysmonwait)) {  // TODO: fast atomic
		runtime·lock(&runtime·sched);
		if(runtime·atomicload(&runtime·sched.sysmonwait)) {
			runtime·atomicstore(&runtime·sched.sysmonwait, 0);
			runtime·notewakeup(&runtime·sched.sysmonnote);
		}
		runtime·unlock(&runtime·sched);
		save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
	}
	m->mcache = nil;
	m->p->m = nil;
	runtime·atomicstore(&m->p->status, Psyscall);
	if(runtime·sched.gcwaiting) {
		runtime·lock(&runtime·sched);
		if(runtime·sched.stopwait > 0 && runtime·cas(&m->p->status, Psyscall, Pgcstop)) {
			if(--runtime·sched.stopwait == 0)
				runtime·notewakeup(&runtime·sched.stopnote);
		}
		runtime·unlock(&runtime·sched);
		save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
	}

	// Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
	// We set stackguard to StackPreempt so that first split stack check calls morestack.
	// Morestack detects this case and throws.
	g->stackguard0 = StackPreempt;
	m->locks--;
}
#pragma textflag NOSPLIT
void
·entersyscallblock(int32 dummy)
{
	P *p;

	m->locks++;  // see comment in entersyscall

	// Leave SP around for GC and traceback.
	save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
	g->syscallsp = g->sched.sp;
	g->syscallpc = g->sched.pc;
	g->syscallstack = g->stackbase;
	g->syscallguard = g->stackguard;
	g->status = Gsyscall;
	if(g->syscallsp < g->syscallguard-StackGuard || g->syscallstack < g->syscallsp) {
		// runtime·printf("entersyscall inconsistent %p [%p,%p]\n",
		//	g->syscallsp, g->syscallguard-StackGuard, g->syscallstack);
		runtime·throw("entersyscallblock");
	}
The latter part is quite different: the current M is decoupled from its P on the spot. releasep() puts the P back into the Pidle state, and handoffp() then tries to find it a new M.
	p = releasep();
	handoffp(p);
	if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
		incidlelocked(1);

	// Resave for traceback during blocked call.
	save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));

	g->stackguard0 = StackPreempt;  // see comment in entersyscall
	m->locks--;
}
// The goroutine g exited its system call.
// Arrange for it to run on a cpu again.
// This is called only from the go syscall library, not
// from the low-level system calls used by the runtime.
#pragma textflag NOSPLIT
void
runtime·exitsyscall(void)
{
	m->locks++;  // see comment in entersyscall
	if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
		incidlelocked(-1);

	if(exitsyscallfast()) {
		// There's a cpu for us, so we can run.
		m->p->syscalltick++;
		g->status = Grunning;
		// Garbage collector isn't running (since we are),
		// so okay to clear gcstack and gcsp.
		g->syscallstack = (uintptr)nil;
		g->syscallsp = (uintptr)nil;
		m->locks--;
		if(g->preempt) {
			// restore the preemption request in case we've cleared it in newstack
			g->stackguard0 = StackPreempt;
		} else {
			// otherwise restore the real stackguard, we've spoiled it in entersyscall/entersyscallblock
			g->stackguard0 = g->stackguard;
		}
		return;
	}

	m->locks--;
	// Call the scheduler.
	runtime·mcall(exitsyscall0);

	// Scheduler returned, so we're allowed to run now.
	// Delete the gcstack information that we left for
	// the garbage collector during the system call.
	// Must wait until now because until gosched returns
	// we don't know for sure that the garbage collector
	// is not running.
	g->syscallstack = (uintptr)nil;
	g->syscallsp = (uintptr)nil;
	m->p->syscalltick++;
}
Grunnable: In Go, every task, including the goroutine that runs the user entry function main·main, is created through runtime·newproc/runtime·newproc1; the former is just a wrapper around the latter that adds variadic-argument support, and the compiler ultimately maps Go's go keyword to a call to runtime·newproc. Once runtime·newproc1 has allocated and initialized the necessary resources, the new task's status is set to Grunnable and it is appended to the current P's private task queue, where it waits to be scheduled.
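As an illustration of that last step, here is a toy model (all types, sizes, and names are simplified stand-ins, not the runtime's real code) of a G being marked Grunnable and pushed onto a P's private queue:

#include <stdio.h>

enum gstatus { Gidle, Grunnable, Grunning, Gsyscall, Gwaiting };

typedef struct G { enum gstatus status; int id; } G;

/* Toy private run queue: a fixed-size circular array, as in the runtime. */
#define RUNQ_CAP 256
typedef struct P {
    G *runq[RUNQ_CAP];
    unsigned head, tail;  /* head: next to run; tail: next free slot */
} P;

/* Sketch of newproc1's final step: the G becomes Grunnable
 * and joins the current P's private queue. */
static int runqput(P *p, G *gp)
{
    if (p->tail - p->head == RUNQ_CAP)
        return -1;  /* full: the real runtime spills work to the global queue */
    gp->status = Grunnable;
    p->runq[p->tail % RUNQ_CAP] = gp;
    p->tail++;
    return 0;
}

int main(void)
{
    P p = {0};
    G g1 = {Gidle, 1};
    runqput(&p, &g1);  /* roughly what `go f()` amounts to after allocation */
    printf("g%d status=%d, P queue length=%u\n", g1.id, g1.status, p.tail - p.head);
    return 0;
}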
static void
park0(G *gp)
{
	gp->status = Gwaiting;
	gp->m = nil;
	m->curg = nil;
	if(m->waitunlockf) {
		m->waitunlockf(m->waitlock);
		m->waitunlockf = nil;
		m->waitlock = nil;
	}
	if(m->lockedg) {
		stoplockedm();
		execute(gp);  // Never returns.
	}
	schedule();
}
Go 1.2 user tasks use "segmented stacks": stack space grows on demand, and every function call site checks whether the current stack is large enough, allocating an additional segment if it is not. To guarantee that a call to runtime·mstackalloc can never itself trigger another stack allocation, the function must not run on the user stack. The obvious solution is to use runtime·mcall to switch onto g0 and perform the stack allocation there: g0's initial stack is fairly large, and can be assumed sufficient for the call.
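The trick can be demonstrated with the ucontext API from the beginning of this article: to run code that must not itself grow the current stack, hop onto a separate context with a comfortably large stack, do the work there, and come back. A hedged sketch (g0_ctx plays the role of g0; this is an analogy for runtime·mcall, not its implementation):

#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>

#define G0_STACK (256 * 1024)

static ucontext_t user_ctx, g0_ctx;

/* Work that must run on the big "g0" stack; in the real runtime this
 * would be the allocation of a new segment for the user stack. */
static void stack_alloc_on_g0(void)
{
    printf("on g0: performing stack allocation\n");
    /* Returning switches back to user_ctx via uc_link. */
}

int main(void)
{
    char *stack = malloc(G0_STACK);

    getcontext(&g0_ctx);
    g0_ctx.uc_stack.ss_sp = stack;
    g0_ctx.uc_stack.ss_size = G0_STACK;
    g0_ctx.uc_link = &user_ctx;  /* come back to the caller afterwards */
    makecontext(&g0_ctx, stack_alloc_on_g0, 0);

    /* Analogue of runtime·mcall: save user state, run fn on g0's stack. */
    swapcontext(&user_ctx, &g0_ctx);

    printf("back on the user stack\n");
    free(stack);
    return 0;
}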
When the system has an idle P & M (see above), the runtime first searches the local ready queue; if that is empty, it calls the netpoller. The netpoller uses the OS's epoll or kqueue facility to check for I/O events that have arrived, wakes the corresponding goroutines, and hands them back to the runtime to be executed again. (runtime/proc.c:findrunnable())
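A simplified paraphrase of that lookup order (the stub functions are stand-ins for the real queue, netpoll, and steal operations; they are not the runtime's APIs):

#include <stdio.h>
#include <stddef.h>

typedef struct G { int id; } G;
typedef struct P { int id; } P;

/* Stubs standing in for P's private queue, the global queue,
 * epoll/kqueue polling, and work stealing. */
static G *runqget(P *p)            { (void)p; return NULL; }
static G *globrunqget(void)        { return NULL; }
static G *netpoll(int block)       { static G g = {42}; (void)block; return &g; }
static G *steal_from_other_p(void) { return NULL; }

/* Simplified lookup order of runtime/proc.c:findrunnable(). */
static G *findrunnable(P *p)
{
    G *gp;
    if ((gp = runqget(p)) != NULL)     /* 1. P's private queue          */
        return gp;
    if ((gp = globrunqget()) != NULL)  /* 2. global queue               */
        return gp;
    if ((gp = netpoll(0)) != NULL)     /* 3. goroutines whose I/O is    */
        return gp;                     /*    ready, via the netpoller   */
    return steal_from_other_p();       /* 4. steal from another P       */
}

int main(void)
{
    P p = {0};
    G *gp = findrunnable(&p);
    printf("found g%d via the netpoller\n", gp ? gp->id : -1);
    return 0;
}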
struct Hchan
{
	uintgo  qcount;     // total data in the q
	uintgo  dataqsiz;   // size of the circular q
	uint16  elemsize;
	uint16  pad;        // ensures proper alignment of the buffer that follows Hchan in memory
	bool    closed;
	Alg*    elemalg;    // interface for element type
	uintgo  sendx;      // send index
	uintgo  recvx;      // receive index
	WaitQ   recvq;      // list of recv waiters
	WaitQ   sendq;      // list of send waiters
	Lock;
};
struct Scase
{
	SudoG   sg;         // must be first member (cast to Scase)
	Hchan*  chan;       // chan
	byte*   pc;         // return pc
	uint16  kind;
	uint16  so;         // vararg of selected bool
	bool*   receivedp;  // pointer to received bool
};
struct Select
{
	uint16  tcase;      // total count of scase[]
	uint16  ncase;      // currently filled scase[]
	uint16* pollorder;  // case poll order
	Hchan** lockorder;  // channel lock order
	Scase   scase[1];   // one per case (in order of appearance)
};
// Go version of the snippet
......
select {
case ch01 <- x:
	foo() /* do something for case #1 */
case y = <-ch02:
	bar() /* do something for case #2 */
default:
	def() /* do something for default */
}
......
// Sketch of the corresponding C code
Select *sel = newselect(3);
......
if(selectsend(sel, ch01, &x)) {
	foo();   // do something for case #1
} else if(selectrecv(sel, ch02, &y)) {
	bar();   // do something for case #2
} else if(selectdefault(sel)) {
	def();   // do something for default
} else {
	void (*rpc)(void);
	rpc = selectgo(&sel);
	(*rpc)();   // goto rpc;
}
......
M represents an OS thread; P represents a processor available to the system (their number is normally set by the GOMAXPROCS environment variable); G represents a Go user-level thread, i.e. what we usually call a goroutine.
The key to the new scheduler's evolution from 1:1 to M:N is the added abstraction P. On multicore platforms, the number of Ps generally matches the number of processor cores or hardware threads, and the scheduler must keep every P supplied with Gs to execute, so as to maintain parallelism.
An M must be bound to a P before it can execute a task G, as the figure below shows:
In the old Go scheduler implementation, which lacked P, once an M (OS thread) running a G (goroutine) blocked (for example, in a blocking system call), the OS would schedule the corresponding thread out, and the other ready Gs in the system could not run either. With the logical structure P added, when this happens the blocked M is detached from its P; the runtime then allocates another M, binds it to the detached P, and runs the remaining ready Gs on it. The figure below illustrates the process:
In the actual implementation, to exploit locality of execution, the runtime tends to defer the moment at which M and P are separated. Concretely, the runtime keeps a resident monitoring thread, sysmon, that watches every M that has entered a syscall; only when a syscall's duration exceeds a threshold is the M detached from its P.
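A toy model of sysmon's deferred-detach policy (heavily simplified relative to the real retake()/handoffp() code in runtime/proc.c; the threshold test here is illustrative):

#include <stdio.h>
#include <stdint.h>

enum pstatus { Pidle, Prunning, Psyscall, Pgcstop };

typedef struct P {
    enum pstatus status;
    uint32_t syscalltick;  /* bumped each time this P enters/exits a syscall */
} P;

/* Stand-ins for the real runtime operations. */
static int cas_status(P *p, enum pstatus old, enum pstatus new_)
{
    if (p->status != old) return 0;
    p->status = new_;
    return 1;
}
static void handoffp(P *p) { (void)p; printf("P handed to another M\n"); }

/* sysmon's policy in outline: a P observed in Psyscall with an unchanged
 * syscalltick for a whole monitoring round has been blocked "too long",
 * so detach it from its M and hand it off. */
static void retake_one(P *p, uint32_t tick_last_round)
{
    if (p->status == Psyscall && p->syscalltick == tick_last_round &&
        cas_status(p, Psyscall, Pidle))
        handoffp(p);
}

int main(void)
{
    P p = { Psyscall, 7 };
    retake_one(&p, 7);  /* same tick as last round: over the threshold */
    return 0;
}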
Another mechanism that keeps the system running smoothly is load balancing; Go uses "work stealing" for this.
First, a word on Go's task queues: each P owns a private task queue (implemented as a circular queue backed by an array), and there is additionally a shared global queue (a singly linked list). The private queues exist to reduce contention on the global queue.
When a P's private task queue is empty, it looks for runnable Gs in the global queue; if the global queue is empty too, it picks another P at random and steals tasks from that P's private queue, so that all threads work at the highest load possible. The process is sketched below:
Because a P's private queue is backed by an array, the midpoint of the queue is trivial to compute, so the "thief" splits the "victim's" tasks evenly with it, getting as close to balanced load as possible.
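The steal-half step on an array-backed circular queue can be sketched as follows (single-threaded and illustrative; the real runqsteal synchronizes head/tail with atomic operations):

#include <stdio.h>

#define RUNQ_CAP 8

typedef struct G { int id; } G;

typedef struct P {
    G *runq[RUNQ_CAP];
    unsigned head, tail;  /* circular: index i lives at runq[i % RUNQ_CAP] */
} P;

/* Move half of the victim's queue to the thief; the array layout makes
 * the midpoint a simple index computation, (tail - head) / 2. */
static unsigned steal_half(P *thief, P *victim)
{
    unsigned n = (victim->tail - victim->head) / 2;
    for (unsigned i = 0; i < n; i++) {
        G *gp = victim->runq[victim->head % RUNQ_CAP];
        victim->head++;
        thief->runq[thief->tail % RUNQ_CAP] = gp;
        thief->tail++;
    }
    return n;
}

int main(void)
{
    G gs[4] = {{1}, {2}, {3}, {4}};
    P victim = {{0}}, thief = {{0}};
    for (int i = 0; i < 4; i++)
        victim.runq[victim.tail++ % RUNQ_CAP] = &gs[i];
    printf("stole %u of 4 tasks\n", steal_half(&thief, &victim));
    return 0;
}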
Whether a P takes tasks from the global queue or steals them, some contention is incurred, so the runtime prefers to add newly created tasks, or tasks that have just become runnable, to the private queue of the currently executing P.
Only when a running task yields the processor or enters a long-running system call is it added to the global queue.
In the BEAM system, besides processes there are three other schedulable units: ports, linked-in drivers, and system level activities. These special task forms mainly handle I/O and run code written in other languages; parts of their behavior closely resemble Go's "detach" mechanism for tasks blocked in syscalls. We will not discuss their implementation here, and will instead focus on how Erlang schedules its processes.
Unlike Go's, Erlang's scheduler is round-robin rather than cooperative. Each process is given, at creation, a budget called "reductions": a measure of computation (roughly equal to the number of function calls), similar to an OS time slice. As the process executes, its reduction count accumulates, and once it reaches the threshold the process is switched out. This scheduling style is known in Erlang as "reduction-counting".
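A caricature of reduction-counting (CONTEXT_REDS mirrors the budget constant in BEAM's sources; everything else here is a toy, not Erlang's scheduler):

#include <stdio.h>

#define CONTEXT_REDS 2000  /* reduction budget per time slice */

typedef struct Process {
    int reds;   /* reductions consumed in the current slice */
    int alive;
} Process;

/* Each function call costs (roughly) one reduction. */
static int call_costs_one_reduction(Process *p)
{
    p->reds++;
    return p->reds < CONTEXT_REDS;  /* any budget left? */
}

/* Toy scheduler loop: run a process until it exhausts its budget,
 * then preempt it and move on, round-robin style. */
static void schedule(Process *procs, int n)
{
    for (int i = 0; i < n; i++) {
        Process *p = &procs[i];
        p->reds = 0;
        while (p->alive && call_costs_one_reduction(p))
            ;  /* "execute" the process, one call at a time */
        printf("process %d preempted after %d reductions\n", i, p->reds);
    }
}

int main(void)
{
    Process procs[2] = {{0, 1}, {0, 1}};
    schedule(procs, 2);
    return 0;
}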
There is much more to say about schedulers. For instance, both Go and Erlang have garbage collection; how does GC affect scheduling in each language? Also, recall from the discussion of Go's M:N scheduling that once an M blocks in a syscall, the system creates a new M (OS thread) to take over the P and its task queue. When designing a highly concurrent I/O system (a web server, say), frequent syscalls would then create a large number of OS threads, hurting performance. How does Go solve this problem?