Skip to content

Commit 5bd118d

Browse files
authored
Merge branch 'main' into drop/old-apm-agent
2 parents a131593 + c929f79 commit 5bd118d

17 files changed

+744
-146
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; an 80ish-character description of the change.
14+
summary: Add monitoring of endpoint usage metrics
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component:
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/elastic-agent/pull/4789
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/elastic-agent/issues/4083

internal/pkg/agent/application/coordinator/coordinator.go

+41-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"reflect"
1212
"strings"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/hashicorp/go-multierror"
@@ -82,7 +83,12 @@ type MonitorManager interface {
8283
Reload(rawConfig *config.Config) error
8384

8485
// MonitoringConfig injects monitoring configuration into resolved ast tree.
85-
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string) (map[string]interface{}, error)
86+
// args:
87+
// - the existing config policy
88+
// - a list of the expected running components
89+
// - a map of component IDs to binary names
90+
// - a map of component IDs to the PIDs of the running components.
91+
MonitoringConfig(map[string]interface{}, []component.Component, map[string]string, map[string]uint64) (map[string]interface{}, error)
8692
}
8793

8894
// Runner provides interface to run a manager and receive running errors.
@@ -284,6 +290,14 @@ type Coordinator struct {
284290
// loop in runLoopIteration() is active and listening.
285291
// Should only be interacted with via CoordinatorActive() or runLoopIteration()
286292
heartbeatChan chan struct{}
293+
294+
// if a component (mostly endpoint) has a new PID, we need to update
295+
// the monitoring components so they have a PID to monitor
296+
// however, if endpoint is in some kind of restart loop,
297+
// we could DoS the config system. Instead,
298+
// run a ticker that checks to see if we have a new PID.
299+
componentPIDTicker *time.Ticker
300+
componentPidRequiresUpdate *atomic.Bool
287301
}
288302

289303
// The channels Coordinator reads to receive updates from the various managers.
@@ -374,10 +388,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
374388
// synchronization in the subscriber API, just set the input buffer to 0.
375389
stateBroadcaster: broadcaster.New(state, 64, 32),
376390

377-
logLevelCh: make(chan logp.Level),
378-
overrideStateChan: make(chan *coordinatorOverrideState),
379-
upgradeDetailsChan: make(chan *details.Details),
380-
heartbeatChan: make(chan struct{}),
391+
logLevelCh: make(chan logp.Level),
392+
overrideStateChan: make(chan *coordinatorOverrideState),
393+
upgradeDetailsChan: make(chan *details.Details),
394+
heartbeatChan: make(chan struct{}),
395+
componentPIDTicker: time.NewTicker(time.Second * 30),
396+
componentPidRequiresUpdate: &atomic.Bool{},
381397
}
382398
// Setup communication channels for any non-nil components. This pattern
383399
// lets us transparently accept nil managers / simulated events during
@@ -926,6 +942,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
926942
ctx, cancel := context.WithCancel(ctx)
927943
defer cancel()
928944

945+
defer c.componentPIDTicker.Stop()
946+
929947
// We run nil checks before starting the various managers so that unit tests
930948
// only have to initialize / mock the specific components they're testing.
931949
// If a manager is nil, we prebuffer its return channel with nil also so
@@ -1038,6 +1056,18 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
10381056

10391057
case c.heartbeatChan <- struct{}{}:
10401058

1059+
case <-c.componentPIDTicker.C:
1060+
// if we hit the ticker and we've got a new PID,
1061+
// reload the component model
1062+
if c.componentPidRequiresUpdate.Swap(false) {
1063+
err := c.refreshComponentModel(ctx)
1064+
if err != nil {
1065+
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
1066+
c.setConfigManagerError(err)
1067+
c.logger.Errorf("%s", err)
1068+
}
1069+
}
1070+
10411071
case componentState := <-c.managerChans.runtimeManagerUpdate:
10421072
// New component change reported by the runtime manager via
10431073
// Coordinator.watchRuntimeComponents(), merge it with the
@@ -1277,11 +1307,17 @@ func (c *Coordinator) generateComponentModel() (err error) {
12771307
configInjector = c.monitorMgr.MonitoringConfig
12781308
}
12791309

1310+
var existingCompState = make(map[string]uint64, len(c.state.Components))
1311+
for _, comp := range c.state.Components {
1312+
existingCompState[comp.Component.ID] = comp.State.Pid
1313+
}
1314+
12801315
comps, err := c.specs.ToComponents(
12811316
cfg,
12821317
configInjector,
12831318
c.state.LogLevel,
12841319
c.agentInfo,
1320+
existingCompState,
12851321
)
12861322
if err != nil {
12871323
return fmt.Errorf("failed to render components: %w", err)

internal/pkg/agent/application/coordinator/coordinator_state.go

+9
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,24 @@ func (c *Coordinator) refreshState() {
135135
// Coordinator state and sets stateNeedsRefresh.
136136
// Must be called on the main Coordinator goroutine.
137137
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) {
138+
139+
// check for any component updates to the known PID, so we can update the component monitoring
138140
found := false
139141
for i, other := range c.state.Components {
140142
if other.Component.ID == state.Component.ID {
143+
if other.State.Pid != state.State.Pid {
144+
c.componentPidRequiresUpdate.Store(true)
145+
}
141146
c.state.Components[i] = state
142147
found = true
143148
break
144149
}
145150
}
146151
if !found {
147152
c.state.Components = append(c.state.Components, state)
153+
if state.State.Pid != 0 {
154+
c.componentPidRequiresUpdate.Store(true)
155+
}
148156
}
149157

150158
// In the case that the component has stopped, it is now removed.
@@ -160,6 +168,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
160168
}
161169

162170
c.stateNeedsRefresh = true
171+
163172
}
164173

165174
// generateReportableState aggregates the internal state of the Coordinator

internal/pkg/agent/application/coordinator/coordinator_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,6 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
587587
resultChan <- ctx.Err()
588588
return
589589
case state := <-subChan:
590-
t.Logf("%+v", state)
591590
if len(state.Components) == 3 {
592591
compState0 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-0")
593592
compState1 := getComponentState(state.Components, "fake-isolated-units-default-fake-isolated-units-1")
@@ -599,6 +598,11 @@ func TestCoordinator_StateSubscribeIsolatedUnits(t *testing.T) {
599598
(unit1.State == client.UnitStateHealthy && unit1.Message == "Healthy From Fake Isolated Units 1 Config") {
600599
resultChan <- nil
601600
return
601+
} else if unit0.State == client.UnitStateFailed && unit1.State == client.UnitStateFailed {
602+
// if you get a really strange failed state, check to make sure the mock binaries in
603+
// elastic-agent/pkg/component/fake/ are updated
604+
t.Fail()
605+
t.Logf("got units with failed state: %#v / %#v", unit1, unit0)
602606
}
603607
}
604608
}
@@ -1007,7 +1011,7 @@ func (*testMonitoringManager) Prepare(_ string) error
10071011
func (*testMonitoringManager) Cleanup(string) error { return nil }
10081012
func (*testMonitoringManager) Enabled() bool { return false }
10091013
func (*testMonitoringManager) Reload(rawConfig *config.Config) error { return nil }
1010-
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string) (map[string]interface{}, error) {
1014+
func (*testMonitoringManager) MonitoringConfig(_ map[string]interface{}, _ []component.Component, _ map[string]string, _ map[string]uint64) (map[string]interface{}, error) {
10111015
return nil, nil
10121016
}
10131017

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func TestVarsManagerError(t *testing.T) {
5959
managerChans: managerChans{
6060
varsManagerError: varsErrorChan,
6161
},
62+
componentPIDTicker: time.NewTicker(time.Second * 30),
6263
}
6364
// Send an error via the vars manager channel, and let Coordinator update
6465
const errorStr = "force error"
@@ -110,6 +111,7 @@ func TestCoordinatorReportsUnhealthyComponents(t *testing.T) {
110111
managerChans: managerChans{
111112
runtimeManagerUpdate: runtimeChan,
112113
},
114+
componentPIDTicker: time.NewTicker(time.Second * 30),
113115
}
114116

115117
unhealthyComponent := runtime.ComponentComponentState{
@@ -186,6 +188,7 @@ func TestCoordinatorComponentStatesAreSeparate(t *testing.T) {
186188
managerChans: managerChans{
187189
runtimeManagerUpdate: runtimeChan,
188190
},
191+
componentPIDTicker: time.NewTicker(time.Second * 30),
189192
}
190193

191194
comp1 := runtime.ComponentComponentState{
@@ -256,6 +259,7 @@ func TestCoordinatorReportsUnhealthyUnits(t *testing.T) {
256259
managerChans: managerChans{
257260
runtimeManagerUpdate: runtimeChan,
258261
},
262+
componentPIDTicker: time.NewTicker(time.Second * 30),
259263
}
260264

261265
// Create a healthy component with healthy input and output units
@@ -375,8 +379,9 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
375379
runtimeMgr: &fakeRuntimeManager{},
376380

377381
// Set valid but empty initial values for ast and vars
378-
vars: emptyVars(t),
379-
ast: emptyAST(t),
382+
vars: emptyVars(t),
383+
ast: emptyAST(t),
384+
componentPIDTicker: time.NewTicker(time.Second * 30),
380385
}
381386

382387
// Send an invalid config update and confirm that Coordinator reports
@@ -389,7 +394,6 @@ agent.download.sourceURI:
389394
cfgChange := &configChange{cfg: cfg}
390395
configChan <- cfgChange
391396
coord.runLoopIteration(ctx)
392-
393397
assert.True(t, cfgChange.failed, "Policy with invalid field should have reported failed config change")
394398
require.ErrorContainsf(t,
395399
cfgChange.err,
@@ -420,6 +424,7 @@ agent.download.sourceURI:
420424
// (This check is based on a previous bug in which a vars update could
421425
// discard active policy errors.)
422426
varsChan <- emptyVars(t)
427+
t.Logf("after emptyVars statement")
423428
coord.runLoopIteration(ctx)
424429

425430
assert.Error(t, coord.configErr, "Vars update shouldn't affect configErr")
@@ -489,8 +494,9 @@ func TestCoordinatorReportsComponentModelError(t *testing.T) {
489494
runtimeMgr: &fakeRuntimeManager{},
490495

491496
// Set valid but empty initial values for ast and vars
492-
vars: emptyVars(t),
493-
ast: emptyAST(t),
497+
vars: emptyVars(t),
498+
ast: emptyAST(t),
499+
componentPIDTicker: time.NewTicker(time.Second * 30),
494500
}
495501

496502
// This configuration produces a valid AST but its EQL condition is
@@ -583,8 +589,9 @@ func TestCoordinatorPolicyChangeUpdatesMonitorReloader(t *testing.T) {
583589
managerChans: managerChans{
584590
configManagerUpdate: configChan,
585591
},
586-
runtimeMgr: runtimeManager,
587-
vars: emptyVars(t),
592+
runtimeMgr: runtimeManager,
593+
vars: emptyVars(t),
594+
componentPIDTicker: time.NewTicker(time.Second * 30),
588595
}
589596
coord.RegisterMonitoringServer(monitoringReloader)
590597

@@ -711,8 +718,9 @@ func TestCoordinatorPolicyChangeUpdatesRuntimeManager(t *testing.T) {
711718
managerChans: managerChans{
712719
configManagerUpdate: configChan,
713720
},
714-
runtimeMgr: runtimeManager,
715-
vars: emptyVars(t),
721+
runtimeMgr: runtimeManager,
722+
vars: emptyVars(t),
723+
componentPIDTicker: time.NewTicker(time.Second * 30),
716724
}
717725

718726
// Create a policy with one input and one output
@@ -798,8 +806,9 @@ func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
798806
// manager, so it receives the update result.
799807
runtimeManagerError: updateErrChan,
800808
},
801-
runtimeMgr: runtimeManager,
802-
vars: emptyVars(t),
809+
runtimeMgr: runtimeManager,
810+
vars: emptyVars(t),
811+
componentPIDTicker: time.NewTicker(time.Second * 30),
803812
}
804813

805814
// Send an empty policy which should forward an empty component model to
@@ -860,8 +869,9 @@ func TestCoordinatorAppliesVarsToPolicy(t *testing.T) {
860869
configManagerUpdate: configChan,
861870
varsManagerUpdate: varsChan,
862871
},
863-
runtimeMgr: runtimeManager,
864-
vars: emptyVars(t),
872+
runtimeMgr: runtimeManager,
873+
vars: emptyVars(t),
874+
componentPIDTicker: time.NewTicker(time.Second * 30),
865875
}
866876

867877
// Create a policy with one input and one output
@@ -936,7 +946,8 @@ func TestCoordinatorReportsOverrideState(t *testing.T) {
936946
stateBroadcaster: &broadcaster.Broadcaster[State]{
937947
InputChan: stateChan,
938948
},
939-
overrideStateChan: overrideStateChan,
949+
overrideStateChan: overrideStateChan,
950+
componentPIDTicker: time.NewTicker(time.Second * 30),
940951
}
941952
// Send an error via the vars manager channel, and let Coordinator update
942953
overrideStateChan <- &coordinatorOverrideState{

internal/pkg/agent/application/coordinator/diagnostics_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ log_level: "warning"
488488
components:
489489
- id: "comp-1"
490490
state:
491+
pid: 0
491492
state: 3
492493
message: "degraded message"
493494
features_idx: 0
@@ -570,6 +571,7 @@ log_level: "warning"
570571
components:
571572
- id: "comp-1"
572573
state:
574+
pid: 0
573575
state: 3
574576
message: "degraded message"
575577
features_idx: 0

0 commit comments

Comments
 (0)