六游的博客小站
Channel&Select源码分析
发布于: 2019-08-27 更新于: 2019-08-28 阅读次数: 

Golang中使用了channel这个概念来实现了基于消息传递的并发,借用了CSP中channel与process这两个概念,其中process在Go语言中表现为Goroutine,各个Goroutine之间通过channel通讯来实现数据共享

在此篇文章中,我们首先会介绍Golang中channel的基本结构,以及结构中各个对象的用途。然后会介绍Golang中channel的接受、发送数据以及select的zero-case,one-case,multi-case等常见情况的大致的执行过程,抛弃运行中的大部分细节,大致了解整个过程对之后的源码阅读会有很大的帮助。最后会给出各个情况下源码执行的过程以及分析。

Channel基本结构

1.png
上图中的hchan结构体就是Golang中的channel在底层所对应的结构

  • hchan中的buf, recvx, sendx这三个字段构成了RingBuffer(下文中会有介绍)这种数据结构,用来存储Channel中的数据
  • closed用来标注该channel是否已经被关闭,我们执行关闭channel的操作的时候,并不是立马销毁hchan结构体,hchan结构体还需要继续存活来处理内部剩余的一些数据以及阻塞挂起在自身上的GoRoutine
  • recvq与sendq分别是记录在接受channel数据时与在向channel中发送数据时阻塞的GoRoutine的队列,当GoRoutine在接受或者发送数据时发生阻塞时,会将这个发生阻塞的GoRoutine包装成sudog对象,放入对应的队列,等待相关资源可用之后再释放
  • sudog中的elem是指向数据的一个指针,如果是发送消息的GoRoutine,elem指向要发送的数据,如果是接收消息的GoRoutine,elem指向数据将被读取到的位置

hchan中的RingBuffer

为了可以更好的理解在channel数据存取过程中底层RingBuffer的工作过程,我画了一幅图,图画的不是很好,但可以看懂其中的原理。首先我们给出一个容量为3的channel,图中的圆环代表channel的底层数组,r与s箭头分别代表recvx(读取索引)与sendx(发送索引)。在初始化状态中,r与s的值都指向数组头部。向channel中写入一个数据,会数据存到当前s指向的位置,并将s前移一位;读取channel中的一个数据,会读取当前r指向的位置上的数据,并将r前移一位,此时r与s重合,说明buffer中数据已空,此时再进行读取的操作会进入阻塞状态。随后向channle中写入两个数据,s会向前移两位,此时s指针若再向前移动一位则会与r“相遇”,说明buffer中数据已满,此时再进行发送的操作会进入阻塞状态

2.png

对Channel进行的各种操作

1.创建channel

  1. 分配hchan(从堆中分配,所有字段均为零值)
  2. 分配ring buffer(从堆中根据给定大小分配,如果是无缓冲channel则不需要这一步)

2.向有缓冲channel中发送数据(未满)

  1. 对channel上锁
  2. 将数据存入ring buffer
  3. 对channel解锁

3.从有缓冲channel中读取数据(非空)

  1. 对channel上锁
  2. 从ring buffer中读取数据
  3. 对channel解锁

4.向有缓冲channel中发送数据(已满)

  1. 将执行操作的GoRoutine与要发送的数据封装为sudog对象
  2. sudog对象加入sendq队列
  3. gopark(通知GoRoutine的调度器放弃这个GoRoutine,继续循环调度其他GoRoutine)

若此时出现一个新的接受方接收channel中的数据

  1. sendq队列中队首的sudog出队
  2. 将sudog中的数据放入ring buffer
  3. goready(将GoRoutine重新放入调度队列,等待被调度器调度)

5.从有缓冲channel中读取数据(空)

  1. 将执行操作的GoRoutine与接收数据的位置封装为sudog对象
  2. sudog对象加入recvq队列
  3. gopark

若此时出现一个新的发送方向channel中发送数据

  1. recvq队列中对手的sudog出队
  2. 直接将发送方的要发送的数据写入刚刚出队的sudog中的元素位置处
  3. goready

6.关闭channel

  1. 对channel加锁
  2. hchan中的closed字段修改为true
  3. ready所有sendq与recvq中的sudog
  4. 解锁

7.从已经关闭的channel中读取数据

  1. 查看底层的ring buffer中是否为空
  2. 如果为空,清空ring buffer
  3. 如果不为空,继续从buffer中读数据

channel相关操作的源码

channel各种操作的翻译

上述对channel的各种操作在编译时会被翻译成runtime包中的对应操作channel的函数,在阅读源码之前,我们要先了解一下各个操作所对应的函数,阅读源码的第一步是先要找到源码的位置

1
2
3
4
5
6
make(chan interface{}, size) => runtime.makechan(interface{}, size)
make(chan interface{}) => runtime.makechan(interface{}, 0)
channel <- value => runtime.chansend1(channel, &value)
value <- channel => runtime.chanrecv1(channel, &value)
value, ok <- channel => ok := runtime.chanrecv2(channel, &value)
close(channel) => runtime.closechan(channel)

创建channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func makechan(t *chantype, size int) *hchan {
elem := t.elem

//下面使一些安全检查,目的是确保编译器不出错误
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//计算底层 ring buffer需要的内存
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
//当channel的长度为0的情况,也就是申请无缓冲channel的情况,直接申请一个hchan大小的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
//当channel中的元素不包含指针时,将hchan以及ring buffer的内存一次性分配
//也就是:此时hchan结构体本身与底层ring buffer是位于一块连续内存中的,这是针对Golang gc的优化
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
//当channel中的元素包含指针时,则hchan与ring buffer的内存是分开分配的
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
//初始化hchan中的一些元素
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}

发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//如果channel不存在,发送方GoRoutine将出错,并永远处于不能被调度的情况
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//一些调试信息以及安全检查
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//给channel上锁
lock(&c.lock)
//如果channel已经关闭,报错,因为不能向以及关闭的channel中发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//如果在接受阻塞队列中存在sudog,那么就取出sudog,将这个要发送的数据给这个sudog
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//如果底层ring buffer中的数据没有满,则直接将数据塞入ring buffer
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}


//以下逻辑处理channel中数据已满时发送数据的情况
//封装了当前GoRoutine对象(gp)与sudog对象(mysg),并将sudog加入sendq队列,以及gopark掉当前GoRoutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
//keepAlive是为了保活要发送到channel的数据,防止被垃圾回收掉
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

接受数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
//channel接受数据分有ok返回值,与没有ok返回值的两个函数,分别为chanrecv1与chanrecv2,这两个函数都调用了chanrecv函数
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
//如果channel为空,报错
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//给channel加锁
lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

//查看sendq队列中是否有sudog,如果有并且channel的size为0,直接取得sudog的值。如果channel的size不为0,则取ring buffer中的值
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//如果sendq队列中没有sudog且ring buffer中有数据,则直接从ring buffer中拿数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

//如果ring buffer中没有数据,且recvq中没有sudog,那就封装当前GoRoutine与sudog,并添加进sendq队列
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

Select的原理

Select的翻译工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//零case的select
select{} => runtime.block()
//只含一个case的select,实质为chanrecv1
select{
case value <- channel: if value <- channel {
(...) => (...)
} }
//含有一个case和default的select,实质为chanrecv2
select{
case value <- channel: if value, ok <- channel; ok {
(...) => (...)
default: } else {
(...) (...)
} }
//含有多个case 的select
select{
case value <- channel1:
(...) => runtime.selectgo()
case value <- channel2:
(...)
}
--- 本文结束 The End ---