CoroC 语言开源了

Published on:
Tags: CoroC, 协程

CoroC 是什么?

CoroC 是 “Coroutine-C” 的简称,顾名思义,就是在 C 语言的基础上,增加了对 “协程” 的原生支持,从而简化了在纯 C 下开发轻量并发系统的难度。

CoroC 实现了哪些功能?

在设计上,CoroC 借鉴了大量 Go 语言的元素,包括 channelselect, netpolling 等,当然,为了很好的实现并发,我们也在 C 中加入了并发的 “智能指针” 。 同时增加了一些新的探索性功能,比如 “Actor 模型”、“Fork-Join 模型”、“优先级调度”、“分时抢占” 等,当然其中一些还处于实验阶段。

在实现上,CoroC 分成两个主要部分,运行时库编和译器前端。

  • 运行时库是一个用纯 C 编写的静态库,可以直接用在 C 语言项目中
  • 编译器前端是一个基于 clang-3.5 开发的 “源-源变换” 工具,通过它可以将一个包含扩展语法的 CoroC 程序转化为一个含 C 库调用的纯 C 程序

如何获取 CoroC 代码和文档?

目前,我已将 CoroC 运行时和编译器的代码分别发布在 bitbucket.org 上,

编写 CoroC 的目的

编写 CoroC 的动机,一方面是想通过实际开发,理解 Go 语言并发设计的原理,另一方面是学习基于 clang / llvm 的二次开发方法。

联系我们

当然,目前的 CoroC 还处在一个玩具级别的状态,其中不免会有很多错误,希望大家多多指教!

如果您对于 CoroC 有什么建议,可以直接在 bitbucket 上给我发 pull request,当然也可以发邮件到我的邮箱 amalcaowei@gmail.com.

构建C协程之ucontext篇

Published on:

原理简介

所谓 “ucontext” 机制是 GNU C 库提供的一组用于创建、保存、切换用户态执行“上下文”(context)的API,可以看作是 “setjmp/long_jmp” 的“升级版”,主要包括以下四个函数:

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);
int swapcontext(ucontext_t *oucp, ucontext_t *ucp);
int getcontext(ucontext_t *ucp);
int setcontext(const ucontext_t *ucp);

结构体 ucontext_t 和上述4个函数声明一起定义在系统头文件<ucontext.h> 中,该类型的具体实现与体系结构相关,但规范要求其至少要包含以下字段:

typedef struct ucontext {
  struct ucontext *uc_link;
    sigset_t       uc_sigmask;
    stack_t         uc_stack;
    mcontext_t uc_mcontext;
    ...
} ucontext_t;

其中 sigset_tstack_t 定义在标准头文件 <signal.h> 中, uc_link 字段保存当前context结束后继续执行的context记录, uc_sigmask 记录该context运行阶段需要屏蔽的信号, uc_stack 是该context运行的栈信息, 最后一个字段uc_mcontext 则保存具体的程序执行上下文——如PC值、堆栈指针、寄存器值等信息——其实现方式依赖于底层运行的系统架构,是平台、硬件相关的。

下面具体来看每个函数的功能:

  • int makecontext(ucontext_t *ucp, void (*func)(), int argc, ...) 该函数用以初始化一个ucontext_t类型的结构,也就是我们所说的用户执行上下文。函数指针func指明了该context的入口函数,argc指明入口参数个数,该值是可变的,但每个参数类型都是int型,这些参数紧随argc传入。 另外,在调用makecontext之前,一般还需要显式的指明其初始栈信息(栈指针SP及栈大小)和运行时的信号屏蔽掩码(signal mask)。 同时也可以指定uc_link字段,这样在func函数返回后,就会切换到uc_link指向的context继续执行。

  • int setcontext(const ucontext_t *ucp) 该函数用来将当前程序执行线索切换到参数ucp所指向的上下文状态,在执行正确的情况下,该函数直接切入到新的执行状态,不再会返回。比如我们用上面介绍的makecontext初始化了一个新的上下文,并将入口指向某函数entry(),那么setcontext成功后就会马上运行entry()函数。

  • int getcontext(ucontext_t *ucp) 该函数用来将当前执行状态上下文保存到一个ucontext_t结构中,若后续调用setcontext或swapcontext恢复该状态,则程序会沿着getcontext调用点之后继续执行,看起来好像刚从getcontext函数返回一样。 这个操作的功能和setjmp所起的作用类似,都是保存执行状态以便后续恢复执行,但需要重点指出的是:getcontext函数的返回值仅能表示本次操作是否执行正确,而不能用来区分是直接从getcontext操作返回,还是由于setcontext/swapcontex恢复状态导致的返回,这点与setjmp是不一样的。

  • int swapcontext(ucontext_t *oucp, ucontext_t *ucp) 理论上,有了上面的3个函数,就可以满足需要了(后面讲的libgo就只用了这3个函数,而实际只需setcontext/getcontext就足矣了),但由于getcontext不能区分返回状态,因此编写上下文切换的代码时就需要保存额外的信息来进行判断,显得比较麻烦。 为了简化切换操作的实现,ucontext 机制里提供了swapcontext这个函数,用来“原子”地完成旧状态的保存和切换到新状态的工作(当然,这并非真正的原子操作,在多线程情况下也会引入一些调度方面的问题,后面会详细介绍)。 为了进一步理解swapcontext这个函数的设计目的,可以尝试利用getcontext/setcontext完成同样的功能,你需要怎样编写代码? 同时,也不妨思考一下下面这段代码的执行结果(该例出自维基百科Setcontext 条目):

#include <stdio.h>
#include <ucontext.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
  ucontext_t context;

  getcontext(&context);
  puts("Hello world");
  sleep(1);
  setcontext(&context);
  return 0;
}

小结

可以看出,用ucontext机制实现一个“协程”系统并不困难。 实际上,每个运行上下文(ucontext_t)就直接对应于“协程”概念,对于协程的“创建”(Create)、“启动” (Spawn)、“挂起” (Suspend)、“切换” (Swap)等操作,很容易通过上面的4个API及其组合加以实现,需要的工作仅在于设计一组数据结构保存暂不运行的context结构,提供一些调度的策略即可。 这方面的开源实现有很多,其中最著名的就是Go的前身,libtask库。

对于将“协程”映射到多OS线程执行的情形,就要稍稍复杂一些,但主要的问题是集中在共享任务队列的实现、调度线程间的互斥等,至于“协程”的映射问题,与单线程情况没有太大的区别。 对于这方面的开源借鉴,当然首推Go的运行时 —— 但由于标准Go实现没有使用GNU C库,而是自行设计了包括C编译器在内的整套工具链,因而就没有直接采用ucontext机制(尽管其内部实现机制与ucontext原理类似)。

以后有机会,会再分析一下GCC Go语言前端的运行时实现——libgo。 libgo的调度器部分基本用C开发并由GCC编译,“goroutine”(Go语言中相对于“协程”的概念)也直接以“ucontext”机制实现,其代码对于分析C语言下“协程”系统实现方法而言,具有较高的参考价值。

构建C协程之setjmp/long_jmp篇

Published on:

原理简介

在标准C中的头文件<setjmp.h>中定义了一组函数 setjmp / long_jmp 用来实现“非本地跳转”的功能,利用 setjmp 可以保存当前执行线索状态,稍后通过 long_jmp 函数可以实现状态的恢复,并且可以跨多层函数调用栈进行跳转。具体接口定义如下:

  • int setjmp(jmp_buf env) 该函数主要用来保存当前执行状态,作为后续跳转的目标。调用时,当前状态会被存放在env指向的结构中,env将被 long_jmp 操作作为参数,以返回调用点 — 跳转的结果看起来就好像刚从setjmp返回一样。 直接调用setjmp保存状态后,返回值是0;而从long_jmp操作返回时,返回值是非0的 — 通过判断setjmp的返回值,就可以判断当前执行状态。

  • void long_jmp(jmp_buf env, int value) 该函数用来恢复env中保存的执行状态,另一参数value用来传递返回值给跳转目标 — 如果value值为0,则跳转后返回setjmp处的值为1;否则,返回setjmp处的值为value

setjmp / long_jmp 这一机制的设计初衷是为了方便程序从较深的调用栈中直接返回到之前调用点 — 这非常有利于实现高效的错误处理机制,比如 C++ 中的异常机制就是如此。假设我们有这样一条调用路径:

fun0() -> fun1() -> fun2() -> ... -> funN()

假设在 funN 函数中发生了一个错误,需要返回 fun0 函数对错误进行处理,按照惯常的方法需要层层返回错误,效率较低。利用 setjmp / long_jmp 机制,就可以在 fun0 函数调用 fun1 前用 setjmp 保存一个状态,然后一旦调用路径中的某个环节出现错误,就使用 long_jmp 跳回 fun0 函数,通过 setjmp 的返回值就可以判断错误类型并做后续处理,非常简便,与 C++ / Java 等语言中的 try {...} catch (...) {...} 结构很类似。

这里需要注意的是,long_jmp 返回后的执行依赖于之前 setjmp 执行时的栈环境,在上面的例子中,由于 funN 执行跳转时,fun0 的执行栈没有释放,因此返回后继续执行没有任何问题。 但假如在 fun0 返回后(更精确的说是执行 setjmp 的作用域退出后),再通过 long_jmp 跳转回 fun0, 由于原先栈帧(stack frame)已被释放,其对应内存空间可能别做他用,因而这时程序的执行就进入了不可知状态,很可能因起错误,这点需要特别注意!

setjmp 会将状态信息保存到一个平台相关的结构 jmp_buf 中,这个结构对于程序员来说一般是透明的,也就是说我们并不知道 jmp_buf 的具体字段及其含义,也就不能做诸如栈空间切换的操作 — 这对于实现”协程”系统来说,就比较麻烦了。因为”协程”间并发执行的性质要求系统对不同”协程”的栈空间进行隔离。

尽管如此,我还是在网上找到了很多利用 setjmp / long_jmp 实现的”协程”系统,现挑其中几个比较有代表性的例子介绍一下。

案例一: setjmp-longjmp-ucontext-snippets

这是一个小型的 “N:1” 的协程系统,代码托管在github。利用 setjmp / long_jmp 实现协程,同时还提供了一个简单的 Channel 实现,以供协程间通信。

实现分析

我们把精力主要放在协程的实现方式上,看看它如何解决“栈切换”的问题。

该库提供了以下几个协程操作的API:

  • void coro_allocate (int num_cores) 在程序开始时调用,静态预分配 num_cores 个协程空间,程序中最大运行的协程数不能超过 num_cores 个。

  • int coro_spawn(coro_callback f, void *user_state) 启动一个协程,入口函数由第一个参数 f 指定, user_state 是 f 的参数。

  • int coro_runnable(int pid) 将编号为 pid 的协程设为可执行态。

  • void coro_yield(int pid) 让出处理器,并切换到以 pid 为编号的其他协程继续执行。

下面来具体看看其中的奥秘,我们将重点集中在 coro_allocate 这个函数,它为每个协程分配一个 jmp_buf 结构和一个指示该协程状态的int型数据。其中0号协程对应于“调度器”或者说“运行时环境”。 最后调用 grow_stack(0, num_cores) 完成后续工作,应该说所有的玄机都藏在grow_stack这个函数中,我们看一下它的代码:

// have never exit so we get a pristine stack for our coroutines
static void grow_stack(int n, int num_coros) {
  if (n == num_coros + 1) {
    longjmp(bufs[0],1);
    assert(0);
    return;
  }

  if (!setjmp(bufs[n])) {
    char *big_array;
    big_array = alloca(STACK_SIZE);
    asm volatile("" :: "m" (big_array));

    grow_stack(n + 1, num_coros);
  } else {
    if (n == 0) {
      return; //came from coro_allocate; return back there
    }
    while(1) {
      assert(spawned_fun);
      coro_callback f = spawned_fun;
      spawned_fun = NULL;

      assert(n == coro_pid);
      f(spawned_user_state);
      used_pids[n] = 0;
      coro_yield(0);
    }
  }
}

这是一个递归函数,深度是 num_cores + 1。调用顺序从 0 到 num_cores, 每一层调用都利用 setjmp 将状态保存到相应编号的 jmp_buf 结构中,然后通过alloca在当前栈帧上分配一个较大的空间,作为该编号协程的运行栈。当参数n超过num_coros时,程序逐层返回到调用者。

之后调用 coro_spawn 启动一个协程时,先将参数 f 和 user_state 分别保存到全局变量 spawned_fun 和 spawned_user_state 中,然后找到一个空闲的协程编号,恢复该编号在 grow_stack 时保存的状态 —— 也就是说回到了 grow_stack 的 else 分支,调用全局的函数指针 spawned_fun 指向的入口函数 (由于系统是运行在单线程环境的,因此使用全局变量不会出现问题,但笔者个人认为为每个协程设计一个结构体来保存这些信息更好一些)。

刚才介绍时我们特别强调了跳转到已经释放的“栈帧”可能引发错误,但这里却偏偏这样做,道理是什么呢? 问题的关键就在于grow_stack时用alloca预留栈空间的操作,这个操作本质上将原来主程序的栈空间划分成N份,然后假设每个协程运行时使用的栈都不会超过为它们预留的那段空间。而编号为0 的协程恰好对应于运行时环境,因此运行时环境的栈就位于最低端,如果后面的操作使用的栈空间不“越界”,那就不会影响1号协程的执行;其他协程之间也是同理。

小结

案例一利用了程序栈帧顺序增长的特点,实现非常巧妙。但缺点在于不但需要提前指定系统支持的最大协程数,而且所有协程的栈都必须在原始程序栈空间的基础上分配,栈的大小及支持的最大协程数量(也就是可划分的最大栈数量)都因此受到了限制。

案例二:libconcurrency

这个项目托管在google code,也是采用 setjmp / long_jmp 实现的轻量协程系统,但与案例一的不同之处在于,libconcurrency 使用了“栈拷贝”技术 — 每个协程的运行栈是通过malloc在堆空间动态分配的,然后再将原始的栈帧数据复制到新的栈上。正因如此,其系统的可扩展性比较好,协程可以动态创建,且理论上没有上限。

实现分析

还是照例先来分析一下协程操作的API,我们最关心以下几个:

  • coro coro_init() 用来在系统启动时对协程环境进行初始化,这里隐藏着系统最关键的部分,稍后详细分析。

  • coro coro_new(_entry fn) 新分配一个协程并指定其入口函数 fn,这个函数非常重要,我们将首先分析。

  • cvalue coro_call(coro target, cvalue value) 启动协程 target,并传入指定参数 value,同时返回协程执行后的结果。

先分析 coro_new 的实现。 该函数本身不难理解:先用malloc分配 struct _coro 对象 c 及其栈空间,然后初始化该对象,指定入口函数、栈指针和栈大小,最后调用 _coro_enter (c) 完成后续操作 —— _coro_enter函数才是我们重点分析的对象,其代码如下:

/*
 * This function invokes the start function of the coroutine when the
 * coroutine is first called. If it was called from coro_new, then it sets
 * up the stack and initializes the saved context.
 */
void _coro_enter(coro c)
{
  if (_save_and_resumed(c->ctxt))
  {    /* start the coroutine; stack is empty at this point. */
      cvalue _return;
      _return.p = _cur;
      _cur->start(_value);
      /* return the exited coroutine to the exit handler */
      coro_call(&_on_exit, _return);
  }
  /* this code executes when _coro_enter is called from coro_new */
INIT_CTXT:
  {
      /* local and new stack pointers at identical relative positions on the stack */
      intptr_t local_sp = (intptr_t)&local_sp;
      /* I don't know what the addition "- sizeof(void *)" is for when
        the stack grows downards */
      intptr_t new_sp = c->stack_base +
          (_stack_grows_up
              ? _frame_offset
              : c->stack_size - _frame_offset - sizeof(void *));

      /* copy local stack frame to the new stack */
      _coro_cpframe(local_sp, new_sp);

      /* reset any locals in the saved state to point to the new stack */
      _coro_rebase(c, local_sp, new_sp);
  }
}

说明一下,代码中的_save_and_resume是一个宏,直接对应于 setjmp。

不难理解,其中的 if 分支是后续经由 long_jmp 恢复后的执行路径,该分支就是调用之前指定的入口函数,调用结束后马上切换回主协程处理返回值。

我们重点来看 INIT_CTXT 标号后的语句块,这部分用来做协程执行状态的初始化,比较难理解,该过程大体分3步:

  1. 利用栈变量 local_sp 的地址值来确定当前栈帧的位置,然后通过之前 malloc 的指针及大小确定 new_sp 的值,计算时需要注意不同的体系结构下,栈的生长方向可能不同,需要区别对待。

  2. 调用 _coro_cpframe 函数将当前栈帧内容复制到新的栈空间上,它本质上就是一个 memcpy,不过和前面一样,需要注意栈的方向。另外,当前栈使用大小,也就是需要复制的栈长度由全局变量 _frame_offset 指定,该变量在 coro_init 时确定,后面再介绍。

  3. 最后,根据新旧SP值,调用 _coro_rebase 函数对之前保存在 jmp_buf 中的状态信息进行修正,使得下次跳回时能落到新的栈帧上执行。 深入到 _coro_rebase 实现中,会发现这个函数首先计算新旧SP的差值,然后将这个差值加回到 jmp_buf (被看作是一个intptr_t类型的数组)的部分元素上,视为修正——具体哪个位置的值需要被修正保存在全局的数组 _offset[] 中,它的值同样是在 coro_init 函数执行阶段被确定的。

先来解释一下这种“线性修正”之所以可行的原因:

一般情况下,我们可以认为 jmp_buf 是一个数组,数组元素的位宽与具体硬件平台相关,比如IA32下是32bit (int),X64下是64bit (long long)。 其中保存的主要信息就是运行 setjmp 点的 PC、SP(栈指针)以及BP(栈底位置)等。 这些信息大致上可以分成两部分:一类与当前的运行栈地址相关,比如SP、BP;另一类与之无关,比如PC。 基于C语言栈“线性生长”的特点,通过弥补新旧栈地址的线性差值,就可以达到切换栈的效果。

然而,具体实现中 jmp_buf 的每个元素位置对应什么信息则是平台相关的,作为一个以可移植性为目的的系统不应该对其实现做任何假设,因此只能在程序启动阶段以某种方式动态计算获取。

这个计算的过程就隐藏在 coro_init 中,具体通过 _probe_arch 函数实现,相关代码如下:

/* This probing code is derived from Douglas Jones' user thread library */
struct _probe_data {
  intptr_t low_bound;        /* below probe on stack */
  intptr_t probe_local;  /* local to probe on stack */
  intptr_t high_bound;   /* above probe on stack */
  intptr_t prior_local;  /* value of probe_local from earlier call */

  jmp_buf probe_env; /* saved environment of probe */
  jmp_buf probe_sameAR;  /* second environment saved by same call */
  jmp_buf probe_samePC;  /* environment saved on previous call */

  jmp_buf * ref_probe;    /* switches between probes */
};

void boundhigh(struct _probe_data *p)
{
  int c;
  p->high_bound = (intptr_t)&c;
}

void probe(struct _probe_data *p)
{
  int c;
  p->prior_local = p->probe_local;
  p->probe_local = (intptr_t)&c;
__LABEL_0:
  _setjmp( *(p->ref_probe) );
  p->ref_probe = &p->probe_env;
__LABEL_1:
    _setjmp( p->probe_sameAR );
  boundhigh(p);
}

void boundlow(struct _probe_data *p)
{
  int c;
  p->low_bound = (intptr_t)&c;
  probe(p);
}

void fill(struct _probe_data *p)
{
  boundlow(p);
}

static void _infer_jmpbuf_offsets(struct _probe_data *pb)
{
  /* following line views jump buffer as array of long intptr_t */
  unsigned i;
  intptr_t * p = (intptr_t *)pb->probe_env;
  intptr_t * sameAR = (intptr_t *)pb->probe_sameAR;
  intptr_t * samePC = (intptr_t *)pb->probe_samePC;
  intptr_t prior_diff = pb->probe_local - pb->prior_local;
  intptr_t min_frame = pb->probe_local;

  for (i = 0; i < sizeof(jmp_buf) / sizeof(intptr_t); ++i) {
      intptr_t pi = p[i], samePCi = samePC[i];
      if (pi != samePCi) {
          if (pi != sameAR[i]) {
              perror("No Thread Launch\n" );
              exit(-1);
          }
          if ((pi - samePCi) == prior_diff) {
              /* the i'th pointer field in jmp_buf needs to be save/restored */
              _offsets[_offsets_len++] = i;
              if ((_stack_grows_up && min_frame > pi) || (!_stack_grows_up && min_frame < pi)) {
                  min_frame = pi;
              }
          }
      }
  }
  
  _frame_offset = (_stack_grows_up
      ? pb->probe_local - min_frame
      : min_frame - pb->probe_local);
}

static void _infer_direction_from(int *first_addr)
{
  int second;
  _stack_grows_up = (first_addr < &second);
}

static void _infer_stack_direction()
{
  int first;
  _infer_direction_from(&first);
}

static void _probe_arch()
{
  struct _probe_data p;
  p.ref_probe = &p.probe_samePC;

  _infer_stack_direction();

  /* do a probe with filler on stack */
  fill(&p);
  /* do a probe without filler */
  boundlow(&p);
  _infer_jmpbuf_offsets(&p);
}
  • 首先需要分析栈的生长方向,这个实现很简单,只要比较调用者和被调用者栈变量的地址大小就可以了;

  • 下一步通过调用fill(&p) -> boundlow(p) -> probe(p),将__LABLE_0的状态记录到 _probe_data 的 probe_samePC字段,而将__LABEL_1的状态记录到probe_sameAR字段;probe_local字段则记录执行 probe 函数时的栈顶 —— SP值。

  • 然后,通过调用boundlow(&p) -> probe(p),将__LABLE_0的状态记录到 _probe_data 的probe_env字段,而__LABEL_1的状态仍旧记录到probe_sameAR字段;将上一次记录的执行 probe 函数时记录的SP值保存到prior_local字段,同时更新probe_local字段记录本次执行probe函数的SP值。

  • 最后,调用 _infer_jmpbuf_offsets 函数进行最终计算。 这时,probe_samePC保存了第一次__LABEL_0处的状态,probe_env保存了第二次__LABEL_0处的状态,二者的PC属性相同,栈属性存在线性偏差(由于前次多了一层调用);而 probe_envprobe_sameAR的栈属性相同,PC属性不同(调用setjmp的位置不同)。 通过这三组 jmp_buf 数据的关系,以及之前记录的两次调用过程的SP值之间的偏差,就能求得 jmp_buf 各项的属性是栈相关的还是栈无关的,将所有栈相关量在 jmp_buf 中的索引位置记录在 _offset 数组中即可。

  • 同时,在遍历 jmp_buf 量的时候,还要找到其中与SP相差最大的值记录到 min_frame 变量中,这可能就是BP的值,用SP和BP相减,就得到了当前调用栈帧的大小,这个值最终被保存在全局变量 _frame_offset中,作为后来进行“栈拷贝”时的重要参数。

小结

本例利用了 setjmp 操作的底层实现原理,特别是 jmp_buf 结构的实现方式,设计了一个“可移植”的方案。 这个方案虽然可行,但是仍在一定程度上对 setjmp / long_jmp 的实现做了一些假设。 项目开发者并没有给出业已经过测试、可正确运行的平台; 笔者也没有在X86之外的系统上做过实验,因此对这种实现的普适性无法给出保证。 但总的来说,案例二的实现的技巧还是颇值得玩味的。

同样,libconcurrency库也仅支持 “N:1” 的映射方式,底层实现中没有用到多线程 —— 尽管从代码实现来看,似乎作者希望提供某种“线程安全”支持,但究竟是否如此,作者也没有提供任何用例及说明。

案例三:Cilkplus 协程实现

综合上面两个案例,我们不难发现,利用 setjmp / long_jmp 机制实现协程系统虽然在理论上具有可移植性好、性能好的优点,但用于实践中,由于 setjmp / long_jmp 实现的不透明性,导致很难构建出一个符合产品级需求的协程框架 — 即使上面介绍的那些具有一定想象力的实现,也几乎难以直接应用于实际产品。

产生上述问题的原因,主要在于不同体系结构的实现存在很大差异。 那么能否退而求其次,放弃“可移植性”这个优势,集中于几种常见的架构,专注于性能方面的提高呢? 答案是肯定的 — 接下来介绍的 Cilkplus 语言 Runtime 库,就是 Intel 基于自家 X86 / X86_64 平台的特点,实现的一个高效的“协程”框架。

定制的 setjmp / long_jmp

Cilkplus 运行时环境所使用的 setjmp / long_jmp 并非 C 库中提供的版本,而是编译器内嵌版本_builtin_setjmp / _builtin_longjmp,对应的 jmp_buf 结构对于开发者 —— 至少是系统开发者是可见的 —— 它本身是一个 void* 型数组,其中存放着运行状态的PC、SP及BP值,而且明确知道每个值在数组中的位置 —— 这样就可以在 setjmp 后,直接对保存的状态值进行“修正”。

实现”协程”栈切换的原理

前面介绍了为每个“协程”分配独立运行栈对于一个“协程”运行时系统的重要性,也了解了 setjmp / long_jmp 机制下实现栈分配,特别是高可用性的栈分配机制存在一定困难。现在来看看 Cilkplus 如何解决这个问题。

由于 Cilkplus 仅针对 Intel X86 / X64 平台,因此在介绍其“协程”栈切换原理之前,有必要先回顾一下 X86 / X64 平台的调用栈规则。以32位的 X86 为例介绍:

  • 调用者规则: 调用者(Caller)首先需要将参数按照从右到左的顺序依次压栈,然后调用相应的函数,返回后再将参数栈释放。如下代码所示:
push [var] ; Push last parameter first
push 216   ; Push the second parameter
push eax   ; Push first parameter last

call _myFunc ; Call the function (assume C naming)

add esp, 12
  • 被调用者规则: 被调用者(Callee)首先将当前BP(也就是Caller的BP)压栈,然后将当前SP(Caller的SP) 赋值给BP。之后如果遇到分配新的栈变量、创建调用参数或者用 alloca 动态分配空间时,则将 SP 减去新分配空间大小,并用“BP +/– 偏移量”的方式访问这些局部变量。访问当前栈帧的局部变量只需要BP即可,这点非常重要!
   push ebp
   mov  ebp, esp
  • 栈帧结构图如下所示:

了解了这些基本知识后,就可以分析 Cilkplus 的代码了,其中最关键的部分如下所示:

NORETURN cilk_fiber_sysdep::run()
{
    // Only fibers created from a pool have a proc method to run and execute. 
    CILK_ASSERT(m_start_proc);
    CILK_ASSERT(!this->is_allocated_from_thread());
    CILK_ASSERT(!this->is_resumable());

    // TBD: This setjmp/longjmp pair simply changes the stack pointer.
    // We could probably replace this code with some assembly.
    if (! CILK_SETJMP(m_resume_jmpbuf))
    {
        // Change stack pointer to fiber stack
        JMPBUF_SP(m_resume_jmpbuf) = m_stack_base;
        CILK_LONGJMP(m_resume_jmpbuf);
    }

    // Verify that 1) 'this' is still valid and 2) '*this' has not been
    // corrupted.
    CILK_ASSERT(magic_number == m_magic);

    // If the fiber that switched to me wants to be deallocated, do it now.
    do_post_switch_actions();

    // Now call the user proc on the new stack
    m_start_proc(this);

    // alloca() to force generation of frame pointer.  The argument to alloca
    // is contrived to prevent the compiler from optimizing it away.  This
    // code should never actually be executed.
    int* dummy = (int*) alloca(sizeof(int) + (std::size_t) m_start_proc & 0x1);
    *dummy = 0xface;

    // User proc should never return.
    __cilkrts_bug("Should not get here");
}

上面是启动一个新“协程”的过程,在该调用之前,新的栈空间已经通过malloc分配好了,指针保存在变量m_stack_base中。下面的任务就是如何将运行栈切换到这个新的空间。

  • 首先来看 if 分支,也就是直接从 CILK_SETJMP 返回的情形:通过 JMPBUF_SP 这个宏,可以访问到刚刚保存的SP值,将其修改并指向新的地址 —— 注意这里只对SP进行了修改,没有修改BP值,也没有进行”栈拷贝”。

  • 然后,通过 CILK_LONGJMP 跳转到修改后的状态 —— 执行线索切换到 if 分支后的代码 —— 注意,这时候 SP 已经切换成新值了,而 BP 还是原来的值 —— 根据之前的介绍,当前栈上的局部变量,包括函数的调用参数都通过“BP +/– 偏移量”进行访问,因此新的协程依然能访问到原来的栈变量、参数(注意:虽然run()方法本身没有参数,但由于是C++类方法,所以隐含了参数 this 指针,同时该对象的所有成员变量也依赖this指针才能访问到)。

  • 新协程继续执行,调用 m_start_proc(this) 进入新的入口函数 —— 这时由于使用新的 SP,调用参数及新函数的执行栈就都在新的栈空间上分配了,也就完成了“栈切换”。

Fast Clone / Slow Clone

如果说上面介绍的是 Cilkplus Runtime 协程机制的“普通”玩法,那么所谓的 “Fast Clone” / “Slow Clone” 就堪称 Cilkplus 协程机制的“文艺”玩法了。 该机制与Cilkplus的“Work Stealing”调度器一起,作为系统实现的精华,被各种介绍Cilkplus的文章反复提及,可以说是这门新语言的核心技术创新之一。

要分析 “Fast Clone” / “Slow Clone”,首先要了解一下 Cilkplus 的并行开发模型。 在Cilkplus中,主要提供了 cilk_spawncilk_sync 这两个关键字来处理并行任务的创建、同步执行线索的功能。

cilk_spawn 关键字的语法主要有两种形式:

int a = cilk_spawn foo(123); // 变量a记录foo(123)的返回值
cilk_spawn bar(); // bar() 没有返回值或忽略其返回值

cilk_spawn 关键字后面的函数调用会以单独的线索(“协程”)与原先的主线程并行执行,主线程需要在后面必要的位置插入 cilk_sync 语句进行显式同步操作: 比如上面例子中,主线程需要访问变量 a 的值时。 如果程序员没有显式提供cilk_sync 语句,编译器会在适当的位置插入同步语句。

对于一个程序,在多个线程上并行执行不同的部分是否能带来真正的性能提升,需要综合考量并行任务的粒度及其创建、同步等操作引入的额外开销,同时也要考量当前系统的负载情况 —— 这需要在运行过程中动态的判断。 如果一味将所有 spawn 的任务都分配新的线程执行,可能带来较大的开销而得不偿失。 为此,Cilkplus提出了一种动态优化的方案,即所谓的 “Fast Clone” / “Slow Clone”。

该方案的核心思想是,在 Spawn 一个新的执行线索后,并不马上为其创建新的执行线程,而是仅仅创建一个任务的执行状态(也就是“协程”); 系统后台有若干个执行线程,会根据负载情况获取任务并执行。

具体来说,Cilkplus采用了一种“Work First”的执行策略,在Spawn时刻,首先将当前状态(通过setjmp机制)保存起来,然后当前线程直接去执行Spawn的任务,而原先“主线程”的执行状态就挂到当前工作线程的任务队列中:

  • 若此时有其他空闲的工作线程,则“窃取”挂起的“主线程”任务,(通过long_jmp机制)恢复其执行,这样就实现了真正的并行 —— 这就是所谓的“Slow Clone”(“慢版本”);并行的任务通过“同步”操作合并,类似传统的“fork-join”模型,新任务(即原来的“主线程”)也就在同步点被释放了。

  • 相反,如果挂起的任务没有被其他工作线程“窃取”,则当前线程执行完 Spawn 的任务后,会恢复之前挂起的任务,直接返回了“主线程”继续后续操作。 由于 Spawn 任务本身就是在原来执行线程上运行的,因此可以跳过同步操作,看起来好像与不使用 cilk_spawn 关键字时的效果一样,是一个串行的版本。这就是所谓的 “Fast Clone”(即“快版本”)。

下面我们通过一个小例子来具体看一下 Cilkplus 编译器所作的“翻译”工作。

以下是原始的 cilkplus 程序:

int fib(int n) {
  if (n < 2)
    return n;
  int a = cilk_spawn fib(n-1);
  int b = fib(n-2);
  cilk_sync;
  return a + b;
}

下面是经编译器处理生成的程序,为了方便起见,这里用了伪代码表示:

struct struct_anon {
    int n;
};
static void __cilk_spawn_helper_fib(struct_anon *agg, int *ret) {
  ... ...
    __cilk_helper_prologue();
    *ret = fib(agg->n);
    __cilk_helper_epilogue();
  ... ...
}
int fib (int n) {
  ... ...
    __cilkrts_stack_frame sf;
    __cilk_parent_prologue(&sf);
  ... ...
cilk.spawn.savestate:
    int a;
    if (! SETJMP(&sf.ctx))
        goto cilk.spawn.helpercall;
    else
        goto cilk.spawn.continuation;
cilk.spawn.helpercall:
    struct struct_anon agg = { n - 1 };
    __cilk_spawn_helper_fib(&agg, &a);
cilk.spawn.continuation:
    int b = fib(n-2);
cilk.sync.savestate:
    if (sf.flags & CILK_FRAME_UNSYNCHED) {
        if (! SETJMP(sf.ctx))
            goto cilk.sync.call;
    }
cilk.sync.exit:
    return a + b;
cilk.sync.call:
    __cilkrts_sync(&sf);
    goto cilk.sync.exit;
}
  • 首先,利用 setjmp 将当前“主线程”执行状态保存,然后 if 分支直接进入 _cilk_spawn_helper_fib 函数执行。

  • 在_cilk_spawn_helper_fib 函数中,先调用 _cilk_helper_prologue,执行工作包括将刚才保存的任务放到任务队列中;然后执行真正的计算任务 fib;结束计算后,执行 _cilk_helper_epilogue 函数检查“主线程”任务是否已被“窃取”,决定后续执行的路径。

  • 若任务没有被”窃取”,即执行“Fast Clone”,那么函数直接返回,继续计算 fib(n-2) …

  • 否则,通过 long_jmp 机制跳转到 cilk.sync.savestate 处,由新任务保存的状态,完成最后的返回操作;另一方面,被窃取执行的新任务则执行 fib(n-2),然后在 cilk.sync.savestate 处用 setjmp 保存执行状态,再执行 sync 操作 —— 之后,这个新任务就结束了,等待与之并行的那个 Spawn 的任务返回完成后续操作。

我把上面这个流程花了一个草图,供大家参考:

总结

Cilkplus的运行时是我目前所知利用 setjmp / long_jmp 机制实现 “N:M” 协程系统的唯一实现,并且经过多年发展已经非常成熟。 目前,Cilkplus不仅为Intel自家的ICC编译器所支持,同时已合并到GCC主干,成为了GCC支持的语言。另外,基于Clang/LLVM的编译器也已经开源并已初具规模。

由于Cilkplus主要面向的高性能计算领域目前还是被Intel架构服务器所主宰,所以仅支持Intel X86 / X64 架构的策略暂时还无伤大雅。 但如果要实现一套更加通用的协程系统,那么依靠“setjmp / long_jmp”机制本身可能就比较困难了。

好了,今天的内容就是这些。下次将介绍基于“ucontext”的协程实现机制,敬请关注!

构建C协程之概述

Published on:

从本篇开始,打算开启一个系列,专门介绍C语言环境下各种协程系统的实现机制,算是对前阶段工作的一个系统总结吧! 目前的计划是根据实现机制的不同,分成两到三篇来介绍,具体的案例主要以 Go 和 Cilkplus 这两门语言的运行时环境为主,同时结合其他一些个人觉得比较有新意的小型开源系统。 另外,最近陆陆续续分析了很多开源并发系统的代码,希望也能抽时间好好总结一下。

什么是“协程”?

“协程”(“coroutine”),有时也叫做“用户线程”、“纤程” (“fiber”)等,是一种轻量级用户执行线索,其特点是调度和切换都发生在用户态,无需内核干预,因此切换代价较小,特别适合实现一些高并发类的系统应用 —— 比如 Web 服务器 —— 每个链接的服务历程都可以用“协程”来实现,当某个链接遇到I/O阻塞时,可以快速切换到其他执行线索,从而大大提高了系统整体的吞吐率。

由于系统的调度器和执行线索都处于“用户态”,调度器通常无法中断某个运行中的“协程”,因此通常来说,“协程”的调度器往往采用“协作调度”策略 —— 即执行中的“协程” 需要显式调用类似yeild 这样的方法来让出处理器资源,以便其他任务执行。 这也就是“协程”得名的原因。

当然,对于一些语言,比如 Erlang,由于采用基于指令虚拟机的实现方式,调度器通常实现在虚拟机层,仍然能够控制用户级任务的执行,因此 Erlang 的轻量进程是采用“分时调度”的。但我们这里仅讨论一般意义上的“协程”,且主要基于C语言的实现,所以仍以“协作调度”为主。

类似系统

一种类似的方案是采用“异步+回调”的方式,比如libeventnode.js之类框架,其本质是将用户任务的粒度降低到以函数为单位,系统后台启动多个工作线程,通过“事件驱动”的方式异步的从任务队列中取出并执行这些回调函数。

这类方案的底层系统实现起来相对比较简单,理论上也非常高效,但要求用户程序以异步方式编写,给用户程序开发、代码维护、调试等带来了一些问题。 而基于“协程”的系统,所有用户任务都是“同步”的,也就是完全按照实际执行时序编程,降低了用户程序开发、调试、维护的开销。

基于C语言的实现模型

早期的构建在C语言之上“协程”库往往仅包含一个执行的OS线程,多个用户任务都在该线程上分时执行,是一种 “N:1” 的映射模式。 典型的例子是libtask。由于调度器实际上是串行执行的,无需考虑复杂的线程同步问题,所以实现起来就比较简单。

近年来,随着多核乃至众核处理器的大规模出现和普及,使得原来基于 “N:1” 的模型无法满足系统的可扩展性需求。 因此业界提出了很多基于多核架构的协程系统方案,即所谓 “M:N” 模型 —— 多个“协程”可以映射到多个OS线程执行,也就是说在多核平台上,不同的“协程”能实现真正意义上的“并行”执行。

本质上来说,Google开发的Go语言和Intel主导开发的Cilkplus语言都是 “M:N” 的代表——虽然它们表面上都是新的语言,但调度器核心,即运行时环境(runtime)都是基于C(及部分C++)语言的。 所以,这两个runtime系统将作为“构建C协程”这个系列里,重点关注的对象。

实现方式分类

总结起来,用C语言实现协程的主要方案包括三类:

  1. 利用标准C提供的setjmp/long_jmp机制,比如libconcurrency,以及前面讲的Cilkplus的运行时环境均属此类。这种方式的优点是可移植性好,理论上只要平台提供C标准库就可以移植,并且“协程”切换效率相对比较高。但同时,对其流程把握通常比较困难,也很难为每个协程实现独立的运行栈。

  2. 利用GNU C库提供的ucontext机制;或者使用Windows平台提供的Fiber机制。这种方式的优点是流程清晰,编程思路简单;但是可移植性和切换效率欠佳。

  3. 利用C的switch/goto等语句的巧妙组合,可以用少量的代码实现简单的协程支持,比如Protothreads项目,其实现非常简单,号称“蝇量级”。但可扩展性不好,不适合移植到多核等复杂系统,因此就不在本系列中详述了,感兴趣者请参考 “Protothreads” 代码及相关分析

在后续的文章中,我会针对前两种实现进行分析,主要的参考是采用“setjmp/long_jmp”实现的 Cilkplus 运行时库 libcilkrts(Linux版),以及采用“ucontext”实现的 GCC Go 前端运行时库 libgo 。 敬请关注!

Go 语言对OS系统调用的处理

Published on:
Tags: Go

上回讲Goroutine状态变换的时候,遗留了一部分关于Syscall处理的内容,这次打算把Go语言对Syscall的处理机制系统的总结一下,放在今天这篇文章中。

Go 语言库对Syscall的封装

我们知道Go是一门面向系统级开发的Native编程语言,与C/C++ 类似,Go的编译器会直接将程序编译、链接成本地可执行文件。理论上,它可以完成任何C/C++语言能完成的。作为支撑该特性的重要方面,Go以标准库形式提供了syscall包,用来支持OS级系统调用。

首先,Go对各种系统调用接口进行了封装,提供给用户一组Go语言函数,方便在程序中直接调用,如:

func Read(fd int, p []byte) (n int, err error)
func Write(fd int, p []byte) (n int, err error)

同时,Go还通过以下函数提供了对Syscall的直接调用支持:

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)

func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno)

其中,带有Raw前缀的一组操作表示直接调用syscall (注:以Linux为例,在AMD64中是通过syscall指令实现,在X86中是int 0x80软中断,而ARM中则是采用SWI软中断实现系统调用),而不带Raw前缀的操作则在真正调用syscall前会先调用runtime·entersyscall,并在syscall返回后插入runtime·exitsyscall。这两个辅助函数的功能我们在前面介绍调度器时已经说过了,后面还会再提。

这4个函数全都是用汇编语言实现的,并且和具体的硬件架构及OS相关,比如Linux下ARM架构的相应实现,在 src/pkg/syscall/asm_linux_arm.s中。至于其他的如Read/Write这类的函数,其内部基本上都调用上面的4个函数实现的。

运行时支持

我们之前讲了很多次,Go语言runtime为了实现较高的并发度,对OS系统调用做了一些优化,主要就体现在runtime·entersyscall和入runtime·exitsyscall这两个函数上,它们的实现代码在src/pkg/runtime/proc.c之中,之前我们已经多次讨论过这个文件了。

在分析实现代前,我们先来看看函数的声明,位置在src/pkg/runtime/runtime.h中:

void runtime·entersyscall(void);
void runtime·entersyscallblock(void);
void runtime·exitsyscall(void);

这里声明了3个函数,多了一个void runtime·entersyscallblock(void),在后面会分析它的功能和使用情况。

好了,现在来看实现代码。首先,我们很容易找到了void runtime·exitsyscall(void) 的实现,而另外两个却找不到,只是找到了两个与之向接近的函数定义,分别是:

void ·entersyscall(int dummy) { ... }
void ·entersyscallblock(int dummy) { ... }

通过反汇编分析,我发现代码中所有对runtime·entersyscallruntime·entersyscallblock的调用最后都分别映射到了·entersyscall·entersyscallblock,也就是说前面两个函数分别是后面两个函数的别名。至于为什么这样实现,我没有找到相关的文档说明,但感觉应该主要是由于前后两组函数参数不同的关系 —— 函数调用本身是不需要传入参数的,而函数实现时,无中生有了一个dummy参数,其目的就是为了通过该参数指针(地址)方便定位调用者的PC和SP值。

runtime·entersyscall

好了,我们回到函数实现分析上来,看看进入系统调用前,runtime究竟都做了那些特别处理。下面将这个函数分成3段进行分析:

  • 首先,函数通过“pragma”将该函数声明为“NOSPLIT”,令其中的函数调用不触发栈扩展检查。

    刚进入函数,先禁止抢占,然后通过dummy参数获得调用者的SP和PC值(通过save函数保存到g->sched.spg->sched.pc),将其分别保存到groutine的syscallspsyscallpc字段,同时记录的字段还有syscallstacksyscallguard。这些字段的功能主要是使得垃圾收集器明确栈分析的边界 —— 对于正在进行系统调用的任务,只对其进入系统调用前的栈进行“标记-清除”。(实际上,Go语言的cgo机制也利用了entersyscall,因而cgo运行的代码不受垃圾收集机制管理。

    然后,Goroutine的状态切换到Gsyscall状态。

#pragma textflag NOSPLIT
void
·entersyscall(int32 dummy)
{
  // Disable preemption because during this function g is in Gsyscall status,
  // but can have inconsistent g->sched, do not let GC observe it.
  m->locks++;

  // Leave SP around for GC and traceback.
  save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
  g->syscallsp = g->sched.sp;
  g->syscallpc = g->sched.pc;
  g->syscallstack = g->stackbase;
  g->syscallguard = g->stackguard;
  g->status = Gsyscall;
  if(g->syscallsp < g->syscallguard-StackGuard || g->syscallstack < g->syscallsp) {
      // runtime·printf("entersyscall inconsistent %p [%p,%p]\n",
      // g->syscallsp, g->syscallguard-StackGuard, g->syscallstack);
      runtime·throw("entersyscall");
  }
  • 下面的代码是唤醒runtime的后台监控线程sysmon,在之前讲调度器的时候说过,sysmon会监控所有执行syscall的线程M,一旦超过某个时间阈值,就将该M与对应的P解耦。
if(runtime·atomicload(&runtime·sched.sysmonwait)) {  // TODO: fast atomic
  runtime·lock(&runtime·sched);
  if(runtime·atomicload(&runtime·sched.sysmonwait)) {
      runtime·atomicstore(&runtime·sched.sysmonwait, 0);
      runtime·notewakeup(&runtime·sched.sysmonnote);
  }
  runtime·unlock(&runtime·sched);
  save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
}
  • 将M的mcache字段置空,并将P的m字段置空,将P的状态切换到Psyscall(注意,与G类似,P也存在若干状态的切换,PsyscallPgcstop都是其中的状态)。

    检查系统此刻是否需要进行“垃圾收集”,注意,syscall和gc是可以并行执行的。

    由于处于syscall状态的任务是不能进行栈分裂的,因此通过g->stackguard0 = StackPreempt使得后续操作时,一旦出现意外调用了栈分裂操作,都会进入 runtime的morestack函数并捕获到错误。最后别忘记重新使能任务抢占。

m->mcache = nil;
m->p->m = nil;
runtime·atomicstore(&m->p->status, Psyscall);
if(runtime·sched.gcwaiting) {
  runtime·lock(&runtime·sched);
  if (runtime·sched.stopwait > 0 && runtime·cas(&m->p->status, Psyscall, Pgcstop)) {
      if(--runtime·sched.stopwait == 0)
          runtime·notewakeup(&runtime·sched.stopnote);
  }
  runtime·unlock(&runtime·sched);
  save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
}

// Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
// We set stackguard to StackPreempt so that first split stack check calls morestack.
// Morestack detects this case and throws.
g->stackguard0 = StackPreempt;
m->locks--;
}

这里提一个问题:为什么每次调用runtime·lock(&runtime.sched)runtime·unlock(&runtime·sched)后,都要重新调用save保存SP和PC值呢?

runtime·entersyscallblock

·entersyscall函数不同,·entersyscallblock在一开始就认为当前执行的syscall 会执行一个相对比较长的时间,因此在进入该函数后,就进行了M和P的解耦操作,无需等待sysmon处理。

  • 该函数第一部分与·entersyscall函数类似:
#pragma textflag NOSPLIT
void
·entersyscallblock(int32 dummy)
{
  P *p;

  m->locks++;  // see comment in entersyscall

  // Leave SP around for GC and traceback.
  save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
  g->syscallsp = g->sched.sp;
  g->syscallpc = g->sched.pc;
  g->syscallstack = g->stackbase;
  g->syscallguard = g->stackguard;
  g->status = Gsyscall;
  if(g->syscallsp < g->syscallguard-StackGuard || g->syscallstack < g->syscallsp) {
      // runtime·printf("entersyscall inconsistent %p [%p,%p]\n",
      // g->syscallsp, g->syscallguard-StackGuard, g->syscallstack);
      runtime·throw("entersyscallblock");
  }
  • 后面的部分就不太一样了,基本上就是直接将当前M与P解耦,P重新回到Pidle状态。
p = releasep();
handoffp(p);
if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
  incidlelocked(1);

// Resave for traceback during blocked call.
save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));

g->stackguard0 = StackPreempt;  // see comment in entersyscall
m->locks--;
}

前面说过,所有syscall包中的系统调用封装都只调用了runtime·entersyscall,那么runtime·entersyscallblock的使用场景是什么呢?

通过查找,发现Go1.2中,仅有的一处对runtime·entersyscallblock的使用来自bool runtime.notetsleepg(Note *n, int64 ns)中(当然,针对不同的OS平台有Futex和Sema两种不同的实现)。Note类型在Go中主要提供一种“通知-唤醒”机制,有点类似PThread中的“条件变量”。 为了实现高并发度,Go不但实现了线程级的阻塞,还提供了Goroutine级阻塞,使得一个运行的Goroutine也可以阻塞在一个Note上 —— 对应的P会解耦释放,因此系统整体并发性不会收到影响。

上述机制在runtime中多有使用,比如在“定时器”模块中 —— 后面有机会会详细介绍。

runtime·exitsyscall

该函数主要的功能是从syscall状态恢复,其结构比较清晰,主要分为两个步骤:

  • 尝试调用exitsyscallfast函数,假设对应的M与P没有完全解耦,那么该操作会重新将M与P绑定;否则尝试获取另一个空闲的P并与当前M绑定。如果绑定成功,返回true,否则返回false,留待runtime·exitsyscall做后续处理。 代码如下:
// The goroutine g exited its system call.
// Arrange for it to run on a cpu again.
// This is called only from the go syscall library, not
// from the low-level system calls used by the runtime.
#pragma textflag NOSPLIT
void
runtime·exitsyscall(void)
{
  m->locks++;  // see comment in entersyscall

  if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
      incidlelocked(-1);

  if(exitsyscallfast()) {
      // There's a cpu for us, so we can run.
      m->p->syscalltick++;
      g->status = Grunning;
      // Garbage collector isn't running (since we are),
      // so okay to clear gcstack and gcsp.
      g->syscallstack = (uintptr)nil;
      g->syscallsp = (uintptr)nil;
      m->locks--;
      if(g->preempt) {
          // restore the preemption request in case we've cleared it in newstack
          g->stackguard0 = StackPreempt;
      } else {
          // otherwise restore the real stackguard, we've spoiled it in entersyscall/entersyscallblock
          g->stackguard0 = g->stackguard;
      }
      return;
  }

  m->locks--;
  • 如果exitsyscallfast函数失败,则需要将当前的groutine放回到任务队列中等待被其他“M&P”调度执行,通过上一讲我们知道,类似的操作必须在g0的栈上执行,因此需要使用runtime.mcall来完成,代码如下:
// Call the scheduler.
runtime·mcall(exitsyscall0);

// Scheduler returned, so we're allowed to run now.
// Delete the gcstack information that we left for
// the garbage collector during the system call.
// Must wait until now because until gosched returns
// we don't know for sure that the garbage collector
// is not running.
g->syscallstack = (uintptr)nil;
g->syscallsp = (uintptr)nil;
m->p->syscalltick++;
}
  • 我们再仔细看看exitsyscall0的实现,和runtime的其他部分类似,M对于放弃执行总是有点不太情愿,所以首先还是会先看看有没有空闲的P,如果还是没有,只好将groutine放回全局任务队列中,如果当前M与G是绑定的,那M必须阻塞直到有空闲P可用才能被唤醒执行;如果M没有与G绑定,则M线程结束。 最后,当这个goroutine被再次调度执行时,会返回到runtime.mcall调用后的代码处,做一些后续的清理工作 —— 将syscallstacksyscallsp字段清楚以保证GC的正确执行;对P的syscalltick字段增1。

一点说明

Go语言之所以设计了M及P这两个概念,并对执行syscall的线程进行特别处理,适当进行M和P的解耦,主要是为了提高并发度,降低频繁、长时间的阻塞syscall带来的问题。但是必须意识到,这种机制本身也存在一定的开销,比如任务迁移可能影响CACHE、TLB的性能。

所以在实现中,并非所有的系统调用之前都会先调用·entersyscall

对于runtime中的一些底层syscall,比如所有的底层锁操作 —— 在Linux中使用的是Futex机制 —— 相应的Lock/Unlock操作都使用了底层系统调用,此时线程会直接调用syscall而不需要其他的操作,这样主要是保证底层代码的高效执行。

一些不容易造成执行线程阻塞的系统调用,在Go的syscall包中,通过RawSyscall进行封装,也不会调用runtime·entersyscallruntime·exitsyscall提供的功能。

关于Go语言调度器实现细节的补充分析

Published on:
Tags: Go

在之前的一篇博文中,简单的介绍了一下Go调度器的原理。而进行了一番深入分析后,发现Go的调度器代码中存在许多值得玩味的细节,不仔细体会可能很难发觉作者的匠心。原本打算再写一篇文章系统的分析一下这些细节,但无意中发现了另一位爱好Go的朋友已经做类似的工作,并且结构非常清晰,内容也较准确(链接在这里),为了避免造成雷同就放弃了之前的计划,转而罗列一些个人认为的Go语言中比较有意思的点,分别展开介绍一下。文章结构随性,如果感觉缺少联系,请参考其他资料。

Goroutine 状态的演变

在讲解操作系统进程调度的部分时,几乎所有的书籍都会先列出一张进程的状态迁移图,通过状态图,能很清晰的把进程调度的每个环节串联起来,方便理解。

Go运行时的调度器其实可以看成OS调度器的某种简化版本,一个goroutine在其生命周期之中,同样包含了各种状态的变换。弄清了这些状态及状态间切换的原理,对搞清整个Go调度器会非常有帮助。

好了,以下是我总结的一张goroutine的状态迁移图,圆形框表示状态,箭头及文字信息表示切换的方向和条件:

下面来简单分析一下, 其中状态 Gidle 在Go调度器代码中并没有被真正被使用到,所以直接忽略。事实上,一旦runtime新建了一个goroutine结构,就会将其状态置为Grunnable并加入到任务队列中,因此我们以该状态作为起点进行介绍:

  • Grunnable: Go 语言中,包括用户入口函数main·main的执行goroutine在内的所有任务,都是通过runtime·newproc/runtime·newproc1 这两个函数创建的,前者其实就是对后者的一层封装,提供可变参数支持,Go语言的go关键字最终会被编译器映射为对runtime·newproc的调用。当runtime·newproc1完成了资源的分配及初始化后,新任务的状态会被置为Grunnable,然后被添加到当前 P 的私有任务队列中,等待调度执行。

    从图中我们可以看到,还有几条通向Grunnable的路径:当某个阻塞任务(状态为Gwaiting)的等待条件满足而被唤醒时——如一个任务G#1向某个channel写入数据将唤醒之前等待读取该channel数据的任务G#2——G#1通过调用runtime·readyG#2状态重新置为Grunnable并添加到任务队列中。关于任务阻塞,稍后还很详细介绍。另外的路径是从GrunningGsyscall状态变换到Grunnable,我们也都合并到后面介绍。

    总之,处于Grunnable的任务一定在某个任务队列中,随时等待被调度执行。

  • Grunning: 所有状态为Grunnable的任务都可能通过findrunnable函数被调度器(P&M)获取,进而通过execute函数将其状态切换到Grunning, 最后调用runtime·gogo加载其上下文并执行。

    前面讲过Go本质采用一种协作式调度方案,一个正在运行的任务,需要通过调用yield的方式显式让出处理器;在Go1.2之后,运行时也开始支持一定程度的任务抢占——当系统线程sysmon发现某个任务执行时间过长或者runtime判断需要进行垃圾收集时,会将任务置为”可被抢占“的,当该任务下一次函数调用时,就会让出处理器并重新切会到Grunnable状态。关于Go1.2中抢占机制的实现细节,后面又机会再做介绍。

  • Gsyscall: 这个状态其实在介绍调度器那篇文章中就已经提及了——Go运行时为了保证高的并发性能,当会在任务执行OS系统调用前,先调用runtime·entersyscall函数将自己的状态置为Gsyscall——如果系统调用是阻塞式的或者执行过久,则将当前MP分离——当系统调用返回后,执行线程调用runtime·exitsyscall尝试重新获取P,如果成功且当前任务没有被抢占,则将状态切回Grunning并继续执行;否则将状态置为Grunnable,等待再次被调度执行。

  • Gwaiting: 当一个任务需要的资源或运行条件不能被满足时,需要调用runtime·park函数进入该状态,之后除非等待条件满足,否则任务将一直处于等待状态不能执行。除了之前举过的channel的例子外,Go语言的定时器、网络IO操作都可能引起任务的阻塞。

    runtime·park函数包含3个参数,第一个是解锁函数指针,第二个是一个Lock指针,最后是一个字符串用以描述阻塞的原因。

    很明显,前两个参数是配对的结构——由于任务阻塞前可能获得了某些Lock,这些Lock必须在任务状态保存完成后才能释放,以避免数据竞争。我们知道channel必须通过Lock确保互斥访问,一个阻塞的任务G#1需要将自己放到channel的等待队列中,如果在完成上下文保存前就释放了Lock,则可能导致G#2将未知状态的G#1置为Grunnable,因此释放Lock必须在runtime·park内完成。

    由于阻塞时任务持有的Lock类型不尽相同——如Select操作的锁实际上是一组Lock的集合——因此需要特别指出Unlock的具体方式。

    最后一个参数主要是在gdb调试的时候方便发现任务阻塞的原因。

    顺便说一下,当所有的任务都处于Gwaiting状态时,也就表示当前程序进入了死锁态,不可能继续执行了,那么runtime会检测到这种情况,并输出所有Gwaiting任务的backtrace信息。

  • Gdead: 最后,当一个任务执行结束后,会调用runtime·goexit结束自己的生命——将状态置为Gdead,并将结构体链到一个属于当前P的空闲G链表中,以备后续使用。

    Go语言的并发模型基本上遵照了CSP模型,goroutine间完全靠channel通信,没有像Unix进程的waitwaitpid的等待机制,也没有类似“POSIX Thread”中的pthread_join的汇合机制,更没有像killsignal这类的中断机制。每个goroutine结束后就自行退出销毁,不留一丝痕迹。

深入任务切换 —— m->g0和runtime·mcall的妙用

通过上面的分析,相信大家已经基本理清了goroutine执行的线索。 现在让我们再仔细观察一下状态切换的过程,首先,以Grunning状态为中心来看,把状态切换先粗略的分为两大类——

  1. Grunning变为其他状态,即goroutine退出(“用户态”)执行;
  2. 由其他状态变为Grunning状态,即被调度执行

第一类是由当前活跃的goroutine主动调用runtime相关函数完成的,是主动的;而第二类则是由runtime或其他goroutine完成的,是被动的。

对于第一类,其实还可以细分 —— 到Gsyscall的情况我们以后讨论,先来看其他几种情况,分别是:

  • 调用runtime·sched主动让出处理器,其实就对应于Go语言的yield关键字,状态切换到Grunnable
  • 调用runtime.park阻塞,状态切换到Gwaiting
  • 调用runtime·goexit结束,状态切换到Gdead

看一下这几个函数的实现,发现它们不过是一层封装,以runtime·park为例,最终会调用runtime·mcall(park0)完成真正的任务,代码如下:

void
runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 reason)
{
  m->waitlock = lock;
  m->waitunlockf = unlockf;
  g->waitreason = reason;
  runtime·mcall(park0);
}

其他几个也类似,分别调用了runtime·mcall(sched0)runtime·mcall(goexit0)

先来看看这个runtime·mcall函数的功能。它具体定义在src/pkg/runtime/asm_[arm|amd64|386].s中,是用汇编语言实现的。以下是截取该文件中对mcall函数的注释:

// void mcall(void (*fn)(G*))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched) to keep running g.

这段注释意思简明清楚 —— runtime.mcall函数接受一个以G*为参数的函数指针fn,执行时会先将当前任务g的上下文保存到g->sched结构中,然后切换到m->g0的栈空间,再调用fn,参数就是当前任务指针g。对runtime.mcall函数的调用是不会返回的,除非调用gogo(&g->sched)

也就是说,在执行runtime.mcall(park0)之后,会进入park0(g)继续执行。我们再看看park0的实现代码:

static void
park0(G *gp)
{
  gp->status = Gwaitng;
  gp->m = nil;
  m->curg = nil;
  if (m->waitunlockf) {
      m->waitunlockf(m->waitlock);
      m->waitunlockf = nil;
      m->waitlock = nil;
  }
  if (m->lockedg) {
      stoplckedm();
      execute(gp); // Never return
  }
  schedule();
}

这个函数的功能也很简单:

  1. gp状态切换到Gwaitng
  2. gpm分离
  3. 释放在用户态时持有的锁(如果有的话)
  4. 最后,如果mgp是强制绑定的,那么m线程会等待gp状态变为Grunnable后再将其调度执行;否则直接调用schedule重新选择可执行的任务

其他几个函数实现也是类似的结构,只需将步骤3替换成对应的操作 —— 对于sched0就是将gp加到全局任务队列里;对于goexit0就是释放、回收gp的资源。

到这里,我们似乎搞清了前因后果,但好像还有什么地方有点模糊 —— 对了,所有这些xxxx0函数为什么都要先切换到m->g0的栈上执行呢?为什么不可以直接在当前g的栈上执行?那样不是省去了若干次上下文保存、恢复的麻烦吗?

要搞清这个问题,先要看看m->g0这个结构。我们之前说过,Go中的M对应OS线程,每个M分配时会首先创建一个g0任务,并分配大约8KB大小的栈空间。在线程创建时刻 —— 比如在Linux中,通过clone系统调用 —— 会将g0的栈绑定给对应的OS线程。

前面讲了,每个用户goroutine创建后,都会分配独立的栈(初始大小稍小一些,因为Go的栈是动态可扩展的),执行用户任务就会切换到用户任务的栈上。这样,g0的栈空间实际上就是独立于任何用户任务的,因此可以执行一些不适合在用户栈上执行的程序。这个有点类似OS中用户栈和内核栈的关系。

sched0为例说明为什么执行goroutine的切换不能在当前g的栈上完成。

假设我们这么做,那么首先要保存g的上下文状态到g->sched结构体中,然后从任务队列中选择另一个状态为Grunnable的任务g1,在将上下文切换到g1之前,我们需要先将g放回任务队列中,以便它未来还能被调度执行,然后M#1切换到g1 —— 注意,问题来了!由于Go的runtime是多线程的,因此可能同时存在多个执行线程,一旦g进入队列,那么它完全可能被另一个线程M#2调度执行。这时,M#1M#2实际上都运行在g的栈上(goroutine切换不是“原子操作”!),就可能出现数据竞争从而导致错误!

而利用m->g0的栈进行sched0这样的操作,由于不同的线程有各自独立的g0及栈空间,因而不会发生数据竞争问题。

runtime·mcall 的另一个用途

除了进行goroutine切换外,runtime·mcall还有一个功能,就是可以委托处理栈空间分配。具体来说,当一个任务通过go关键字新建任务时 —— 我们知道该操作最终会映射到runtime·newproc1函数 —— 那么就会涉及调用runtime·mstackalloc对新建任务分配栈空间的操作。

Go 1.2 的用户任务采用了“分段式栈”的实现方案,其栈空间是根据需要动态扩展的,每个函数调用点都会判断当前栈空间是否满足需要,如果不够就要追加分配。要确保调用runtime·mstackalloc时不会再出现栈分配的情形,就不能直接在用户空间上运行该函数。现在,我们很容易想到的就是利用runtime·mcall切换到g0上执行栈分配!因为g0的初始栈空间比较大,可以认为能够满足调用需要。

与之前xxxx0函数情况略有不同,在分配完栈空间后,我们希望马上切换会刚才的g,而不是触发新的调度,因此,必须直接调用runtime·gogo(&g->sched)返回——这点在runtime·mcall的注释中也说得很明白。

待续…

今天初步把Go语言任务状态变化串讲了一下,更重要的是把m->g0runtime·mcall这两个结构分析了一下。本来还想继续介绍一下有关Gsyscall状态的一些细节,不过看看时间已经接近凌晨2点了,再不休息估计明天上班会很辛苦。所以就把这个话题,连同之前讲channel时没有深入分析的“定时器”机制一起放在后面完成。


限于个人水平,本文内容不免谬误之处,欢迎大家致信我的邮箱amalcaowei@gmail.com或在讨论版留言!

Erlang & Go 的IO优化策略简介

Published on:

之前关于调度器的对比分析的文章,在结束时遗留了一些问题:当系统出现高并发的IO访问时,如一个网络服务器通常要并发处理成百上千的链接,每个链接可能都是由一个用户任务执行的,那么将会出现大量阻塞的IO操作,如果为每个阻塞操作都单独分配一个OS线程,那么系统很容易就会退化成多OS线程的系统,轻量任务的优势将无从谈起。本文试图回答这个问题,通过分析Go和Erlang对于IO、特别是网络IO的优化机制,了解其对调度器乃至整个系统性能的影响。

Go的IO优化机制 —— netpoller

由于Go是一门主要面向互联网环境的分布式语言,相对于一般的IO,如文件读写等,网络IO的并发性能更加重要。对于一般IO,Go的处理方式就是按上篇所说的,将执行Syscall的OS线程剥离。通常应用场景下,不会出现大量并发Goroutine去同时读写文件的情况,因而上面的方式并不会真正造成调度器的退化。因此主要的IO优化都是针对io/net库的。

无独有偶,Erlang在实现上同样对网络IO提供了不同于一般IO的高效处理方式,后面再作介绍。

Go实现中利用了OS提供的非阻塞IO访问模式,并配合epll/kqueue等IO事件监控机制;但是为了弥合OS的异步机制与Go接口的差异,Go在其库中做了一些封装,并在runtime层提供了一种叫做netpoller,“网络轮询器”的机制,来实现网络IO优化。具体来说:

  • 首先,无论何时,当在Go中打开或接收到一个链接时,其文件句柄都会被设为NONBLOCKING模式。(Go语言库
  • 当调用相应的Read/Write等操作时,无论是否成功,都会直接返回而不会阻塞。当返回值是EAGAIN时,表示IO事件还没有到达,需要等待。这时,Go库函数调用PollServerAddFd()将对应文件句柄加入netpoller的监控池,并将当前Goroutine阻塞。(Go语言库、netpoll.goc中
  • 当系统中存在空闲 P & M (参见这里) 时,runtime 会首先查找本地就绪队列,若其空,则调用netpoller; netpoller通过OS提供的epoll或kqueue机制,检查已到达的IO事件,并唤醒对应的Goroutine返回给runtime,将其再度执行。(runtime/proc.c:findrunnable()
  • 最后,Goroutine再次回到Go语言库上下文时,再调用Read/Write等IO操作时,就可以顺利返回了。(Go语言库)

Erlang的IO优化机制之一 —— “Async Threads Pool”

在Erlang中,所有IO操作都需要以Port驱动的形式提供,所谓Port驱动包含一组C回调函数,用来响应用户进程的访问;用户进程则通过通用的消息传递机制与Port交互。Erlang虚拟机会把Port当做一种特殊的任务加以调度。

真正的系统调用,如read/write/flush等阻塞式操作都被封装在Port的回调函数之中,当调度器调度执行响应的Port时,就会导致当前的调度器执行线程被OS阻塞,从而影响系统的并行性。

Erlang解决该问题的办法是提供了一组OS线程作为异步线程池,阻塞的IO操作(以函数指针形式)会被Port注册到异步线程池的操作队列中。异步线程则执行循环操作,取出当前任务队列的IO任务并执行阻塞操作。

这种方式类似Go对非net类IO及执行阻塞式Syscall的调度方式:用一个单独的OS线程去执行阻塞操作。

Erlang的file IO基本上就是以上述方式实现的。

因为 Erlang 将调度器映射到一个OS线程而说其调度是1:1的其实是不准确的。基于对阻塞IO的异步处理及上篇讲到的负载平衡机制,使得Erlang实际上也实现了M:N的调度,只不过Erlang的官方文档并没有这么说,只是说单纯增加调度器数不会对性能造成影响。

Erlang的IO优化机制之二 —— “System Level Activities”

如前所述,无论时Erlang还是Go,都是针对服务器端设计的语言,因此都提供了不同于一般IO的特殊机制来处理网络IO。

Erlang的做法是提供一种特别的调度单元 —— System Level Activities,来调度异步IO事件。它的思想和Go的netpoller非常类似:

  • 首先,网络链接对应的句柄会被设为NONBLOCKING状态;
  • 一次IO操作如果在响应事件到来前被调用,则会将其等待的事件注册到Erlang虚拟机的IO事件链中;
  • 调度器在调度时,会周期性的调用check_io操作来检查已注册的IO事件是否已经到来(利用OS的poll操作),并唤醒响应事件阻塞的用户任务(进程或Port)。

值得注意的是,Erlang虚拟机在处理IO事件时,还采用了一种 stealing 的机制。具体来说,当一个driver的函数调用IO操作时,如果对应IO事件没有到来时,还会主动调用 select_steal()窃取其他已注册的IO事件,如果该事件已触发,则完成相应的读/写操作,并通知上层进行后续处理。

Libtask中的异步IO机制

作为Go语言的前身,Libtask库同样实现了异步IO机制,并且实现方式更加简洁。

与Go类似,在Libtask中,为用户级task封装了IO操作,提供了fdread/fdwrite/fdwait/fdnoblock等接口实现异步IO。(在libtask提供的例子中,所有IO操作都是针对网络IO的,因此仅就网络IO情况加以分析。

  • 链接句柄首先会通过调用fdblock()被设为NONBLOCKING态;
  • 之后调用fdread/fdwrite时,一旦返回EAGAIN,则调用fdwait,注册等待IO事件并将自身调出;
  • Libtask会在第一次接收到IO事件注册后建立一个系统任务fdtask,该任务通过调用poll系统调用检查新到来的IO事件,并将对应任务重新加到就绪队列中。

总结及参考

通过上文分析,了解到IO优化对调度器乃至语言本身性能的影响。这与两种语言的应用背景——服务器端编程有很大关系。

通常来说,应用程序必须通过Syscall 访问操作的特定功能,这就会涉及底层 OS 的调度机制,作为用户态的任务调度器,Erlang虚拟机或Go的运行时系统都必须对内核调度引入的不确定性加以控制。特别是 IO 操作这类特殊并且会大量访问到的Syscall,必须设计有针对性的优化方案,才能确保高的并发性能。

Go 和 Erlang 的实现方案随不尽相同,但核心的思想都是类似的,通过异步IO 优化基于Socket的操作,而对于一般的文件读写,则直接让执行线程及运行的用户任务阻塞,调度器再将其他可以执行的任务绑定到其他OS线程继续执行。

这篇文章除了参考了Erlang/OTP及Go语言的源代码外,还参考了以下资料:

Erlang & Go 消息传递机制初探

Published on:

上一篇文章介绍了 Go 和 Erlang 在调度器的实现,本文将简要介绍这两种并发语言的消息传递机制

简要对比

Erlang和Go虽然在实现及功能上差异较大,但是都支持高并发的轻量级用户任务(Erlang的轻量进程,Go的Goroutine), 并且都采用了消息传递的方式作为任务间交互的方式。

在Erlang中,采用了一种比较纯粹的消息传递机制,进程间几乎没有任何形式的数据共享,只能通过彼此间发送消息进行通信; 而Go虽然是基于共享内存的,但是也必须通过消息传递来进行共享数据的同步。 可以说消息传递机制是两种语言任务间交互的首要方式。

但是在具体实现中,鉴于两种语言的差异,也表现为不同的形式:

  • 在Erlang中,进程之间以彼此的Pid作为标识进行消息的发送,一切数据都仅可以消息的形式在进程间复制
  • 在Go中,不同的Goroutine间通过共享的channel进行通信,由于Go本质上是建立在共享存储模型上的,因此全局变量、参数甚至是一部分栈变量都是可以共享的,通信一般控制在较小的规模,仅用来保证共享的同步语义

下面将分别就Erlang和Go各自实现分别进行分析介绍。

Erlang中的消息传递机制

语法

消息传递是Erlang语言并发机制的基础。在Erlang中,主要包含以下并发原语:

  • 创建一个新的并发轻量进程,用于执行函数Fun,并返回其进程标识符Pid:
Pid = spawn(Fun)
  • 向标识符为Pid的进程发送消息(注: 由于Pid ! M的返回值是消息M本身,因此可以用类似Pid1 ! Pid2 ! Pid3 ! … M的语法来向多个进程发送同一个消息):
Pid  ! Message
  • receive ... end 来接收一个发送给当前进程的消息,语法如下(注: 当一个进程接收到一个消息时,依次尝试与Pattern1(及Guard1),Pattern2(及Guard2), … 进行模式匹配,若成功,则对相应的Expressions求值,否则继续后续匹配):
receive
  Pattern1 [when Guard1] ->
      Expressions1;
  Pattern2 p[when Guard2] ->
      Expressions2;
  ... ...
end
  • 对于接收操作,我们还可以为其设置一个超时控制,一旦超过某个预设的时长仍没有消息到达,则执行相应的超时操作,语法如下:
receive
  Pattern1 [when Guard1] ->
      Expressions1;
  Pattern2 p[when Guard2] ->
      Expressions2;
        ... ...
after Time ->
  ExpressionTimeout
end
  • 可以建立一个只包含超时的接收操作,事实上就相当于一个延时操作:
sleep(T) ->
  receive
      after T ->
          true
  end

以上就是Erlang中基本的消息传递的接口。

内部实现

相对于Go而言,Erlang的发展历史更加悠久,因此代码复杂程度要大大高于Go这门新型语言。 因此在分析过程中,主要还是以介绍实现机制为主,具体的数据结构及源代码实现就不作过分细致的剖析了。

Erlang的BEAM解释器的代码位于 otp/erts/emulator/beam/ 路径下, 其中与Send相关的代码位于 otp/erts/emulator/beam/erl_message.c 中, 而与Receive相关的代码则位于 otp/erts/emulator/beam/beam_emu.c 中。

之所以不在一处实现,是因为Erlang把接收操作作为一种BEAM基本指令来实现,而发送操作则以内部函数的方式实现。

在具体实现中,Erlang采用了消息Copy的方法实现消息传递——消息从发送进程的堆拷贝到接收进程的堆:

  • 在发送时,如果接收进程正在另一个调度器上执行,或者有其他并发的进程也在向其发送消息时,本次发送就存在竞争风险,不能直接完成
  • 发送者会在堆上为接收进程分配一个临时区域并将消息拷贝到该区(该内存区将在后续进行垃圾收集时合并到接收进程的堆空间)
  • 拷贝完成后,会将指向这块新区域的指针链入接收进程的消息队列
  • 如果接收进程正处于阻塞态,则将其唤醒并添加到就绪队列中

在SMP版Erlang虚拟机中,进程的消息队列由两部分组成—— 一个公共队列和一个私有队列。 公共队列用于接收其他进程发送的消息,通过互斥锁加以保护;私有队列用来减少对锁的争用: 接收进程首先在接收队列中查找符合匹配的消息,如果没有,再从公共队列中复制消息到私有队列中。

在非SMP的Erlang虚拟机中,进程只有一个消息队列。

接收进程发现当前任务队列上没有匹配的消息后,会跳转执行wait_timeout: 设置一个定时器并阻塞在该定时器上—— 当该定时器触发或者有新消息到来时,都会唤醒接收进程。

由于进程的消息缓冲是以队列形式维护的,因此从发送进程角度来看,可以认为消息缓冲的大小是无限的, 因此Send操作一般不会阻塞,这点注意与后面要将的Go消息传递机制相区别。 这也就是为什么Erlang中仅对Receive操作提供超时响应机制的原因了!

高级特性

与Go主要面向SMP服务器应用不同,Erlang是一种面向分布式集群环境的编程语言, 因此消息传递除了支持本地进程间的通信外,还支持分布式环境的进程间通信。

基本流程是:

  • 首先通过Pid(或在分布式环境上的等价概念)查询“名字服务”
  • 找到该远程进程所在的远程主机地址信息,进而通过套接字进行后续发送操作
  • 远程Erlang虚拟机进程接收到消息,再进一步分析,将其派发到指定的进程消息队列中

进一步的实现细节会涉及Erlang分布式处理的机制,这里就不展开分析了,待后续单开主题讨论。

Go中channel机制介绍

语法

在Go中,channel结构是Goroutine间消息传递的基础,属于基本类型,在runtime库中以C语言实现。 Go中针对channel的操作主要包括以下几种:

  • 创建:ch = make(chan int, N)
  • 发送:ch <- l
  • 接收:l <- ch
  • 关闭:close(ch)

另外,还可以通过select语句同时在多个channel上进行收发操作,语法如下:

select {
  case ch01 <- x:
      ... ... /* do something ... */
  case y <- ch02:
      ... ... /* do something ... */
  default:
      ... ... /* no event ... */
}

此外,基于select的操作tai还支持超时控制,具体的语法示例如下:

select {
  case v := <- ch:
      ... ...
  case <- time.After(5 * time.Second):
      ... ...
}

尽管Go以消息传递作为Goroutine间交互的主要方式,但是基于channel的通信又必须依赖channel引用的共享才能得以实现, 因此Go语言绝不是一种纯粹的消息传递语言。 一般而言,channel的引用可以通过以下几种方式在不同Goroutine间共享:

  • “父Goroutine” 的栈变量,通过用Go语句创建Goroutine时的参数进行传递
ch = make (chan int)
go fn (ch)
l <- ch
  • “父Goroutine” 的栈变量,Go创建的Goroutine以闭包作为执行函数,栈变量自动共享
ch = make (chan int)
go func () { i = 1; chan <- i; } ()
x <- chan
  • 由于Go的垃圾收集器认为channel的引用只能在栈上,因此一般不用全局的引用进行共享

另外,在创建channel时,还可以指定是否采用buffer及buffer的大小,默认buffer为0 。 当buffer大于0时,可以进行异步的消息传递:接收方只有在当前buffer为空时才阻塞,而发送方则只有在buffer满时才阻塞。 详细情形将在后面介绍。

内部实现

核心数据结构

Go中channel的实现代码在src/pkg/runtime/chan.c中,其核心数据结构如下 (对于gccgo,其实现代码在libgo/runtime/chan.c中,由于使用不同的C编译器及语法,因而数据结构实现略有不同):

struct Hchan
{
  uintgo   count;        // total data in q
  uintgo   dataqsize;    // size of the circular q
  uint16   elemsize;
  uint16   pad;      // ensures proper alignment of the buffer that follows Hchan in memory
  bool    closed;
  Alg*  elemalg;  // interface for element type
  uintgo   sends;        // send index
  uintgo   recvx;        // receive index
  WaitQ    recvq;        // list of recv waiters
  WaitQ    sendq;        // list of send waiters
  Lock;
}

我们可以按照表示属性还是表示状态将Hchan的内部成员进行分类并逐一分析。

表示channel属性的成员如下,这些成员在用make进行初始化后确定,并且在后续操作中不会变化:

struct Hchan
{
  ... ...
  uintgo   dataqsize;
  uint16   elemsize
  uint16   pad;
  Alg*  elemalg;
  ... ...
}

其中,

  • dataqsize是前文所说的buffer的大小,即make(chan T, N)中的N
  • elemsize 是channel对应单个元素的大小,pad主要用于表示该类型元素的内存对齐边界
  • elemalg 也对应于类型属性,主要是一些列具体操作该类型的函数指针,如copy、hash、equal及print等接口

另外一类成员则用来表示当前channel的状态,是随着程序运行而发生变化的:

struct Hchan
{
  ... ...
  bool    closed;

  uintgo   count;
  uintgo   sendx;
  uintgo   recvx;

  WaitQ    recvq;
  WaitQ    sendq;
  Lock;
  ... ...
}

我们按功能将状态信息分为三类:

  • closed 表示当前channel是否处于关闭状态,在make创建后,此域被置为false,即channel处于打开状态;通过调用close将其置为true,channel关闭,之后的任何send/recv操作都会失败
  • countsendxrecvx一同构成了一个环形buffer的状态域,其中count表示当前buffer中的占用数量,sendxrecvx分别表示当前的发送位置和接收位置,注意,count的大小不能超过dataqsize。另外,还有一个“隐形”的域,即真正暂存数据的buffer空间。它并不在Hchan的域中,而是在make创建channel时被地位在紧随Hchan对象后面的相应大小的内存区域,具体代码通过chanbuf(c, i)这个宏来访问对应的区域。
  • 最后一部分是两个等待队列,分别用来存放在发送和接收过程中被阻塞的任务,由于发送接收必须是互斥操作,因此必须有相应的锁类型(注:这里用到了Plan9的C扩展语法,用内部结构体表示一种类似继承的语义

Send/Recv实现分析

我们以Send操作为例,介绍在单个channel上的具体流程,基于select语句的多channel操作将在下一节进行介绍。

首先,我们来看Send操作在Go runtime中的接口定义:

void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc);

解释一下主要参数: * ChanType *t : 是channel对应的数据类型 * Hchan * c: 就是当前操作的channel的指针 * byte *ep: 待发送数据的缓冲区指针,由于Send操作时一个统一的接口,因此使用byte型指针 * bool *pres : 这个真针不为NIL时,Send为非阻塞式操作,如果不能发送,就直接返回并设* pres = false * void * pc : 当前操作的PC

在介绍具体函数之前,我们需要先介绍一下SudoG这一结构,该结构是前面介绍的等待队列WaitQ类型的节点元素类型,其定义如下:

struct SudoG
{
  G*        g;            // g and selgen constitute
  uint32   selgen;       // a weak pointer to g  
  SudoG*    link;
  int64    releasetime;
  byte* elem;     // data elment
}

这个结构比较简单,g就是当前阻塞的Goroutine,elem是响应操作调用时传入的数据缓冲指针(*ep), link域就是单链表的下一项,releasetime用于系统性能统计(profiling)。 最后的selgen在select操作时被用到,后面详述。

下面具体分析该函数流程:

  1. 首先,在判断c非空,及一系列初始化操作后,当前任务尝试获得c的锁
  2. 成功锁定当前c后,先判断是否带buffer,若有buffer,则进入async模式,否则继续
  3. 若c已close,则返回(从close态返回是实现在channel上的Range操作的关键!
  4. 否则,尝试从c->recvq中取出一个任务(SudoG类型),如果返回非空,则该任务是一个正在阻塞状态的接收任务,执行:
    • 将当前的ep指针指向的数据copy到取出的SudoG类型元素的elem指针区域
    • SudoG中的releasetime设为当前系统tick值
    • 调用runtime·ready唤醒对应的g,本次Send操作完成
  5. 如果返回NIL,表明当前没有等待的接收任务。这时需要判断pres指针是否为空。若不为空,将其所指向的bool变量设为false,并退出
  6. 反之,则当前任务挂起:
    • 在当前栈上新建SudoG对象,并用当前gep初始化
    • 将该SudoG链入c->sendq
    • 调用runtime·park将自己阻塞,同时释放c的锁
  7. 当前g对象被唤醒后,首先判断是否是c已关闭,进而根据releasetime值唤醒超时事件,最后返回

对于带buffer的channel,将进入async模式进行处理:

  1. 首先判断当前缓冲区是否已满,若满,则阻塞,过程与上面不带buffer的情况类似
  2. 若缓冲区不满,则将ep指针指向的区域按c元素大小复制到环形缓冲区,并修改缓冲区内部状态

以上就是Send操作的全部实现,其中有一个比较重要的函数,即从等待队列中取出一个元素的操作dequeue(WaitQ *)还没有仔细分析。 主要原因是其涉及到下一节要介绍的select机制,因此退后分析。至于其反操作enqueue(WaitQ *)则就是一个简单的链表尾部插入,不需要额外分析。

对于Recv操作,其步骤基本上与Send类似,仅仅是操作的队列和copy的方向有别,这里不再赘述。

为了更好的理解上述操作,我做了一组简单的示意图,以“异步channel”(即带缓冲区的channel)为例,来描述这一过程:

  • 任务 Go#1 向channel发送数据#1,此时channel的缓冲区是空的,并且没有正在阻塞的任务,则数据#1被拷贝到缓冲区,缓冲区指针后移,Go#1 完成操作退出(非阻塞),如图1所示: 

  • 当channel的缓冲区被填满后,当另一个任务 Go#4 再进行发送操作时,就只能将自己及待发送的数据#4以某种方式链入channel的发送等待队列中,并挂起(阻塞),如图2所示:

  • 这时,任务 Go#5 发起针对这个channel的Recv操作,首先将从缓冲区读取并拷贝数据,直到缓冲区为空:

  • 假设此时缓冲区已空,又有一个任务 Go#8 发起一个Recv操作,它将从发送队列取出一个阻塞的数据,即之前Go#4发送的#4数据,然后读取并拷贝该数据,并将Go#4唤醒:

Select操作

介绍

Go在语言级提供了一个select语句用来支持同时在多个channel上进行操作的功能,很类似于Unix系统上的select系统调用,只不过后者操作的类型时文件描述符而已。

与Unix的select操作类似,任意一个注册的channel操作返回则select操作亦返回。 Unix的select通过逐一测试文件描述符集合判断是当前操作,而Go则通过不同的case语句指示当前响应的channel操作。

两外,二者也都支持超时控制。但是Unix的超时控制是建立在分时调度机制上的,更加可靠; 而Go的调度器如前所述,是协作式的,因此超时控制不可能精确的实现。

主要数据结构

Go语言将select操作作为语言一级支持,为此额外定义了ScaseSelect两个结构体来表示一个select语句。

  • 我们先来简单的介绍一下Scase,其对应于select语句中的每个case/default块,其定义如下:
struct Scase
{
  SudoG    sg;           // must be first member (cast to Scase)
  Hchan*    chan;     // chan
  byte* pc;           // return pc
  uint16   kind;
  uint16   so;           // vararg of selected bool
  bool*    receivedp;    // pointer to received bool 
}
  • Scase 结构说明:

    • 该类型可以认为是上节介绍的SudoG类型的派生类型,其“基类型”对象就是sg(因此注释里要求它必须位于起始位置)
    • chan就是当前这个case操作的channel指针
    • pc是这个case成功后的返回地址,在下节中揭晓其用法
    • kind是当前case的操作类型,可以是CaseRecvCaseSendCaseDefault之一
    • recievedp指针用来指示当前操作是否是非阻塞的,和之前介绍的情形类似
    • so这个变量比较有意思,它主要是用来作返回时的判断,在后面详述
  • 接下来介绍Select结构,其定义如下:

struct Select
{
  uint16   tease;        // total count of scare[]
  uint16   ncase;        // currently filled scare[]
  uint16*   pollorder;    // case poll order
  Hcan**    lockorder;    // channel lock order
  Scase    scase[1];  // one per case (in order of appearance )
}
  • Select结构说明:
    • 该类型的中的tease表示该select语句中包含的case/default块的个数,由于select语句的case数在编译时刻就可以确定,因此Select被以定长方式实现
    • ncase表示当前已添加的case块数,虽然tcase在创建时就已知了,但每个case块却是在parse的过程中逐个添加的,因此需要一个描述当前大小的成员
    • pollorder是在case块列表上操作的顺序,考虑到性能因素,Go的每次select操作都采用了随机顺序访问每个case中的channel
    • lockorder 表示在当前select语句上所有channel的加锁顺序,由于select操作是一个互斥操作,并且需要同时获取所有channel的锁,因此必须保证加锁按照一个约定的顺序进行,以避免不同select间造成死锁。Go的实现是按照channel对应虚地址大小作为加锁的顺序
    • scase[1]及其后的连续内存区域是包含全部Scase的数组,大小在创建Select时就已知了,所以用固定大小数组存放

代码生成

我们知道,Go的parser遇到select语句时会将其翻译成对应的runtime库函数调用以实现其功能,那么这种映射是如何实现的呢?

  • 首先,在select开始时调用void runtime·newselect(int size, …)创建一个Select实例
  • 然后,每遇到一个case或default语句,就调用响应类型的操作向Select实例中添加一个Scase元素:
    • void runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
    • void runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
    • void runtime·selectdefault*(Select *sel, bool selected)
  • 每次调用上面3个函数,都会将返回PC值记录在相应的Scase实例中
  • 最后,添加完全部case后,调用void * runtime·selectgo(Select ** selp)进入真正的Select操作(算法详见下节)
  • 一旦发现channel操作,则该函数返回对应case记录的返回地址,执行该case对应的函数块

流程分析完了,是否觉得有点奇怪?问题出在哪儿?

对了,就是在selectgo返回时,程序如何判断目前是在注册阶段Scase阶段返回的,还是从selectgo返回的呢?

我们以runtime·selectsend操作为例解释,从Go语言的角度来看,这个函数的Go型接口定义如下:

selectsend(del *byte, hchan chan<- any, elem *any) (selected bool);

这下明白了吧?由于Google Go编译器采用了Plan9 风格的C编译器,使得该函数在Go程序看来实际上是返回了一个bool型的值selected, 该bool值即用来表明当前返回的状态,直接从selectsend返回还是从selectgo间接返回。 还记得刚才没有将明白的Scase中的so成员吗?你现在可以把它理解成一个到selected的指针(实际上是一个到该指针的偏移地址)。

需要注意的是,这里之所以可以这么实现,主要是Google Go编译时甚至使用了一套自己定义的C编译链接工具,对参数和返回值在栈空间的分配做了特别的约定。 而在gccgo中,由于采用gcc 编译,因此就必须按照较为传统的方式,由selectgo返回活跃的Scase的编号,然后再根据编号分别进行处理。

下面列出一个Go中的select语句翻译为C语言后的形式,作为参考 (事实上Go编译器会直接将Go代码翻译成汇编语言,这里用C描述主要是为了简化描述):

// Go 语言版代码片段
... ...
select {
  case ch01 <- x:
      foo() /* do something of case #1 */
  case y <- ch02:
      bar() /* do something of case #2 */
  default:
      def() /* do something for default  */
}
... ...
// 对应C代码片段示意
Select *sel = newselect(3);
... ...
if (selectsend(sel, ch01, &x)) {
  foo() ; // do something of case #1 
} else if (selectrecv(sel, ch02, &y)) {
  bar() ; // do something of case #2
} else if (selectdefault(sel)) {
  def(); // do something of default
} else {
  void (*rpc) (void);
  rpc = selectgo (&sel);
  (*rpc)(); //goto rpc;
}
... ...

算法实现

经过了上面的介绍,我们大概了解了select语句的生成规则,现在将视线集中到其核心函数runtime·selectgo函数的实现上来。

应该说该函数实际上类似于上节介绍的Send、Recv操作在多channel情况下的扩展,基本流程也比较类似。 但是需要特别注意以下几点:

  • 操作是互斥的,因此需要在进行操作前一次性获取全部待处理channel上的锁
  • select语句只要有一个channel响应即可返回,无需等待所有channel响应
  • 如果当前没有channel可以响应且不存在default语句,则当前g必须在所有channel的相应等待队列上挂起
  • 只能有一个响应的channel可以唤醒之前挂起的g
  • 另外,考虑到性能原因,select操作的顺序不一定按照程序中声明的顺序操作

明确了上面的要点后,selectgo 的实现便呼之欲出了。

  1. 首先,确定poll操作顺序和加锁的顺序:poll按照随机排序,加锁则按照channel虚拟地址排序
  2. 然后进入加锁阶段,按照#1中的顺序对每个channel加锁
  3. 进入主循环,按照#1中计算的poll顺序依次遍历所有的channel,如果当前channel可以响应(对应等待队列不空或存在可用的buffer),则跳转到#7
  4. 所有channel都不可响应,返回default的返回PC;如果没有default语句,则进入阻塞态
  5. 根据操作类型将当前g依次加入每个channel的相应WaitQ中,调用runtime·park进入阻塞态,同时释放所有channel锁(注意顺序!
  6. 当前g被唤醒,说明有channel已响应,保存该channel引用,再次获得所有的锁,并从其他channel的等待队列中将当前g删除
  7. 完成响应的copy操作,释放所有锁,返回活跃channel对应Scase中记录的PC地址

整个过程大体如下图所示:

这样,selectgo就基本上介绍完了,但是还遗留了一个问题。某个任务调用select阻塞时,会将自身对应的SudoG添加到所有channel的等待队列上, 那么如果有多个channel在任务被唤醒并完成加锁前发生响应,该如何处理呢?

要解决这个问题,就必须弄清楚chan.c中的dequeue操作是如何实现的,因为每个任务被唤醒前,必须经过dequeue从相应的等待队列上获得, 因此只要保证对于在Select语句中的操作,只能被dequeue成功返回一次即可。

之前讲过,每个SudoG都包含一个selgen成员,其实在每个G中,也包含一个selgen成员。对于一个不在Select中的Send/Recv操作, 其随影的selgen置为常量NOSELGEN,可以被dequeue直接返回。 否则,在Select操作的#5中,将Gselgen值赋给SudoGselgen。 在进行dequeue操作时,一旦发现SudoGselgen不是NOSELGEN,则调用如下原子操作:

SudoG *sgp = q->first
... ...
if ( CAS(&sgp->g->selgen, sgp->g->selgen, sgp->selgen+2) )
  return sgp;
else
... ...

由 CAS 原子操作定义可知,该阻塞的 goroutine 尽可能被唤醒一次。

初窥“超时控制”机制

这里我们先简单介绍一下Go语言select中的超时控制机制,实际上,在Go中,定时控制并非为select语句所独有,而是一种通用的机制。 在这里我们仅针对select的例子简单的介绍一下Go的定时控制机制,后续有空时再作深入分析。

回到开始时那个超时控制的例子:

... ...
select {
  case v := <- ch:
            ... ...
  case <- time.After(5 * time.Second):
            ... ...
}
... ...

在第二个case语句中,我们调用了time.After()函数定义了一个5秒钟的定时器,我们来看看这个函数的原型定义:

fund After(d Duration) <- chan Time {
  return NewTimer(d).C
}

这个函数实际上返回一个channel类型,并在定义的时间到时,向该channel发送一个Time型数据。

了解了这些,上面的例子就被统一起来了,每个case仍然是针对channel的,如果ch在5秒内响应,则执行ch对应case的语句; 否则第二个case的channel响应,也就进入了超时的处理过程中。

在Go的内部实现中,所有相关的定时操作都是通过time.Timertime.Ticker类型实现,它们内部都对应一个channel和一个runtimeTimer类型。 通过runtimeTimer可以在Go语言上下文中注册定时事件到runtime层,Go的runtime层存在一个后台任务(Groutine)来实现定时器事件唤醒功能。简单的说,这个后台任务的工作就是轮训系统中的定时器,一旦发现到期的定时器,就向其绑定的channel发送数据,则阻塞在该channel上的任务就被唤醒了。

关于runtimeTimer的实现详情,请参看Go的源代码src/pkg/runtime/time.goc。 由于这部分本身和channel机制关联并不密切,后面有机会再单开主题讨论。

总结

通过上面分析,我们不难得出两种语言在消息传递上的区别与共性, 我总结了以下几点,请大家补充、指正。

区别:

  • Go基于channel进行通信,而channel引用必须在任务间共享访问; Erlang利用Pid进行发送,而接收时仅根据消息内容区别后续处理的方式,过程中没有任何形式的数据共享

  • Go的Send/Recv操作都可能阻塞; Erlang一般仅有接收操作可能引起阻塞

  • Go的select可以同时等待多channel上的Send/Recv操作,并提供超时处理机制; Erlang的发送操作仅能每次针对一个进程,接收操作针对本进程的消息队列,也提供超时机制

  • Go的消息传递仅支持本机Goroutine间通信; Erlang消息传递支持在分布式环境上的进程间通信

共性:

  • Go和Erlang的消息传递都提供了语法级支持,并且都是语言的重要组成部分
  • Go和Erlang的消息传递都是通过数据的Copy实现的,因此都强调“小消息,大计算”的处理方式

结尾的思考

Go语言的并发模型源自Hoare的CSP模型(Communicating Sequential Processes, 国内译为“通信顺序进程”,台湾译为“交谈循序程序”),可以被视为一种类似 Unix 管道的东西。 它的特点是每个任务程序的编写完全按照其执行的时序进行,方便编程及调试分析。

与之相对的另一种并发编程模式就是基于异步消息及回调的模型,比如 libev 以及前阶段非常火的 Node.js, 这类模式的特点是调度以回调函数为单位,所有的消息发送都是异步的,程序员看到的就是消息类型及其对应的处理函数。

我认为两种方式各有优缺点,Go的方案更符合人类思考问题的习惯,编程和调试效率较高;但由于需要保证高并发性,就要实现用户任务层的调度,例如采用协程(如Go、rust等),或是采用基于虚拟机的方案(如 Erlang、Lua等),这无疑增加了上下文切换的开销,同时也增加了runtime或虚拟机的实现难度。

基于“异步消息/回调函数”的方案恰好相反,由于调度粒度变小为函数,导致实现上比较简单,顺序编程的固有性质使得系统开发者不需要考虑保护上下文或者类似的东西,每个回调就是依次被取出、派发、执行、返回,因此后端实现相对简单;反之,应用程序员就要小心的设计程序,考虑很多诸如功能划分、线程安全之类的细节,同时也给分析及调试程序带来了很大的问题。

参考资料

除了参考Go及Erlang的代码外,本文还参考了以下文献:

Erlang & Go 的并发调度浅析

Published on:

作为当前业界比较关注的两种面向并发领域的编程语言,Erlang和Go的调度是如何实现的?

Go 语言和 Erlang 都是面向并发应用的语言,都采用轻量级线程和消息传递模型。尽管Go在语法上也支持共享,但必须以通信的方式同步方能保证其正确性。Erlang则是完全不支持进程间的共享,状态信息完全需要依靠消息彼此传递。

从底层来看,在 Google 官方编译器中,Go 语言的 Goroutine 是一种类似协程的结构,由于采用了定制的C编译器来构建,因此其上下文切换的效率要高于C库的 coroutine(只需要切换PC和栈帧,其他寄存器由函数调用者负责保存); 而在 Go 的 GCC 前端中,Goroutine 则直接由C库的 coroutine 机制实现。由于 Erlang 是基于 BEAM 虚拟机执行的,因此它的所谓 “轻量进程” 也就仅仅是 BEAM 上的概念,不对应C语言或OS级的概念。

从调度策略来看,Go 完全是协作式调度,一个执行中的 Goroutine 仅在操作被阻塞或显示让出处理器时被切换出去,Goroutine之间也没有优先级之分; Erlang 则采用一种名为“Reduction-Counting”的轮转调度策略,并且存在4个进程优先级。

值得注意的是在 Go 1.2 版之后,增加了一些简单的抢占机制,但仅有用户程序函数调用时刻才可能触发抢占的判断,并不是真正意义上的抢占,具体思想参见这里

Go 的调度器的最新版实现了M:N的调度方式,通过 GOMAXPROCS 指定最大的并行能力; Erlang 的 BEAM 虚拟机也支持SMP方式,一般情况下以系统的核心数或硬件线程数作为其调度器个数,每个调度器会绑定到一个OS线程,IO 等阻塞型操作由单独的系统线程负责调度。

Go 的负载平衡一般是采用 “Work-Stealing” 方式;Erlang则是维护一个“任务迁移队列”,调度器会定期计算任务迁移的路径。此外,Erlang也提供了“Work-Stealing” 方式作为补充。充。

Go的调度模型简介

对于线程调度器,一般有3中模型:

  • N:1,即多个用户线程运行在一个OS线程上
  • 1:1,即用户线程和OS线程一一对应
  • N:M,即一定数量的用户线程映射到一定数量的OS线程上

第一种方式的优点是用户线程切换较快,但可扩展性不好,难以很好发挥多核处理器的并行性(libtask 属于该类型); 而第二种与之相反,其能很好的利用多核并行性,但是用户线程资源开销和调度成本都比较大。 第三种方式理论上能在调度开销和并行性之间取得较好的折衷。

在Go 1.1 中,Dmirty Vyukov 对调度器进行了重新设计,由原来的 1:1 模型进化到 M:N 模型,从而使 Go 在并行编程性能上有了显著的提升。

Go 的新调度器模型主要涉及3个核心概念:M、P及G,如下图所示:

M 代表OS的线程,P代表当前系统的处理器数(一般由GOMAXPROCS 环境变量指定),G代表Go语言的用户级线程,也就是通常所说的 Goroutine。

新的调度器由1:1 进化到 M:N 的关键在于新加了 P 这个抽象结构。在多核平台上,P的数量一般对应处理器核心或硬件线程的数量,调度器需要保证所有的P都有G执行,以保证并行度。

M 必须与P绑定方能执行任务G,如下图所示:

在旧版 Go 调度器实现中,由于缺少P, 一旦运行 G (goroutine)的 M (OS线程)陷入阻塞状态(如调用某个阻塞的系统调用)时,M 对应的 OS 线程就会被操作系统调度出去,从而导致系统中其他就绪的G也不能执行;而添加了P这个逻辑结构后,一旦发生上述情况,阻塞的 M 将被与其对应的 P 剥离,RUNTIME会再分配一个 M 并将其与已经剥离出来的 P 绑定,运行其他就绪的G。这个过程如下图所示:

在实际实现中,考虑到代码执行的局部性因素,一般会倾向于推迟 M 与 P 剥离的时机。具体来说,RUNTIME中存在一个驻留的线程sysmon,用来监控所有进入Syscall 的 M,只有当 Syscall 执行时间超出某个阈值时,才会将 M 与 P 分离。

另外一个保证系统运行稳定性的方式是负载均衡机制,在Go中,用了 “任务窃取” 的方法。

首先介绍一下 Go 的任务队列,每个 P 都有一个私有的任务队列 (实现上是一个用数组表示的循环链表)以及一个公共队列(单链表表示),私有队列的功能是为了减轻公共队列的竞争开销。

当一个 P 的私有任务队列为空时,它会从全局队列中寻找就绪态的 G 执行;如果全局队列也为空,则会随机选择窃取其他 P 私有执行队列中的任务G,从而保证所有线程尽可能以最大负载工作。其示意图如下:

由于 P 的私有队列采用了数组结构,很容易计算出队列中间的位置,因此“窃取者” 采用了与 “被窃取者” 均分任务的方法,以尽可能达到负载均衡。

无论从公共队列取任务还是进行“窃取”,都会引起一定的竞争开销,因此 RUNTIME 会倾向于将新建任务或新转变为就绪态的任务添加到当前执行 P 的私有队列中。 仅当执行的任务调用 yield 机制让出处理器或进入了一个长时间执行的系统调用时,该任务才会被添加到公共队列中。

以上关于Go调度器的部分内容及图片转自:http://morsmachine.dk/go-scheduler

Erlang的调度模型简介

由于 Erlang 程序是运行在 BEAM 虚拟机之上,因此其调度器在实现上和 Go 等 Native 语言存在较大的差异,但其内部涉及的基本原理都是类似的,可以互相参考。

早期的 BEAM 虚拟机是单线程运行的,直到2006年才引入了 SMP 版本的 BEAM 虚拟机,经过了若干早期版本的演化,逐渐形成了今天的版本。最新版本的Erlang可以通过命令行参数指定是否启用 SMP 版本虚拟机。

BEAM 上的调度单位是“轻量进程”,这是一种虚拟机上的轻量级执行线索(由于 Erlang 的 process 是不共享内存的,行为更像进程而非线程,因此我们在这里叫它“轻量进程”)。每个 Erlang 进程包括一个控制块(PCB)、一个栈和私有的堆空间,一些特殊的结构,如二进制数据,ETS 表是进程间共享的,使用全局堆空间。

BEAM 虚拟机里存在一些并行的调度器,一般情况下,一个调度器会映射为一个 OS 线程,这种方式类似于早期的Go语言实现(只有M和G,没有P),每个调度器拥有各自的任务队列,调度器之间的负载平衡通过引入专门的任务迁移机制得以实现。其原理如下图所示:

通常,调度器的数量与运行平台的处理器核数或硬件线程数相等,也可以通过 BEAM 命令行参数指定,或在运行时动态修改。

在BEAM系统中,除了process之外,还存在三种其他的调度单位:端口(ports)、链入式驱动(linkd-in drivers)和系统级活动(system level activities); 这三种特殊的任务形式主要用来进行IO操作和执行其他语言的代码等功能,其部分功能很像 Go 中对执行阻塞 Syscall 任务的“剥离”机制,具体实现方法这里暂时不讨论。我们主要将精力集中在 Erlang 的 process 的调度机制上。

与 Go 不同,Erlang 的调度器是一个轮转而非协作式的调度器,每个进程创建时会被分配一个称为“reduction”的值,是一个计算量的度量(基本上等同于函数调用的次数),类似 OS 的时间片。进程每执行一定量的计算后,reduction值就会累计,一旦达到阈值,该进程就会给切换出去。这种调度方式在 Erlang 中被称为 “reduction-counting”。

采用轮转的调度方式能更好的防止程序设计不当而导致的个别进程饿死的情况,同时能够实现更好的实时性功能。

同时,Erlang还为进程提供了四个不同的优先级:max,high,normal和low。不同优先级进程按优先级调度;同级进程按轮转方式调度。每个调度器包含3个任务队列,Max和High具有单独的队列,normal和low则位于同一个队列 —— 调度器忽略一定次数的low级进程来实现二者间的差别。

Erlang 调度器之所以能够实现优先级轮转调度,主要是得益于其基于虚拟机的执行方式:由于每条Erlang指令都需要经过 BEAM 解释执行,因此 process 的运行完全处于BEAM的监控之下,BEAM可以方便的完成对进程的切换。与之相对,由于 Go 的 Goroutine 与 RUNTIME 都是 Native 执行的,其在执行上的地位是平等的,RUNTIME 没有能力切换一个执行中的 Goroutine,除非其自己调出或调用RUNTIME 功在 ,因而只能实现协作调度。

注: Go 1.2 中,添加了简单的“用户态”任务抢占机制,主要是在系统线程sysmon中监控Goroutine的执行时间,然后借助“动态栈扩展”机制,在函数调用时刻切入RUNTIME并实现抢占。这种方式虽然很巧妙,但对某些特殊的情况,如没有调用非inline函数的耗时计算等,就没有多大效果力了。

Erlang 调度器通过定期进行“任务迁移”来达到负载平衡。“任务迁移”过程在同一时刻只能由一个调度器发起。首先,根据各调度器的任务队列的长度计算一个叫“Migtation limit”的值,这个值就是各调度器就绪队列长度的均值;然后,开始计算“Migataion Path”,算法是:

  1. 计算各队列长度与“Migtation limit”差值
  2. 找到差值中正最大和负最小的队列,记录一个从前者到后者进行任务迁移路径,以达到二者都接近“Migtation limit”
  3. 重复步骤1,直到达到负载均衡

下图显示了上述算法的实例:

“Migatation Path” 计算完成后,在每个调度时刻,调度器都会检查该路径,根据其指导去抓取(pull)或推送(push)相应任务队列的任务。这一步骤完成了真正的负载均衡。

作为“任务迁移”机制的补充,Erlang调度器还支持“任务窃取”机制:当一个活跃的调度器自己的任务队列为空且不能通过“任务迁移路径”抓取任务时,它会主动窃取其他调度器任务队列上的就绪任务,如果仍然没有可供执行的任务,则该调度器进入Waiting状态。

关于Erlang调度模型,主要部分参考了这篇文章的第三章及Erlang/OTP源码。

结论

通过上述简单对比,我们大体上了解了Erlang和Go两种语言在并发任务调度上的异同,可以说二者各有优缺点:Go 的调度模型更加高效(Native)而 Erlang 则提供了更强大的功能(实时性、优先级)。

关于调度器,其实还有很多内容,如 Go 和 Erlang 都支持“垃圾回收”,而GC在两种语言中对调度的影响如何等;同时,讲Go的 M:N 调度时说到 M 一旦陷入Syscall 阻塞后,系统会创建一个新的M(OS 线程)来接管 P 及其任务队列,那么当设计一个高度并发的IO系统时(如 Web 服务器),频繁的Syscall会导致大量 OS 线程创建,从而影响性能。Go如何解决这个问题呢?

在后续分析中,会针对 IO 和 GC 部分进行更加深入的讨论,以解答余下的有关调度器的问题。

特别说明: 由于Go语言正处于高速发展的阶段,因此一些现在分析的内容可能会随时更新,在本文完成时, 其稳定版本是 1.2 , 而包含大量更新的 1.3 版也呼之欲出,因此若本文内容不免出现滞后或错误,请大家及时指正!