golang源码分析 定时器

本文分析golang中的timer实现原理、工作流程、及正确使用方法。

timer造成资源泄露

在项目中发现随着系统运行时间增加,系统资源cpu/memory消耗逐渐增加,使用pprof打印系统资源占用。发现siftdownTimer、timerproc在占用系统资源。

1
2
9.73s 30.36% 66.55%     10.10s 31.51%  runtime.siftdownTimer
0.13s 0.41% 95.48% 16.48s 51.42% runtime.timerproc

很显然,这是错误使用timer造成系统资源泄露,为了解决这个问题需要知道runtime 层timer是如何实现的。

timer 工作原理

runtime中定义了timers变量用来存储系统中所有的timer。timers长度为64,每个含有一个存储timer的timersBucket。CacheLineSize 是CPU假定的cache line 大小。pad用来填充timersBucket到chace line的间距,以避免在不同的 P 之间发生 false sharing,提高多核运行时内存操作效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// timers contains "per-P" timer heaps.
//
// Timers are queued into timersBucket associated with the current P,
// so each P may work with its own timers independently of other P instances.
//
// Each timersBucket may be associated with multiple P
// if GOMAXPROCS > timersLen.
var timers [timersLen]struct {
timersBucket

// The padding should eliminate false sharing
// between timersBucket values.
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}

timer操作函数调用流程如下图所示。
timer 函数调用流程图

主要分析一下以下两个问题。
1、是如何将timer加入堆栈。
2、如何进行多timer调度并定时唤醒timer。

将timer加入到timersbucket中的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func addtimer(t *timer) {
tb := t.assignBucket() //分配bucket
lock(&tb.lock)
tb.addtimerLocked(t)
unlock(&tb.lock)
}

// 对g.m.p 的 id 模 64 求余,并分配相应的bucket
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}

// 向时间堆中添加一个 timer,如果时间堆第一次被初始化或者当前的 timer 比之前所有的 timers 都要早,那么就启动(首次初始化)或唤醒(最早的 timer) timerproc
// 函数内假设外部已经对 timers 数组加锁了
func (tb *timersBucket) addtimerLocked(t *timer) {
// when 必须大于 0,否则会在计算 delta 的时候溢出并导致其它的 runtime timer 永远没法过期
if t.when < 0 {
t.when = 1<<63 - 1
}
//timer加入末端
t.i = len(tb.t)
tb.t = append(tb.t, t)
//向上调整timer在堆中排序
siftupTimer(tb.t, t.i)
if t.i == 0 {
// 新插入的 timer 比之前所有的都要早
if tb.sleeping {
// 修改 timerBucket 的 sleep 状态
tb.sleeping = false
// 唤醒 timerproc
// 使 timerproc 中的 for 循环不再阻塞在 notesleepg 上
notewakeup(&tb.waitnote)
}
// 同一个 P 上的所有 timers 如果都在 timerproc 中被弹出了
// 该 rescheduling 会被标记为 true
if tb.rescheduling {
// 该标记会在这里和 timejumpLocked 中被设置为 false
tb.rescheduling = false
goready(tb.gp, 0)
}
}
// 如果 timerBucket 是第一次创建,需要启动一个 goroutine
// 来循环弹出时间堆,内部会根据需要最早触发的 timer
// 并进行相应时间的 sleep
if !tb.created {
tb.created = true
go timerproc(tb)
}
}

timerproc是定时器调度goruntine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// timerproc 负责处理时间驱动的事件
// 在堆中的下一个事件需要触发之前,会一直保持 sleep 状态
// 如果 addtimer 插入了一个更早的事件,会提前唤醒 timerproc
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
// timerBucket 的局部大锁
lock(&tb.lock)
// 被唤醒,所以修改 sleeping 状态为 false
tb.sleeping = false
// 计时
now := nanotime()
delta := int64(-1)
// 在处理完到期的 timer 之前,一直循环
for {
// 如果 timer 已经都弹出了
// 那么不用循环了,跳出后接着睡觉
if len(tb.t) == 0 {
delta = -1
break
}
// 取小顶堆顶部元素
// 即最近会被触发的 timer
t := tb.t[0]
delta = t.when - now // 还差多长时间才需要触发最近的 timer
if delta > 0 {
// 大于 0 说明这个 timer 还没到需要触发的时间
// 跳出循环去睡觉
break
}
if t.period > 0 {
// 这个 timer 还会留在堆里
// 不过要调整它的下次触发时间
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(tb.t, 0)
} else {
// 从堆中移除这个 timer
// 用最后一个 timer 覆盖第 0 个 timer
// 然后向下调整堆
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
siftdownTimer(tb.t, 0)
}
t.i = -1 // 标记 timer 在堆中的位置已经没有了
}
// timer 触发时需要调用的函数
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
// 调用需触发的函数
f(arg, seq)
// 把锁加回来,如果下次 break 了内层的 for 循环
// 能保证 timeBucket 是被锁住的
// 然后在下面的 goparkunlock 中被解锁
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// 说明时间堆里已经没有 timer 了
// 让 goroutine 挂起,去睡觉,通过调用goready(gp)将gorouting转换成runnable状态。
tb.rescheduling = true
goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// 说明堆里至少还有一个以上的 timer
// 睡到最近的 timer 时间
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
// 内部是 futex sleep
// 时间睡到了会自动醒
// 或者 addtimer 的时候,发现新的 timer 更早,会提前唤醒
notetsleepg(&tb.waitnote, delta)
}
}

以上就是timer加入heap及timerproc定期弹出timer的流程。

siftupTimer,siftdownTimer用于调整timer在heap中的顺序。timer在TimersBucket中的存储结构为四叉堆。四叉堆多数是以数组作为它们底层元素的存储,根节点在数组中的索引是0,存储在第n个位置的父节点它的子节点在数组中的存储位置为4n与4n+1,4n+2,4n+3。对于一个节点的要求只有和其父节点以及子节点之间的大小关系。相邻节点之间没有任何关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
向上调整
func siftupTimer(t []*timer, i int) {
// 先暂存当前刚插入到数组尾部的节点
when := t[i].when
tmp := t[i]

// 从当前插入节点的父节点开始
// 如果最新插入的那个节点的触发时间要比父节点的触发时间更早
// 那么就把这个父节点下移
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
t[i].i = i
i = p
}

// 如果发生过移动,用最新插入的节点
// 覆盖掉最后一个下移的父节点
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
}

向下调整
func siftdownTimer(t []*timer, i int) {
n := len(t)
when := t[i].when
tmp := t[i]
for {
c := i*4 + 1 // 最左孩子节点
c3 := c + 2 // 第三个孩子节点
if c >= n {
break
}
w := t[c].when
if c+1 < n && t[c+1].when < w {
w = t[c+1].when
c++
}
if c3 < n {
w3 := t[c3].when
if c3+1 < n && t[c3+1].when < w3 {
w3 = t[c3+1].when
c3++
}
if w3 < w {
w = w3
c = c3
}
}
if w >= when {
break
}
t[i] = t[c]
t[i].i = i
i = c
}
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
}

timer使用方式及常见问题

常用的是 time.After() 和 time.NewTicker()。两者都会新建一个timer,time.After 和 time.Tick 不同,区别是After是一次性的定时器,Ticker是周期性的。After触发后 timer 本身会从时间堆中删除。

当系统中启动timer数量过多,需要存储的空间及调度的成本会越来越多。所以在使用timer时需要注意:

1、当使用NewTicker新建一个ticker的时候,一定要用Stop函数将其关闭。

2、不要用以下这种方式使用timer。第一段代码,如果<-ch 这个 case 每次执行的时间都很短,每次进入 select,time.After 都会分配一个新的 timer。因此会在短时间内创建大量的无用 timer,虽然没用的 timer 在触发后会消失,但这种写法会造成无意义的 cpu 资源浪费。

1
2
3
4
5
6
7
for {
select {
case <-time.After(time.Second):
println("time out, and end")
case <-ch:
}
}

而是应该在循环外定义timer。

1
2
3
4
5
6
7
8
timer := time.NewTimer(time.Second)
for {
select {
case <-timer.C:
println("time out, and end")
case <-ch:
}
}