Skip to content

Commit 60a1859

Browse files
committed
monitor: add stat for worker queue
1 parent 94a613a commit 60a1859

File tree

5 files changed

+72
-7
lines changed

5 files changed

+72
-7
lines changed

cmd/go-judge/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ func newEnvPool(b pool.EnvBuilder, enableMetrics bool) worker.EnvironmentPool {
508508
}
509509

510510
func newWorker(conf *config.Config, envPool worker.EnvironmentPool, fs filestore.FileStore) worker.Worker {
511-
return worker.New(worker.Config{
511+
w := worker.New(worker.Config{
512512
FileStore: fs,
513513
EnvironmentPool: envPool,
514514
Parallelism: conf.Parallelism,
@@ -520,6 +520,10 @@ func newWorker(conf *config.Config, envPool worker.EnvironmentPool, fs filestore
520520
OpenFileLimit: uint64(conf.OpenFileLimit),
521521
ExecObserver: execObserve,
522522
})
523+
if conf.EnableMetrics {
524+
w = newMetricsWorker(w)
525+
}
526+
return w
523527
}
524528

525529
func newForceGCWorker(conf *config.Config) {

cmd/go-judge/metrics.go

+40
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
execSubsystem = "exec"
1717
filestoreSubsystem = "file"
1818
environmentSubsystem = "environment"
19+
workerSubsystem = "worker"
1920
)
2021

2122
var (
@@ -88,6 +89,16 @@ var (
8889
Name: "current_count",
8990
Help: "Total number of environment currently in use",
9091
})
92+
93+
workerQueue = prometheus.NewDesc(
94+
prometheus.BuildFQName(metricsNamespace, workerSubsystem, "queue_count"),
95+
"Number of requests waiting in worker queue", nil, nil,
96+
)
97+
98+
workerRunning = prometheus.NewDesc(
99+
prometheus.BuildFQName(metricsNamespace, workerSubsystem, "running_count"),
100+
"Number of request running by workers", nil, nil,
101+
)
91102
)
92103

93104
func init() {
@@ -205,3 +216,32 @@ func (p *metricsEnvPool) Put(env envexec.Environment) {
205216
p.EnvironmentPool.Put(env)
206217
envInUse.Dec()
207218
}
219+
220+
var _ worker.Worker = &metricsWorker{}
221+
var _ prometheus.Collector = &metricsWorker{}
222+
223+
type metricsWorker struct {
224+
worker.Worker
225+
}
226+
227+
// Collect implements prometheus.Collector.
228+
func (m *metricsWorker) Collect(ch chan<- prometheus.Metric) {
229+
s := m.Stat()
230+
ch <- prometheus.MustNewConstMetric(
231+
workerQueue, prometheus.GaugeValue, float64(s.Queue),
232+
)
233+
ch <- prometheus.MustNewConstMetric(
234+
workerRunning, prometheus.GaugeValue, float64(s.Running),
235+
)
236+
}
237+
238+
// Describe implements prometheus.Collector.
239+
func (m *metricsWorker) Describe(ch chan<- *prometheus.Desc) {
240+
prometheus.DescribeByCollect(m, ch)
241+
}
242+
243+
func newMetricsWorker(w worker.Worker) worker.Worker {
244+
rt := &metricsWorker{w}
245+
prometheus.MustRegister(rt)
246+
return rt
247+
}

env/env_linux.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -261,15 +261,17 @@ func newSystemdProperty(name string, units any) dbus.Property {
261261
}
262262

263263
type credGen struct {
264-
cur uint32
264+
cur atomic.Uint32
265265
}
266266

267267
func newCredGen(start uint32) *credGen {
268-
return &credGen{cur: start}
268+
rt := &credGen{}
269+
rt.cur.Store(start)
270+
return rt
269271
}
270272

271273
func (c *credGen) Get() syscall.Credential {
272-
n := atomic.AddUint32(&c.cur, 1)
274+
n := c.cur.Add(1)
273275
return syscall.Credential{
274276
Uid: n,
275277
Gid: n,

envexec/file_util_linux.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ import (
1212

1313
const memfdName = "input"
1414

15-
var enableMemFd int32
15+
var enableMemFd atomic.Int32
1616

1717
func readerToFile(reader io.Reader) (*os.File, error) {
18-
if atomic.LoadInt32(&enableMemFd) == 0 {
18+
if enableMemFd.Load() == 0 {
1919
f, err := memfd.DupToMemfd(memfdName, reader)
2020
if err == nil {
2121
return f, err
2222
}
23-
atomic.StoreInt32(&enableMemFd, 1)
23+
enableMemFd.Store(1)
2424
}
2525
r, w, err := os.Pipe()
2626
if err != nil {

worker/worker.go

+19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"github.com/criyle/go-judge/envexec"
@@ -39,9 +40,16 @@ type Worker interface {
3940
Start()
4041
Submit(context.Context, *Request) (<-chan Response, <-chan struct{})
4142
Execute(context.Context, *Request) <-chan Response
43+
Stat() Stat
4244
Shutdown()
4345
}
4446

47+
// Stat stores the statistic of the Worker
48+
type Stat struct {
49+
Queue int
50+
Running int
51+
}
52+
4553
// worker defines executor worker
4654
type worker struct {
4755
fs filestore.FileStore
@@ -62,6 +70,7 @@ type worker struct {
6270
wg sync.WaitGroup
6371
workCh chan workRequest
6472
done chan struct{}
73+
running atomic.Int32
6574
}
6675

6776
type workRequest struct {
@@ -131,6 +140,13 @@ func (w *worker) Execute(ctx context.Context, req *Request) <-chan Response {
131140
return ch
132141
}
133142

143+
func (w *worker) Stat() Stat {
144+
return Stat{
145+
Queue: len(w.workCh),
146+
Running: int(w.running.Load()),
147+
}
148+
}
149+
134150
// Shutdown waits all worker to finish
135151
func (w *worker) Shutdown() {
136152
w.stopOnce.Do(func() {
@@ -166,6 +182,9 @@ func (w *worker) loop() {
166182
}
167183

168184
func (w *worker) workDoCmd(ctx context.Context, req *Request) Response {
185+
w.running.Add(1)
186+
defer w.running.Add(-1)
187+
169188
var rt Response
170189
if len(req.Cmd) == 1 {
171190
rt = w.workDoSingle(ctx, req.Cmd[0])

0 commit comments

Comments
 (0)