-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathtime_wheel_node.go
114 lines (97 loc) · 2.67 KB
/
time_wheel_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2020-2024 guonaihong, antlabs. All rights reserved.
//
// mit license
package timer
import (
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/antlabs/stl/list"
)
const (
haveStop = uint32(1)
)
// 先使用sync.Mutex实现功能
// 后面使用cas优化
type Time struct {
timeNode
sync.Mutex
// |---16bit---|---16bit---|------32bit-----|
// |---level---|---index---|-------seq------|
// level 在near盘子里就是1, 在T2ToTt[0]盘子里就是2起步
// index 就是各自盘子的索引值
// seq 自增id
version atomic.Uint64
}
func newTimeHead(level uint64, index uint64) *Time {
head := &Time{}
head.version.Store(genVersionHeight(level, index))
head.Init()
return head
}
func genVersionHeight(level uint64, index uint64) uint64 {
return level<<(32+16) | index<<32
}
func (t *Time) lockPushBack(node *timeNode, level uint64, index uint64) {
t.Lock()
defer t.Unlock()
if node.stop.Load() == haveStop {
return
}
t.AddTail(&node.Head)
atomic.StorePointer(&node.list, unsafe.Pointer(t))
//更新节点的version信息
node.version.Store(t.version.Load())
}
type timeNode struct {
expire uint64
userExpire time.Duration
callback func()
stop atomic.Uint32
list unsafe.Pointer //存放表头信息
version atomic.Uint64 //保存节点版本信息
isSchedule bool
root *timeWheel
list.Head
}
// 一个timeNode节点有4个状态
// 1.存在于初始化链表中
// 2.被移动到tmp链表
// 3.1 和 3.2是if else的状态
//
// 3.1被移动到new链表
// 3.2直接执行
//
// 1和3.1状态是没有问题的
// 2和3.2状态会是没有锁保护下的操作,会有数据竞争
func (t *timeNode) Stop() bool {
t.stop.Store(haveStop)
// 使用版本号算法让timeNode知道自己是否被移动了
// timeNode的version和表头的version一样表示没有被移动可以直接删除
// 如果不一样,可能在第2或者3.2状态,使用惰性删除
cpyList := (*Time)(atomic.LoadPointer(&t.list))
cpyList.Lock()
defer cpyList.Unlock()
if t.version.Load() != cpyList.version.Load() {
return false
}
cpyList.Del(&t.Head)
return true
}
// warning: 该函数目前没有稳定
func (t *timeNode) Reset(expire time.Duration) bool {
cpyList := (*Time)(atomic.LoadPointer(&t.list))
cpyList.Lock()
defer cpyList.Unlock()
// TODO: 这里有一个问题,如果在执行Reset的时候,这个节点已经被移动到tmp链表
// if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) {
// return
// }
cpyList.Del(&t.Head)
jiffies := atomic.LoadUint64(&t.root.jiffies)
expire = expire/(time.Millisecond*10) + time.Duration(jiffies)
t.expire = uint64(expire)
t.root.add(t, jiffies)
return true
}