-
-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathtask.go
88 lines (74 loc) · 1.55 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package pond
import (
"errors"
"fmt"
"runtime/debug"
"sync"
)
// ErrPanic is the sentinel error used to mark a task that panicked.
// invokeTask wraps it (with %w) into the error it returns after recovering
// from a panic, so callers can detect that case with errors.Is(err, ErrPanic).
var ErrPanic = errors.New("task panicked")
// subpoolTask bundles a task with the bookkeeping that its Run method performs
// once the task has finished. R is the result type used when the task is
// handed to invokeTask; Run discards the result value itself.
type subpoolTask[R any] struct {
// task is the function to execute; it is passed unchanged to invokeTask.
task any
// sem is a channel used as a semaphore: Run receives one value from it when
// it finishes. NOTE(review): presumably a value is sent on sem before Run is
// scheduled — confirm against the code that builds subpoolTask.
sem chan struct{}
// waitGroup has Done called on it exactly once when Run finishes.
waitGroup *sync.WaitGroup
// updateMetrics, when non-nil, is called with the error produced by the task
// (nil on success).
updateMetrics func(error)
}
// Run executes the task and reports its outcome.
//
// The task is invoked through invokeTask; its result value is discarded and
// only the error is forwarded to updateMetrics, and only when that callback is
// non-nil. On the way out, one value is received from sem (releasing the
// semaphore) and then waitGroup.Done is called.
func (t subpoolTask[R]) Run() {
	// Deferred calls run last-in-first-out, so the semaphore receive below
	// happens before the wait group is decremented.
	defer t.waitGroup.Done()
	defer func() { <-t.sem }()

	_, taskErr := invokeTask[R](t.task)
	if report := t.updateMetrics; report != nil {
		report(taskErr)
	}
}
// wrappedTask pairs a task with a completion callback. R is the task's result
// type and C is the callback's type, which is either func(error) or
// func(R, error).
type wrappedTask[R any, C func(error) | func(R, error)] struct {
// task is the function to execute; it is passed unchanged to invokeTask.
task any
// callback is called by Run after the task has been invoked, receiving the
// task's error and, for the two-argument form, its result as well.
callback C
}
// Run invokes the task via invokeTask, hands the outcome to the callback and
// returns the task's error.
//
// A func(error) callback receives only the error; a func(R, error) callback
// receives the result and the error. Any other callback type causes a panic.
func (t wrappedTask[R, C]) Run() error {
	result, err := invokeTask[R](t.task)

	// C is a type parameter, so the concrete callback shape has to be
	// discovered dynamically through an interface value.
	cb := any(t.callback)

	if onError, ok := cb.(func(error)); ok {
		onError(err)
		return err
	}
	if onResult, ok := cb.(func(R, error)); ok {
		onResult(result, err)
		return err
	}

	panic(fmt.Sprintf("unsupported callback type: %#v", t.callback))
}
// wrapTask combines task and callback into a single func() error. Calling the
// returned function runs wrappedTask.Run: the task is invoked, its outcome is
// passed to callback, and the task's error is returned.
func wrapTask[R any, C func(error) | func(R, error)](task any, callback C) func() error {
	// Run has a value receiver, so the method value below carries its own copy
	// of the struct.
	return wrappedTask[R, C]{task: task, callback: callback}.Run
}
// invokeTask calls task and normalises its outcome to an (R, error) pair.
//
// Supported task shapes are func(), func() error, func() R and
// func() (R, error); results the task does not produce are left at their zero
// value. Any panic raised while running the task — including the panic raised
// here for an unsupported task type — is recovered and returned as an error
// that wraps ErrPanic and includes the stack trace captured during recovery.
// When the panic value is itself an error, it is wrapped as well.
func invokeTask[R any](task any) (output R, err error) {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		// The named result err is overwritten so the caller sees the panic as
		// an ordinary error value.
		if cause, isErr := p.(error); isErr {
			err = fmt.Errorf("%w: %w, %s", ErrPanic, cause, debug.Stack())
			return
		}
		err = fmt.Errorf("%w: %v, %s", ErrPanic, p, debug.Stack())
	}()

	switch fn := task.(type) {
	case func():
		fn()
	case func() error:
		err = fn()
	case func() R:
		output = fn()
	case func() (R, error):
		output, err = fn()
	default:
		panic(fmt.Sprintf("unsupported task type: %#v", task))
	}

	return output, err
}