Skip to content

Commit 824cc7b

Browse files
update metrics setup
1 parent 6771de5 commit 824cc7b

File tree

6 files changed

+121
-222
lines changed

6 files changed

+121
-222
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

+27-12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"reflect"
1212
"strings"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/hashicorp/go-multierror"
@@ -285,7 +286,13 @@ type Coordinator struct {
285286
// Should only be interacted with via CoordinatorActive() or runLoopIteration()
286287
heartbeatChan chan struct{}
287288

288-
compPidUpdate chan struct{}
289+
// if a component (mostly endpoint) has a new PID, we need to update
290+
// the monitoring components so they have a PID to monitor
291+
// however, if endpoint is in some kind of restart loop,
292+
// we could DOS the config system. Instead,
293+
// run a ticker that checks to see if we have a new PID.
294+
componentPIDTicker *time.Ticker
295+
componentPidRequiresUpdate *atomic.Bool
289296
}
290297

291298
// The channels Coordinator reads to receive updates from the various managers.
@@ -376,11 +383,12 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
376383
// synchronization in the subscriber API, just set the input buffer to 0.
377384
stateBroadcaster: broadcaster.New(state, 64, 32),
378385

379-
logLevelCh: make(chan logp.Level),
380-
overrideStateChan: make(chan *coordinatorOverrideState),
381-
upgradeDetailsChan: make(chan *details.Details),
382-
heartbeatChan: make(chan struct{}),
383-
compPidUpdate: make(chan struct{}, 1),
386+
logLevelCh: make(chan logp.Level),
387+
overrideStateChan: make(chan *coordinatorOverrideState),
388+
upgradeDetailsChan: make(chan *details.Details),
389+
heartbeatChan: make(chan struct{}),
390+
componentPIDTicker: time.NewTicker(time.Second * 30),
391+
componentPidRequiresUpdate: &atomic.Bool{},
384392
}
385393
// Setup communication channels for any non-nil components. This pattern
386394
// lets us transparently accept nil managers / simulated events during
@@ -923,6 +931,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
923931
ctx, cancel := context.WithCancel(ctx)
924932
defer cancel()
925933

934+
defer c.componentPIDTicker.Stop()
935+
926936
// We run nil checks before starting the various managers so that unit tests
927937
// only have to initialize / mock the specific components they're testing.
928938
// If a manager is nil, we prebuffer its return channel with nil also so
@@ -1035,12 +1045,17 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
10351045

10361046
case c.heartbeatChan <- struct{}{}:
10371047

1038-
case <-c.compPidUpdate:
1039-
err := c.refreshComponentModel(ctx)
1040-
if err != nil {
1041-
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
1042-
c.setConfigManagerError(err)
1043-
c.logger.Errorf("%s", err)
1048+
case <-c.componentPIDTicker.C:
1049+
// if we hit the ticker and we've got a new PID,
1050+
// reload the component model
1051+
if c.componentPidRequiresUpdate.Load() {
1052+
c.componentPidRequiresUpdate.Store(false)
1053+
err := c.refreshComponentModel(ctx)
1054+
if err != nil {
1055+
err = fmt.Errorf("error refreshing component model for PID update: %w", err)
1056+
c.setConfigManagerError(err)
1057+
c.logger.Errorf("%s", err)
1058+
}
10441059
}
10451060

10461061
case componentState := <-c.managerChans.runtimeManagerUpdate:

internal/pkg/agent/application/coordinator/coordinator_state.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func (c *Coordinator) refreshState() {
136136
// Must be called on the main Coordinator goroutine.
137137
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) {
138138
found := false
139+
// check for any component updates to the PID, so we can update the component monitoring
139140
pidRequiresUpdate := false
140141
for i, other := range c.state.Components {
141142
if other.Component.ID == state.Component.ID {
@@ -169,7 +170,7 @@ func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState)
169170
c.stateNeedsRefresh = true
170171

171172
if pidRequiresUpdate {
172-
c.compPidUpdate <- struct{}{}
173+
c.componentPidRequiresUpdate.Store(true)
173174
}
174175
}
175176

internal/pkg/agent/application/monitoring/v1_monitor.go

+69-64
Original file line numberDiff line numberDiff line change
@@ -910,84 +910,89 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
910910
},
911911
}
912912

913+
for _, comp := range componentList {
914+
logp.L().Infof("input component %s: %#v", comp.ID, comp.InputSpec.Spec.Service)
915+
}
916+
913917
// add system/process metrics for services that can't be monitored via json/beats metrics
914-
// If there's a checkin PID and it contains the "endpoint" string, assume we want to monitor it
915-
for id, comp := range existingStateServicePids {
916-
logp.L().Infof("component/pid monitoring map is: %#v", existingStateServicePids)
917-
if comp != 0 && strings.Contains(id, "endpoint") {
918-
logp.L().Infof("creating system/process watcher for pid %d", comp)
919-
inputs = append(inputs, map[string]interface{}{
920-
idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
921-
"name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
922-
"type": "system/metrics",
923-
useOutputKey: monitoringOutput,
924-
"data_stream": map[string]interface{}{
925-
"namespace": monitoringNamespace,
926-
},
927-
"streams": []interface{}{
928-
map[string]interface{}{
929-
idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
930-
"data_stream": map[string]interface{}{
931-
"type": "metrics",
932-
"dataset": "elastic_agent.endpoint_security",
933-
"namespace": monitoringNamespace,
934-
},
935-
"metricsets": []interface{}{"process"},
936-
"period": metricsCollectionIntervalString,
937-
"index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace),
938-
"process.pid": comp,
939-
"process.cgroups.enabled": false,
940-
"processors": []interface{}{
941-
map[string]interface{}{
942-
"add_fields": map[string]interface{}{
943-
"target": "data_stream",
944-
"fields": map[string]interface{}{
945-
"type": "metrics",
946-
"dataset": "elastic_agent.endpoint_security",
947-
"namespace": monitoringNamespace,
918+
// If there's a checkin PID and the corrisponding component has a service spec section, add a system/process config
919+
for _, compState := range componentList {
920+
if compState.InputSpec != nil && compState.InputSpec.Spec.Service != nil {
921+
if comp, ok := existingStateServicePids[compState.ID]; ok && comp != 0 {
922+
inputs = append(inputs, map[string]interface{}{
923+
idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
924+
"name": fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
925+
"type": "system/metrics",
926+
useOutputKey: monitoringOutput,
927+
"data_stream": map[string]interface{}{
928+
"namespace": monitoringNamespace,
929+
},
930+
"streams": []interface{}{
931+
map[string]interface{}{
932+
idKey: fmt.Sprintf("%s-endpoint_security", monitoringMetricsUnitID),
933+
"data_stream": map[string]interface{}{
934+
"type": "metrics",
935+
"dataset": "elastic_agent.endpoint_security",
936+
"namespace": monitoringNamespace,
937+
},
938+
"metricsets": []interface{}{"process"},
939+
"period": metricsCollectionIntervalString,
940+
"index": fmt.Sprintf("metrics-elastic_agent.endpoint_security-%s", monitoringNamespace),
941+
"process.pid": comp,
942+
"process.cgroups.enabled": false,
943+
"processors": []interface{}{
944+
map[string]interface{}{
945+
"add_fields": map[string]interface{}{
946+
"target": "data_stream",
947+
"fields": map[string]interface{}{
948+
"type": "metrics",
949+
"dataset": "elastic_agent.endpoint_security",
950+
"namespace": monitoringNamespace,
951+
},
948952
},
949953
},
950-
},
951-
map[string]interface{}{
952-
"add_fields": map[string]interface{}{
953-
"target": "event",
954-
"fields": map[string]interface{}{
955-
"dataset": "elastic_agent.endpoint_security",
954+
map[string]interface{}{
955+
"add_fields": map[string]interface{}{
956+
"target": "event",
957+
"fields": map[string]interface{}{
958+
"dataset": "elastic_agent.endpoint_security",
959+
},
956960
},
957961
},
958-
},
959-
map[string]interface{}{
960-
"add_fields": map[string]interface{}{
961-
"target": "elastic_agent",
962-
"fields": map[string]interface{}{
963-
"id": b.agentInfo.AgentID(),
964-
"version": b.agentInfo.Version(),
965-
"snapshot": b.agentInfo.Snapshot(),
966-
"process": "endpoint_security",
962+
map[string]interface{}{
963+
"add_fields": map[string]interface{}{
964+
"target": "elastic_agent",
965+
"fields": map[string]interface{}{
966+
"id": b.agentInfo.AgentID(),
967+
"version": b.agentInfo.Version(),
968+
"snapshot": b.agentInfo.Snapshot(),
969+
"process": "endpoint_security",
970+
},
967971
},
968972
},
969-
},
970-
map[string]interface{}{
971-
"add_fields": map[string]interface{}{
972-
"target": "agent",
973-
"fields": map[string]interface{}{
974-
"id": b.agentInfo.AgentID(),
973+
map[string]interface{}{
974+
"add_fields": map[string]interface{}{
975+
"target": "agent",
976+
"fields": map[string]interface{}{
977+
"id": b.agentInfo.AgentID(),
978+
},
975979
},
976980
},
977-
},
978-
map[string]interface{}{
979-
"add_fields": map[string]interface{}{
980-
"target": "component",
981-
"fields": map[string]interface{}{
982-
"binary": "endpoint_security",
983-
"id": id,
981+
map[string]interface{}{
982+
"add_fields": map[string]interface{}{
983+
"target": "component",
984+
"fields": map[string]interface{}{
985+
"binary": "endpoint_security",
986+
"id": compState.ID,
987+
},
984988
},
985989
},
986990
},
987991
},
988992
},
989-
},
990-
})
993+
})
994+
}
995+
991996
}
992997
}
993998

internal/pkg/agent/application/monitoring/v1_monitor_test.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
)
2424

2525
func TestMonitoringWithEndpoint(t *testing.T) {
26-
2726
agentInfo, err := info.NewAgentInfo(context.Background(), false)
2827
require.NoError(t, err, "Error creating agent info")
2928

@@ -59,7 +58,18 @@ func TestMonitoringWithEndpoint(t *testing.T) {
5958
// manually declaring all the MonitoringConfig() args since there's a lot of them, and this makes
6059
// the test a little more self-describing
6160

62-
var compList []component.Component
61+
compList := []component.Component{
62+
{
63+
ID: "endpoint-default",
64+
InputSpec: &component.InputRuntimeSpec{
65+
Spec: component.InputSpec{
66+
Service: &component.ServiceSpec{
67+
CPort: 7688,
68+
},
69+
},
70+
},
71+
},
72+
}
6373

6474
compIdToBinary := map[string]string{
6575
"endpoint-default": "endpoint-security",

0 commit comments

Comments
 (0)