-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsupervisor.go
148 lines (131 loc) · 4.25 KB
/
supervisor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Package supervisor is a very simple implementation of the Supervisor pattern
// used in Erlang/OTP. It provides a mechanism for controlling and coordinating
// goroutines, and encourages the principle of failing early by restarting
// workers promptly after any failure.
package supervisor
import (
"context"
"sync"
"time"
)
// Supervisable is the signature every supervised worker function must have.
// The supervisor invokes it in a new goroutine, passing the supervisor's
// context and a freshly made done channel on every (re)start. For the
// supervisor to manage it correctly, an implementation must obey three rules:
//
//  1. It **must** return once ctx is cancelled.
//  2. It **must** close done (normally via defer) when it returns; the
//     supervisor blocks on that channel to detect the exit.
//  3. It **must** call recover() itself: it runs in its own goroutine, so a
//     panic there cannot be caught by the supervisor.
type Supervisable func(ctx context.Context, done chan struct{})
// Supervisor is the basic Supervision Tree supervisor node. It runs a set of
// worker goroutines, restarts any instance that exits while the supervisor
// is still active, and can terminate or restart them all on request.
//
// Create a Supervisor with NewSimpleSupervisor or NewSupervisorWithOptions;
// it must not be copied after creation. Internal state is guarded by a
// mutex, so its methods may be called from multiple goroutines.
type Supervisor struct {
	isSimple     bool
	workers      []Supervisable
	workerCount  int
	restartDelay time.Duration

	// parent is the caller-supplied context. Restart derives a fresh ctx
	// from it, because a cancelled context can never be reused.
	parent context.Context

	// mu guards every field below it.
	mu             sync.Mutex
	ctx            context.Context
	stop           context.CancelFunc
	wg             *sync.WaitGroup
	runningWorkers int
	// idle is broadcast (with mu held) whenever runningWorkers drops to zero.
	idle *sync.Cond
}

// NewSimpleSupervisor returns a supervisor which can only run a single
// instance of a single worker goroutine. For a lot of uses this will be
// enough. A nil ctx is treated as context.Background().
func NewSimpleSupervisor(ctx context.Context, worker Supervisable) *Supervisor {
	s := newSupervisor(Options{Context: ctx, Workers: []Supervisable{worker}})
	s.isSimple = true
	return s
}

// Options holds basic configuration information for the Supervisor.
type Options struct {
	// WorkerCount determines how many instances *of each* worker should
	// be executed. Values below 1 are treated as 1.
	WorkerCount int
	// Workers is a slice of different Supervisable workers; each of them
	// is executed with WorkerCount instances.
	Workers []Supervisable
	// Context allows a parent context.Context object to be used, useful
	// where there are external timeouts or cancellations that may occur
	// further up the call chain. A nil Context means context.Background().
	Context context.Context
	// Waiter, if non-nil, allows the caller to block until the Supervisor
	// has completed: it is incremented for every worker instance started
	// and decremented once that instance has stopped for good.
	Waiter *sync.WaitGroup
	// RestartDelay is how long to wait before restarting a worker that
	// exited while the Supervisor was still active. The zero value
	// restarts immediately.
	RestartDelay time.Duration
}

// NewSupervisorWithOptions configures a new Supervisor using any options
// specified by the Options struct. A nil opts is treated as &Options{}.
func NewSupervisorWithOptions(opts *Options) *Supervisor {
	if opts == nil {
		opts = &Options{}
	}
	return newSupervisor(*opts)
}

// newSupervisor builds a Supervisor from opts, deriving its cancellable
// context from opts.Context (or context.Background() when nil).
func newSupervisor(opts Options) *Supervisor {
	parent := opts.Context
	if parent == nil {
		parent = context.Background()
	}
	ctx, cancel := context.WithCancel(parent)
	s := &Supervisor{
		workers:      opts.Workers,
		workerCount:  opts.WorkerCount,
		restartDelay: opts.RestartDelay,
		parent:       parent,
		ctx:          ctx,
		stop:         cancel,
		wg:           opts.Waiter,
	}
	s.idle = sync.NewCond(&s.mu)
	return s
}

// Run is the entrypoint for the supervisor; calling run will configure
// all the supplied Supervisables at the specified number of instances.
// Bookkeeping (including WaitGroup.Add) happens before Run returns, so a
// caller may Wait on its WaitGroup or call HasStopped immediately after.
func (s *Supervisor) Run() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.startLocked()
}

// startLocked launches the configured number of instances of every worker
// against the current context. s.mu must be held.
func (s *Supervisor) startLocked() {
	n := s.instances()
	for _, worker := range s.workers {
		for i := 0; i < n; i++ {
			s.runningWorkers++
			// Add must precede the goroutine so it cannot race a caller's Wait.
			if s.wg != nil {
				s.wg.Add(1)
			}
			go s.runLoop(s.ctx, worker, s.wg)
		}
	}
}

// instances returns how many copies of each worker Run should start.
func (s *Supervisor) instances() int {
	if s.isSimple || s.workerCount < 1 {
		return 1
	}
	return s.workerCount
}

// runLoop keeps one instance of worker alive until ctx is cancelled,
// restarting it (after restartDelay) whenever it exits early. ctx and wg
// are captured at start time so a later Restart or WithWaitGroup cannot
// unbalance this instance's bookkeeping.
func (s *Supervisor) runLoop(ctx context.Context, worker Supervisable, wg *sync.WaitGroup) {
	defer func() {
		s.mu.Lock()
		s.runningWorkers--
		if s.runningWorkers == 0 {
			s.idle.Broadcast()
		}
		s.mu.Unlock()
		if wg != nil {
			wg.Done()
		}
	}()
	for {
		done := make(chan struct{})
		go worker(ctx, done)
		<-done
		if ctx.Err() != nil {
			return
		}
		if !s.waitBeforeRestart(ctx) {
			return
		}
	}
}

// waitBeforeRestart sleeps for restartDelay, returning false if ctx is
// cancelled first. With a zero delay it returns true immediately.
func (s *Supervisor) waitBeforeRestart(ctx context.Context) bool {
	if s.restartDelay <= 0 {
		return true
	}
	timer := time.NewTimer(s.restartDelay)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// Restart terminates the current worker goroutines, waits for all of them
// to exit, and then executes them again under a fresh context derived from
// the supervisor's parent context.
func (s *Supervisor) Restart() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Hold the external WaitGroup open across the restart so callers
	// blocked in Wait are not released by the transient stop. Only done
	// while instances are live, i.e. while the counter is known to be > 0.
	if s.wg != nil && s.runningWorkers > 0 {
		wg := s.wg
		wg.Add(1)
		defer wg.Done()
	}

	s.stop()
	for s.runningWorkers > 0 {
		s.idle.Wait() // releases mu so exiting runLoops can decrement
	}
	s.ctx, s.stop = context.WithCancel(s.parent)
	s.startLocked()
}

// Stop terminates any current goroutines by simply invoking the context
// cancellation function. It does not wait for them to exit.
func (s *Supervisor) Stop() {
	s.mu.Lock()
	stop := s.stop
	s.mu.Unlock()
	stop()
}

// HasStopped reports whether no worker instances are currently running.
// It is also true for a Supervisor that has never been Run.
func (s *Supervisor) HasStopped() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.runningWorkers == 0
}

// WithWaitGroup allows a WaitGroup to be specified and incremented
// for each Supervisable instance started; when the WaitGroup is Done this
// means that all Supervisables have completed for good, and there
// will be no attempt at restarting them. Call it before Run.
func (s *Supervisor) WithWaitGroup(wg *sync.WaitGroup) {
	s.mu.Lock()
	s.wg = wg
	s.mu.Unlock()
}