@@ -279,6 +279,11 @@ type Coordinator struct {
279
279
280
280
// mx sync.RWMutex
281
281
// protection protection.Config
282
+
283
+ // a sync channel that can be called by other components to check if the main coordinator
284
+ // loop in runLoopIteration() is active and listening.
285
+ // Should only be interacted with via CoordinatorActive() or runLoopIteration()
286
+ heartbeatChan chan struct {}
282
287
}
283
288
284
289
// The channels Coordinator reads to receive updates from the various managers.
@@ -372,6 +377,7 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
372
377
logLevelCh : make (chan logp.Level ),
373
378
overrideStateChan : make (chan * coordinatorOverrideState ),
374
379
upgradeDetailsChan : make (chan * details.Details ),
380
+ heartbeatChan : make (chan struct {}),
375
381
}
376
382
// Setup communication channels for any non-nil components. This pattern
377
383
// lets us transparently accept nil managers / simulated events during
@@ -412,6 +418,22 @@ func (c *Coordinator) State() State {
412
418
return c .stateBroadcaster .Get ()
413
419
}
414
420
421
+ // IsActive is a blocking method that waits for a channel response
422
+ // from the coordinator loop. This can be used to as a basic health check,
423
+ // as we'll timeout and return false if the coordinator run loop doesn't
424
+ // respond to our channel.
425
+ func (c * Coordinator ) IsActive (timeout time.Duration ) bool {
426
+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
427
+ defer cancel ()
428
+
429
+ select {
430
+ case <- c .heartbeatChan :
431
+ return true
432
+ case <- ctx .Done ():
433
+ return false
434
+ }
435
+ }
436
+
415
437
func (c * Coordinator ) RegisterMonitoringServer (s configReloader ) {
416
438
c .monitoringServerReloader = s
417
439
}
@@ -977,6 +999,8 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
977
999
case upgradeDetails := <- c .upgradeDetailsChan :
978
1000
c .setUpgradeDetails (upgradeDetails )
979
1001
1002
+ case c .heartbeatChan <- struct {}{}:
1003
+
980
1004
case componentState := <- c .managerChans .runtimeManagerUpdate :
981
1005
// New component change reported by the runtime manager via
982
1006
// Coordinator.watchRuntimeComponents(), merge it with the
0 commit comments