Erlang & Go 消息传递机制初探
上一篇文章介绍了 Go 和 Erlang 在调度器的实现,本文将简要介绍这两种并发语言的消息传递机制
简要对比
Erlang和Go虽然在实现及功能上差异较大,但是都支持高并发的轻量级用户任务(Erlang的轻量进程,Go的Goroutine), 并且都采用了消息传递的方式作为任务间交互的方式。
在Erlang中,采用了一种比较纯粹的消息传递机制,进程间几乎没有任何形式的数据共享,只能通过彼此间发送消息进行通信; 而Go虽然是基于共享内存的,但是也必须通过消息传递来进行共享数据的同步。 可以说消息传递机制是两种语言任务间交互的首要方式。
但是在具体实现中,鉴于两种语言的差异,也表现为不同的形式:
- 在Erlang中,进程之间以彼此的Pid作为标识进行消息的发送,一切数据都仅可以消息的形式在进程间复制
- 在Go中,不同的Goroutine间通过共享的channel进行通信,由于Go本质上是建立在共享存储模型上的,因此全局变量、参数甚至是一部分栈变量都是可以共享的,通信一般控制在较小的规模,仅用来保证共享的同步语义
下面将分别就Erlang和Go各自实现分别进行分析介绍。
Erlang中的消息传递机制
语法
消息传递是Erlang语言并发机制的基础。在Erlang中,主要包含以下并发原语:
- 创建一个新的并发轻量进程,用于执行函数Fun,并返回其进程标识符Pid:
|
- 向标识符为Pid的进程发送消息(注: 由于
Pid ! M
的返回值是消息M本身,因此可以用类似Pid1 ! Pid2 ! Pid3 ! … M
的语法来向多个进程发送同一个消息):
|
- 用
receive ... end
来接收一个发送给当前进程的消息,语法如下(注: 当一个进程接收到一个消息时,依次尝试与Pattern1
(及Guard1
),Pattern2
(及Guard2
), … 进行模式匹配,若成功,则对相应的Expressions
求值,否则继续后续匹配):
|
- 对于接收操作,我们还可以为其设置一个超时控制,一旦超过某个预设的时长仍没有消息到达,则执行相应的超时操作,语法如下:
|
- 可以建立一个只包含超时的接收操作,事实上就相当于一个延时操作:
|
以上就是Erlang中基本的消息传递的接口。
内部实现
相对于Go而言,Erlang的发展历史更加悠久,因此代码复杂程度要大大高于Go这门新型语言。 因此在分析过程中,主要还是以介绍实现机制为主,具体的数据结构及源代码实现就不作过分细致的剖析了。
Erlang的BEAM解释器的代码位于 otp/erts/emulator/beam/
路径下,
其中与Send相关的代码位于 otp/erts/emulator/beam/erl_message.c
中,
而与Receive相关的代码则位于 otp/erts/emulator/beam/beam_emu.c
中。
之所以不在一处实现,是因为Erlang把接收操作作为一种BEAM基本指令来实现,而发送操作则以内部函数的方式实现。
在具体实现中,Erlang采用了消息Copy的方法实现消息传递——消息从发送进程的堆拷贝到接收进程的堆:
- 在发送时,如果接收进程正在另一个调度器上执行,或者有其他并发的进程也在向其发送消息时,本次发送就存在竞争风险,不能直接完成
- 发送者会在堆上为接收进程分配一个临时区域并将消息拷贝到该区(该内存区将在后续进行垃圾收集时合并到接收进程的堆空间)
- 拷贝完成后,会将指向这块新区域的指针链入接收进程的消息队列
- 如果接收进程正处于阻塞态,则将其唤醒并添加到就绪队列中
在SMP版Erlang虚拟机中,进程的消息队列由两部分组成—— 一个公共队列和一个私有队列。 公共队列用于接收其他进程发送的消息,通过互斥锁加以保护;私有队列用来减少对锁的争用: 接收进程首先在接收队列中查找符合匹配的消息,如果没有,再从公共队列中复制消息到私有队列中。
(在非SMP的Erlang虚拟机中,进程只有一个消息队列。)
接收进程发现当前任务队列上没有匹配的消息后,会跳转执行wait_timeout: 设置一个定时器并阻塞在该定时器上—— 当该定时器触发或者有新消息到来时,都会唤醒接收进程。
由于进程的消息缓冲是以队列形式维护的,因此从发送进程角度来看,可以认为消息缓冲的大小是无限的, 因此Send操作一般不会阻塞,这点注意与后面要将的Go消息传递机制相区别。 这也就是为什么Erlang中仅对Receive操作提供超时响应机制的原因了!
高级特性
与Go主要面向SMP服务器应用不同,Erlang是一种面向分布式集群环境的编程语言, 因此消息传递除了支持本地进程间的通信外,还支持分布式环境的进程间通信。
基本流程是:
- 首先通过Pid(或在分布式环境上的等价概念)查询“名字服务”
- 找到该远程进程所在的远程主机地址信息,进而通过套接字进行后续发送操作
- 远程Erlang虚拟机进程接收到消息,再进一步分析,将其派发到指定的进程消息队列中
进一步的实现细节会涉及Erlang分布式处理的机制,这里就不展开分析了,待后续单开主题讨论。
Go中channel机制介绍
语法
在Go中,channel结构是Goroutine间消息传递的基础,属于基本类型,在runtime库中以C语言实现。 Go中针对channel的操作主要包括以下几种:
- 创建:
ch = make(chan int, N)
- 发送:
ch <- l
- 接收:
l <- ch
- 关闭:
close(ch)
另外,还可以通过select语句同时在多个channel上进行收发操作,语法如下:
|
此外,基于select的操作tai还支持超时控制,具体的语法示例如下:
|
尽管Go以消息传递作为Goroutine间交互的主要方式,但是基于channel的通信又必须依赖channel引用的共享才能得以实现, 因此Go语言绝不是一种纯粹的消息传递语言。 一般而言,channel的引用可以通过以下几种方式在不同Goroutine间共享:
- “父Goroutine” 的栈变量,通过用Go语句创建Goroutine时的参数进行传递
|
- “父Goroutine” 的栈变量,Go创建的Goroutine以闭包作为执行函数,栈变量自动共享
|
- 由于Go的垃圾收集器认为channel的引用只能在栈上,因此一般不用全局的引用进行共享
另外,在创建channel时,还可以指定是否采用buffer及buffer的大小,默认buffer为0 。 当buffer大于0时,可以进行异步的消息传递:接收方只有在当前buffer为空时才阻塞,而发送方则只有在buffer满时才阻塞。 详细情形将在后面介绍。
内部实现
核心数据结构
Go中channel的实现代码在src/pkg/runtime/chan.c
中,其核心数据结构如下
(对于gccgo,其实现代码在libgo/runtime/chan.c中,由于使用不同的C编译器及语法,因而数据结构实现略有不同):
|
我们可以按照表示属性还是表示状态将Hchan的内部成员进行分类并逐一分析。
表示channel属性的成员如下,这些成员在用make进行初始化后确定,并且在后续操作中不会变化:
|
其中,
dataqsize
是前文所说的buffer的大小,即make(chan T, N)
中的Nelemsize
是channel对应单个元素的大小,pad
主要用于表示该类型元素的内存对齐边界elemalg
也对应于类型属性,主要是一些列具体操作该类型的函数指针,如copy、hash、equal及print等接口
另外一类成员则用来表示当前channel的状态,是随着程序运行而发生变化的:
|
我们按功能将状态信息分为三类:
closed
表示当前channel是否处于关闭状态,在make创建后,此域被置为false,即channel处于打开状态;通过调用close将其置为true,channel关闭,之后的任何send/recv操作都会失败- 由
count
、sendx
和recvx
一同构成了一个环形buffer的状态域,其中count
表示当前buffer中的占用数量,sendx
和recvx
分别表示当前的发送位置和接收位置,注意,count
的大小不能超过dataqsize
。另外,还有一个“隐形”的域,即真正暂存数据的buffer空间。它并不在Hchan的域中,而是在make创建channel时被地位在紧随Hchan对象后面的相应大小的内存区域,具体代码通过chanbuf(c, i)
这个宏来访问对应的区域。 - 最后一部分是两个等待队列,分别用来存放在发送和接收过程中被阻塞的任务,由于发送接收必须是互斥操作,因此必须有相应的锁类型(注:这里用到了Plan9的C扩展语法,用内部结构体表示一种类似继承的语义)
Send/Recv实现分析
我们以Send操作为例,介绍在单个channel上的具体流程,基于select语句的多channel操作将在下一节进行介绍。
首先,我们来看Send操作在Go runtime中的接口定义:
|
解释一下主要参数:
* ChanType *t
: 是channel对应的数据类型
* Hchan * c
: 就是当前操作的channel的指针
* byte *ep
: 待发送数据的缓冲区指针,由于Send操作时一个统一的接口,因此使用byte型指针
* bool *pres
: 这个真针不为NIL时,Send为非阻塞式操作,如果不能发送,就直接返回并设* pres = false
* void * pc
: 当前操作的PC
在介绍具体函数之前,我们需要先介绍一下SudoG
这一结构,该结构是前面介绍的等待队列WaitQ
类型的节点元素类型,其定义如下:
|
这个结构比较简单,g
就是当前阻塞的Goroutine,elem
是响应操作调用时传入的数据缓冲指针(*ep),
link
域就是单链表的下一项,releasetime
用于系统性能统计(profiling)。 最后的selgen
在select操作时被用到,后面详述。
下面具体分析该函数流程:
- 首先,在判断c非空,及一系列初始化操作后,当前任务尝试获得c的锁
- 成功锁定当前c后,先判断是否带buffer,若有buffer,则进入async模式,否则继续
- 若c已close,则返回(从close态返回是实现在channel上的Range操作的关键!)
- 否则,尝试从
c->recvq
中取出一个任务(SudoG
类型),如果返回非空,则该任务是一个正在阻塞状态的接收任务,执行:- 将当前的ep指针指向的数据copy到取出的
SudoG
类型元素的elem
指针区域 - 将
SudoG
中的releasetime
设为当前系统tick值 - 调用
runtime·ready
唤醒对应的g,本次Send操作完成
- 将当前的ep指针指向的数据copy到取出的
- 如果返回
NIL
,表明当前没有等待的接收任务。这时需要判断pres
指针是否为空。若不为空,将其所指向的bool
变量设为false
,并退出 - 反之,则当前任务挂起:
- 在当前栈上新建
SudoG
对象,并用当前g
和ep
初始化 - 将该
SudoG
链入c->sendq
- 调用
runtime·park
将自己阻塞,同时释放c的锁
- 在当前栈上新建
- 当前
g
对象被唤醒后,首先判断是否是c已关闭,进而根据releasetime
值唤醒超时事件,最后返回
对于带buffer的channel,将进入async模式进行处理:
- 首先判断当前缓冲区是否已满,若满,则阻塞,过程与上面不带buffer的情况类似
- 若缓冲区不满,则将ep指针指向的区域按c元素大小复制到环形缓冲区,并修改缓冲区内部状态
以上就是Send操作的全部实现,其中有一个比较重要的函数,即从等待队列中取出一个元素的操作dequeue(WaitQ *)
还没有仔细分析。
主要原因是其涉及到下一节要介绍的select机制,因此退后分析。至于其反操作enqueue(WaitQ *)
则就是一个简单的链表尾部插入,不需要额外分析。
对于Recv操作,其步骤基本上与Send类似,仅仅是操作的队列和copy的方向有别,这里不再赘述。
为了更好的理解上述操作,我做了一组简单的示意图,以“异步channel”(即带缓冲区的channel)为例,来描述这一过程:
- 任务
Go#1
向channel发送数据#1
,此时channel的缓冲区是空的,并且没有正在阻塞的任务,则数据#1
被拷贝到缓冲区,缓冲区指针后移,Go#1
完成操作退出(非阻塞),如图1所示:
- 当channel的缓冲区被填满后,当另一个任务
Go#4
再进行发送操作时,就只能将自己及待发送的数据#4
以某种方式链入channel的发送等待队列中,并挂起(阻塞),如图2所示:
- 这时,任务
Go#5
发起针对这个channel的Recv操作,首先将从缓冲区读取并拷贝数据,直到缓冲区为空:
- 假设此时缓冲区已空,又有一个任务
Go#8
发起一个Recv操作,它将从发送队列取出一个阻塞的数据,即之前Go#4
发送的#4
数据,然后读取并拷贝该数据,并将Go#4
唤醒:
Select操作
介绍
Go在语言级提供了一个select语句用来支持同时在多个channel上进行操作的功能,很类似于Unix系统上的select系统调用,只不过后者操作的类型时文件描述符而已。
与Unix的select操作类似,任意一个注册的channel操作返回则select操作亦返回。 Unix的select通过逐一测试文件描述符集合判断是当前操作,而Go则通过不同的case语句指示当前响应的channel操作。
两外,二者也都支持超时控制。但是Unix的超时控制是建立在分时调度机制上的,更加可靠; 而Go的调度器如前所述,是协作式的,因此超时控制不可能精确的实现。
主要数据结构
Go语言将select操作作为语言一级支持,为此额外定义了Scase
和Select
两个结构体来表示一个select语句。
- 我们先来简单的介绍一下
Scase
,其对应于select语句中的每个case/default块,其定义如下:
|
Scase
结构说明:- 该类型可以认为是上节介绍的
SudoG
类型的派生类型,其“基类型”对象就是sg
(因此注释里要求它必须位于起始位置) chan
就是当前这个case操作的channel指针pc
是这个case成功后的返回地址,在下节中揭晓其用法kind
是当前case的操作类型,可以是CaseRecv
、CaseSend
和CaseDefault
之一recievedp
指针用来指示当前操作是否是非阻塞的,和之前介绍的情形类似so
这个变量比较有意思,它主要是用来作返回时的判断,在后面详述
- 该类型可以认为是上节介绍的
接下来介绍
Select
结构,其定义如下:
|
Select
结构说明:- 该类型的中的
tease
表示该select语句中包含的case/default块的个数,由于select语句的case数在编译时刻就可以确定,因此Select
被以定长方式实现 ncase
表示当前已添加的case块数,虽然tcase
在创建时就已知了,但每个case块却是在parse的过程中逐个添加的,因此需要一个描述当前大小的成员pollorder
是在case块列表上操作的顺序,考虑到性能因素,Go的每次select操作都采用了随机顺序访问每个case中的channellockorder
表示在当前select语句上所有channel的加锁顺序,由于select操作是一个互斥操作,并且需要同时获取所有channel的锁,因此必须保证加锁按照一个约定的顺序进行,以避免不同select间造成死锁。Go的实现是按照channel对应虚地址大小作为加锁的顺序scase[1]
及其后的连续内存区域是包含全部Scase
的数组,大小在创建Select
时就已知了,所以用固定大小数组存放
- 该类型的中的
代码生成
我们知道,Go的parser遇到select语句时会将其翻译成对应的runtime库函数调用以实现其功能,那么这种映射是如何实现的呢?
- 首先,在select开始时调用
void runtime·newselect(int size, …)
创建一个Select
实例 - 然后,每遇到一个case或default语句,就调用响应类型的操作向
Select
实例中添加一个Scase
元素:void runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
void runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
void runtime·selectdefault*(Select *sel, bool selected)
- 每次调用上面3个函数,都会将返回PC值记录在相应的Scase实例中
- 最后,添加完全部case后,调用
void * runtime·selectgo(Select ** selp)
进入真正的Select操作(算法详见下节) - 一旦发现channel操作,则该函数返回对应case记录的返回地址,执行该case对应的函数块
流程分析完了,是否觉得有点奇怪?问题出在哪儿?
对了,就是在selectgo返回时,程序如何判断目前是在注册阶段Scase阶段返回的,还是从selectgo返回的呢?
我们以runtime·selectsend
操作为例解释,从Go语言的角度来看,这个函数的Go型接口定义如下:
|
这下明白了吧?由于Google Go编译器采用了Plan9 风格的C编译器,使得该函数在Go程序看来实际上是返回了一个bool型的值selected,
该bool值即用来表明当前返回的状态,直接从selectsend
返回还是从selectgo
间接返回。
还记得刚才没有将明白的Scase
中的so
成员吗?你现在可以把它理解成一个到selected的指针(实际上是一个到该指针的偏移地址)。
需要注意的是,这里之所以可以这么实现,主要是Google Go编译时甚至使用了一套自己定义的C编译链接工具,对参数和返回值在栈空间的分配做了特别的约定。
而在gccgo中,由于采用gcc 编译,因此就必须按照较为传统的方式,由selectgo
返回活跃的Scase
的编号,然后再根据编号分别进行处理。
下面列出一个Go中的select语句翻译为C语言后的形式,作为参考 (事实上Go编译器会直接将Go代码翻译成汇编语言,这里用C描述主要是为了简化描述):
|
|
算法实现
经过了上面的介绍,我们大概了解了select语句的生成规则,现在将视线集中到其核心函数runtime·selectgo
函数的实现上来。
应该说该函数实际上类似于上节介绍的Send、Recv操作在多channel情况下的扩展,基本流程也比较类似。 但是需要特别注意以下几点:
- 操作是互斥的,因此需要在进行操作前一次性获取全部待处理channel上的锁
- select语句只要有一个channel响应即可返回,无需等待所有channel响应
- 如果当前没有channel可以响应且不存在default语句,则当前
g
必须在所有channel的相应等待队列上挂起 - 只能有一个响应的channel可以唤醒之前挂起的
g
- 另外,考虑到性能原因,select操作的顺序不一定按照程序中声明的顺序操作
明确了上面的要点后,selectgo
的实现便呼之欲出了。
- 首先,确定poll操作顺序和加锁的顺序:poll按照随机排序,加锁则按照channel虚拟地址排序
- 然后进入加锁阶段,按照#1中的顺序对每个channel加锁
- 进入主循环,按照#1中计算的poll顺序依次遍历所有的channel,如果当前channel可以响应(对应等待队列不空或存在可用的buffer),则跳转到#7
- 所有channel都不可响应,返回default的返回PC;如果没有default语句,则进入阻塞态
- 根据操作类型将当前g依次加入每个channel的相应WaitQ中,调用
runtime·park
进入阻塞态,同时释放所有channel锁(注意顺序!) - 当前g被唤醒,说明有channel已响应,保存该channel引用,再次获得所有的锁,并从其他channel的等待队列中将当前g删除
- 完成响应的copy操作,释放所有锁,返回活跃channel对应
Scase
中记录的PC地址
整个过程大体如下图所示:
这样,selectgo
就基本上介绍完了,但是还遗留了一个问题。某个任务调用select阻塞时,会将自身对应的SudoG
添加到所有channel的等待队列上,
那么如果有多个channel在任务被唤醒并完成加锁前发生响应,该如何处理呢?
要解决这个问题,就必须弄清楚chan.c中的dequeue
操作是如何实现的,因为每个任务被唤醒前,必须经过dequeue
从相应的等待队列上获得,
因此只要保证对于在Select语句中的操作,只能被dequeue
成功返回一次即可。
之前讲过,每个SudoG
都包含一个selgen
成员,其实在每个G
中,也包含一个selgen
成员。对于一个不在Select中的Send/Recv操作,
其随影的selgen
置为常量NOSELGEN
,可以被dequeue
直接返回。
否则,在Select操作的#5中,将G
的selgen
值赋给SudoG
的selgen
。
在进行dequeue
操作时,一旦发现SudoG
的selgen
不是NOSELGEN
,则调用如下原子操作:
|
由 CAS 原子操作定义可知,该阻塞的 goroutine 尽可能被唤醒一次。
初窥“超时控制”机制
这里我们先简单介绍一下Go语言select中的超时控制机制,实际上,在Go中,定时控制并非为select语句所独有,而是一种通用的机制。 在这里我们仅针对select的例子简单的介绍一下Go的定时控制机制,后续有空时再作深入分析。
回到开始时那个超时控制的例子:
|
在第二个case语句中,我们调用了time.After()函数定义了一个5秒钟的定时器,我们来看看这个函数的原型定义:
|
这个函数实际上返回一个channel类型,并在定义的时间到时,向该channel发送一个Time型数据。
了解了这些,上面的例子就被统一起来了,每个case仍然是针对channel的,如果ch在5秒内响应,则执行ch对应case的语句; 否则第二个case的channel响应,也就进入了超时的处理过程中。
在Go的内部实现中,所有相关的定时操作都是通过time.Timer
或time.Ticker
类型实现,它们内部都对应一个channel和一个runtimeTimer
类型。
通过runtimeTimer
可以在Go语言上下文中注册定时事件到runtime层,Go的runtime层存在一个后台任务(Groutine)来实现定时器事件唤醒功能。简单的说,这个后台任务的工作就是轮训系统中的定时器,一旦发现到期的定时器,就向其绑定的channel发送数据,则阻塞在该channel上的任务就被唤醒了。
关于runtimeTimer的实现详情,请参看Go的源代码src/pkg/runtime/time.goc
。
由于这部分本身和channel机制关联并不密切,后面有机会再单开主题讨论。
总结
通过上面分析,我们不难得出两种语言在消息传递上的区别与共性, 我总结了以下几点,请大家补充、指正。
区别:
Go基于channel进行通信,而channel引用必须在任务间共享访问; Erlang利用Pid进行发送,而接收时仅根据消息内容区别后续处理的方式,过程中没有任何形式的数据共享
Go的Send/Recv操作都可能阻塞; Erlang一般仅有接收操作可能引起阻塞
Go的select可以同时等待多channel上的Send/Recv操作,并提供超时处理机制; Erlang的发送操作仅能每次针对一个进程,接收操作针对本进程的消息队列,也提供超时机制
Go的消息传递仅支持本机Goroutine间通信; Erlang消息传递支持在分布式环境上的进程间通信
共性:
- Go和Erlang的消息传递都提供了语法级支持,并且都是语言的重要组成部分
- Go和Erlang的消息传递都是通过数据的Copy实现的,因此都强调“小消息,大计算”的处理方式
结尾的思考
Go语言的并发模型源自Hoare的CSP模型(Communicating Sequential Processes, 国内译为“通信顺序进程”,台湾译为“交谈循序程序”),可以被视为一种类似 Unix 管道的东西。 它的特点是每个任务程序的编写完全按照其执行的时序进行,方便编程及调试分析。
与之相对的另一种并发编程模式就是基于异步消息及回调的模型,比如 libev 以及前阶段非常火的 Node.js, 这类模式的特点是调度以回调函数为单位,所有的消息发送都是异步的,程序员看到的就是消息类型及其对应的处理函数。
我认为两种方式各有优缺点,Go的方案更符合人类思考问题的习惯,编程和调试效率较高;但由于需要保证高并发性,就要实现用户任务层的调度,例如采用协程(如Go、rust等),或是采用基于虚拟机的方案(如 Erlang、Lua等),这无疑增加了上下文切换的开销,同时也增加了runtime或虚拟机的实现难度。
基于“异步消息/回调函数”的方案恰好相反,由于调度粒度变小为函数,导致实现上比较简单,顺序编程的固有性质使得系统开发者不需要考虑保护上下文或者类似的东西,每个回调就是依次被取出、派发、执行、返回,因此后端实现相对简单;反之,应用程序员就要小心的设计程序,考虑很多诸如功能划分、线程安全之类的细节,同时也给分析及调试程序带来了很大的问题。
参考资料
除了参考Go及Erlang的代码外,本文还参考了以下文献:
- Joe Arnstrong “Programming Erlang — Software for a Concurrent World”
- E·Dervishi Evaluate the benefits of SMP support for IO-intensive Erlang applications
- Ivo Balbaert “The Way to Go”
- “Go channel 实现” http://alpha-blog.wanglianghome.org/2012/04/13/go-channel-implementation/