-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathtask_manager.go
129 lines (116 loc) · 2.76 KB
/
task_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package crontab
import (
"context"
"github.com/gohouse/golib/t"
"github.com/sirupsen/logrus"
"sync"
"sync/atomic"
)
// incId is the package-level counter behind GetId. It is only modified
// through sync/atomic in this file.
var incId int64

// GetId atomically adds one to the package counter and returns the resulting
// value, so concurrent callers never receive the same number.
func GetId() int64 {
	const step int64 = 1
	return atomic.AddInt64(&incId, step)
}
// TaskManager holds a set of registered tasks and exposes methods to add,
// start, stop, remove and iterate over them.
//
// NOTE: newTaskManager builds this struct with an unkeyed composite literal,
// so the field order below must not change without updating that literal.
type TaskManager struct {
// store maps a task id (string) to its *TaskObject.
store *sync.Map
// wg is incremented/decremented by Start and waited on by Wait.
wg *sync.WaitGroup
// ctx is initialised to context.Background() by newTaskManager; none of the
// methods visible in this file read it.
ctx context.Context
// opt carries the shared options (including the logger); Add also assigns it
// to every CronTab that is registered.
opt *Options
// done is allocated by newTaskManager; the only reference to it in this file
// is inside commented-out code in Stop.
done chan struct{}
}
// NewTaskManager builds a TaskManager from functional options.
//
// Each option func is applied, in the order given, to a fresh Options value.
// If no option installed a logger, a default logrus logger is used.
func NewTaskManager(opts ...OptionHandleFunc) *TaskManager {
	options := new(Options)
	for i := range opts {
		opts[i](options)
	}
	if options.logger != nil {
		return newTaskManager(options)
	}
	// No logger was configured: fall back to logrus' default instance.
	options.logger = logrus.New()
	return newTaskManager(options)
}
// newTaskManager returns a TaskManager with an empty task store, a fresh
// WaitGroup, a background context, the supplied options and a new unbuffered
// done channel.
func newTaskManager(opt *Options) *TaskManager {
	manager := &TaskManager{
		store: &sync.Map{},
		wg:    &sync.WaitGroup{},
		ctx:   context.Background(),
		opt:   opt,
		done:  make(chan struct{}),
	}
	return manager
}
// Add registers a new task and returns its generated id.
//
// The id is derived from GetId. The argument list stored with the task is the
// caller's args followed by the task id, the literal "-" and the title. The
// manager's options are assigned to cron, so cron must be non-nil. Add only
// stores the task; call Start to run it.
func (job *TaskManager) Add(title string, cron *CronTab, callback HandleFunc, args ...interface{}) string {
	taskId := t.New(GetId()).String()

	// Build the task's arguments in a fresh slice. Appending straight onto the
	// variadic slice could write into the caller's backing array when it was
	// passed as `xs...` with spare capacity, letting two tasks created from the
	// same slice overwrite each other's trailing id/title.
	taskArgs := make([]interface{}, 0, len(args)+3)
	taskArgs = append(taskArgs, args...)
	taskArgs = append(taskArgs, taskId, "-", title)

	cron.opt = job.opt
	task := &TaskObject{
		cron:     cron,
		callback: callback,
		args:     taskArgs,
		title:    title,
		taskId:   taskId,
	}
	job.store.Store(taskId, task)
	job.opt.logger.Infof("添加任务:%s - %s", taskId, title)
	return taskId
}
// AddGroup calls tl with this manager as its only argument.
// Presumably intended for registering several tasks in one callback (e.g. a
// series of Add calls) — the callback is free to do anything with the manager.
// tl must be non-nil.
func (job *TaskManager) AddGroup(tl func(*TaskManager)) {
tl(job)
}
// Start launches tasks, each in its own goroutine.
//
// With no keys every registered task is started. Otherwise every given task
// id that exists in the store is started; unknown ids are ignored. A log line
// is written for each task that is launched.
func (job *TaskManager) Start(keys ...string) {
	// Capture the WaitGroup once so each goroutine calls Done on the same
	// group it was added to, even if the manager's fields are replaced later
	// (Remove with no keys overwrites *job).
	wg := job.wg
	launch := func(so *TaskObject) {
		wg.Add(1)
		go func() {
			// Release the WaitGroup only once the task has actually returned,
			// so that Wait really tracks running tasks.
			defer wg.Done()
			so.start()
		}()
		job.opt.logger.Infof("开始任务:%s - %s", so.taskId, so.title)
	}

	if len(keys) == 0 {
		job.store.Range(func(_, value interface{}) bool {
			launch(value.(*TaskObject))
			return true
		})
		return
	}

	for _, key := range keys {
		if r, ok := job.store.Load(key); ok {
			launch(r.(*TaskObject))
		}
	}
}
// Wait blocks the calling goroutine and never returns: it first waits on the
// manager's WaitGroup and then parks on an empty select.
// NOTE(review): presumably meant to keep the process alive while scheduled
// tasks run — confirm against callers.
func (job *TaskManager) Wait() {
job.wg.Wait()
// A select with no cases blocks forever.
select {}
}
// Stop halts running tasks.
//
// With no keys every registered task is considered. Otherwise every given
// task id that exists in the store is considered; unknown ids are ignored.
// A task is stopped (and a log line written) only if it reports IsRunning, so
// tasks that are not running are left untouched.
func (job *TaskManager) Stop(keys ...string) {
	halt := func(so *TaskObject) {
		if !so.IsRunning() {
			return
		}
		so.stop()
		job.opt.logger.Infof("停止任务:%s - %s", so.taskId, so.title)
	}

	if len(keys) == 0 {
		job.store.Range(func(_, value interface{}) bool {
			halt(value.(*TaskObject))
			return true
		})
	} else {
		for _, key := range keys {
			if r, ok := job.store.Load(key); ok {
				halt(r.(*TaskObject))
			}
		}
	}

	// Disabled draft kept for reference: count the remaining tasks and signal
	// job.done when none are left.
	//var jobs int
	//job.store.Range(func(key, value interface{}) bool {
	//	jobs++
	//	return true
	//})
	//if jobs == 0 {
	//	job.done <- struct{}{}
	//}
}
// Remove stops tasks and drops them from the manager.
//
// With no keys all tasks are stopped and the manager is reset to a fresh
// state that keeps the current options. Otherwise every given task id is
// stopped, deleted from the store and logged, in the order supplied.
func (job *TaskManager) Remove(keys ...string) {
	if len(keys) == 0 {
		job.opt.logger.Infof("删除所有任务")
		job.Stop()
		// Overwrite the struct in place so existing *TaskManager pointers keep
		// working but see an empty store.
		*job = *newTaskManager(job.opt)
		return
	}

	for _, key := range keys {
		job.Stop(key)
		job.store.Delete(key)
		job.opt.logger.Infof("删除任务:%s", key)
	}
}
// Range iterates over the registered tasks by forwarding f to the underlying
// sync.Map: f receives each task id and its stored value, and returning false
// from f ends the iteration.
func (job *TaskManager) Range(f func(key, value interface{}) bool) {
	tasks := job.store
	tasks.Range(f)
}