Skip to content

Commit 0dc98db

Browse files
Revert removal of metrics-monitoring change, fix tests to account for fleet naming changes (#4462)
* Reapply "Add metrics-monitoring beats to resource monitoring (#4326)" (#4451) This reverts commit 7f83ddd. * revert removal, make it easier to adjust unit ID output name * update unit names
1 parent 6e4b379 commit 0dc98db

File tree

8 files changed

+1061
-59
lines changed

8 files changed

+1061
-59
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: add monitoring beats to usage metrics reporting
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: monitoring
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/elastic/elastic-agent/pull/4326
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/elastic-agent/issues/4082

internal/pkg/agent/application/monitoring/v1_monitor.go

+31-18
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ const (
5151
defaultMonitoringNamespace = "default"
5252
agentName = "elastic-agent"
5353

54+
monitoringMetricsUnitID = "metrics-monitoring"
55+
monitoringFilesUnitsID = "filestream-monitoring"
56+
5457
windowsOS = "windows"
5558

5659
// metricset execution period used for the monitoring metrics inputs
@@ -301,7 +304,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
301304

302305
streams := []interface{}{
303306
map[string]interface{}{
304-
idKey: "filestream-monitoring-agent",
307+
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
305308
"type": "filestream",
306309
"paths": []interface{}{
307310
filepath.Join(logsDrop, agentName+"-*.ndjson"),
@@ -439,7 +442,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
439442
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.InputSpec.BinaryName, "-", "_"), "/", "_") // conform with index naming policy
440443
dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName)
441444
streams = append(streams, map[string]interface{}{
442-
idKey: fmt.Sprintf("filestream-monitoring-%s", comp.ID),
445+
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, comp.ID),
443446
"type": "filestream",
444447
"paths": []interface{}{
445448
comp.InputSpec.Spec.Service.Log.Path,
@@ -492,8 +495,8 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
492495

493496
inputs := []interface{}{
494497
map[string]interface{}{
495-
idKey: "filestream-monitoring-agent",
496-
"name": "filestream-monitoring-agent",
498+
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
499+
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
497500
"type": "filestream",
498501
useOutputKey: monitoringOutput,
499502
"streams": streams,
@@ -522,14 +525,13 @@ func (b *BeatsMonitor) monitoringNamespace() string {
522525
}
523526

524527
func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error {
525-
526528
metricsCollectionIntervalString := metricsCollectionInterval.String()
527529
monitoringNamespace := b.monitoringNamespace()
528530
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
529531
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
530532
streams := []interface{}{
531533
map[string]interface{}{
532-
idKey: "metrics-monitoring-agent",
534+
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
533535
"data_stream": map[string]interface{}{
534536
"type": "metrics",
535537
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
@@ -606,7 +608,18 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
606608
},
607609
},
608610
}
609-
for unit, binaryName := range componentIDToBinary {
611+
612+
//create a new map with the monitoring beats included
613+
componentListWithMonitoring := map[string]string{
614+
fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat",
615+
fmt.Sprintf("http/%s", monitoringMetricsUnitID): "metricbeat",
616+
monitoringFilesUnitsID: "filebeat",
617+
}
618+
for k, v := range componentIDToBinary {
619+
componentListWithMonitoring[k] = v
620+
}
621+
622+
for unit, binaryName := range componentListWithMonitoring {
610623
if !isSupportedMetricsBinary(binaryName) {
611624
continue
612625
}
@@ -616,7 +629,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
616629

617630
if isSupportedBeatsBinary(binaryName) {
618631
beatsStreams = append(beatsStreams, map[string]interface{}{
619-
idKey: "metrics-monitoring-" + name,
632+
idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + name,
620633
"data_stream": map[string]interface{}{
621634
"type": "metrics",
622635
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -678,7 +691,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
678691
}
679692

680693
streams = append(streams, map[string]interface{}{
681-
idKey: "metrics-monitoring-" + name + "-1",
694+
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
682695
"data_stream": map[string]interface{}{
683696
"type": "metrics",
684697
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
@@ -748,7 +761,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
748761
if strings.EqualFold(name, "filebeat") {
749762
fbDataStreamName := "filebeat_input"
750763
streams = append(streams, map[string]interface{}{
751-
idKey: "metrics-monitoring-" + name + "-1",
764+
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
752765
"data_stream": map[string]interface{}{
753766
"type": "metrics",
754767
"dataset": fmt.Sprintf("elastic_agent.%s", fbDataStreamName),
@@ -832,7 +845,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
832845
// note: this doesn't fetch anything from the /state endpoint, as it doesn't report much beyond name/version,
833846
// the equivalent of the beat /state metrics end up in /shipper
834847
shipperHTTPStreams = append(shipperHTTPStreams, map[string]interface{}{
835-
idKey: "metrics-monitoring-shipper",
848+
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
836849
"data_stream": map[string]interface{}{
837850
"type": "metrics",
838851
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -846,7 +859,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
846859
"processors": createProcessorsForJSONInput(name, comp.ID, monitoringNamespace, b.agentInfo),
847860
},
848861
map[string]interface{}{
849-
idKey: "metrics-monitoring-shipper-stats",
862+
idKey: fmt.Sprintf("%s-shipper-stats", monitoringMetricsUnitID),
850863
"data_stream": map[string]interface{}{
851864
"type": "metrics",
852865
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -864,8 +877,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
864877

865878
inputs := []interface{}{
866879
map[string]interface{}{
867-
idKey: "metrics-monitoring-beats",
868-
"name": "metrics-monitoring-beats",
880+
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
881+
"name": fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
869882
"type": "beat/metrics",
870883
useOutputKey: monitoringOutput,
871884
"data_stream": map[string]interface{}{
@@ -874,8 +887,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
874887
"streams": beatsStreams,
875888
},
876889
map[string]interface{}{
877-
idKey: "metrics-monitoring-agent",
878-
"name": "metrics-monitoring-agent",
890+
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
891+
"name": fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
879892
"type": "http/metrics",
880893
useOutputKey: monitoringOutput,
881894
"data_stream": map[string]interface{}{
@@ -888,8 +901,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
888901
// if we have shipper data, inject the extra inputs
889902
if len(shipperHTTPStreams) > 0 {
890903
inputs = append(inputs, map[string]interface{}{
891-
idKey: "metrics-monitoring-shipper",
892-
"name": "metrics-monitoring-shipper",
904+
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
905+
"name": fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
893906
"type": "http/metrics",
894907
useOutputKey: monitoringOutput,
895908
"data_stream": map[string]interface{}{

internal/pkg/agent/application/monitoring/v1_monitor_test.go

+19-10
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,16 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
165165
if _, exists := processor["add_fields"]; !exists {
166166
continue
167167
}
168-
p := Processor{}
169-
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &p); err != nil {
168+
streamProc := Processor{}
169+
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &streamProc); err != nil {
170170
t.Errorf("could not decode processor config: %q, err: %s", "foo", err)
171171
}
172-
if p.AddFields.Target != "component" {
172+
if streamProc.AddFields.Target != "component" {
173173
continue
174174
}
175175

176-
binary := p.AddFields.Fields.Binary
177-
componentID := p.AddFields.Fields.ID
176+
binary := streamProc.AddFields.Fields.Binary
177+
componentID := streamProc.AddFields.Fields.ID
178178

179179
// The elastic-Agent is a special case, handle it first
180180
if strings.Contains(streamID, "monitoring-agent") {
@@ -186,11 +186,20 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
186186
}
187187
continue
188188
}
189-
if binary != "filebeat" {
190-
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
191-
}
192-
if componentID != "filestream-default" {
193-
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
189+
if !strings.Contains(componentID, "monitoring") {
190+
if binary != "filebeat" {
191+
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
192+
}
193+
if componentID != "filestream-default" {
194+
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
195+
}
196+
} else {
197+
if binary != "filebeat" && binary != "metricbeat" {
198+
t.Errorf("expected monitoring compoent to be metricbeat or filebeat, got %s", binary)
199+
}
200+
if componentID != monitoringFilesUnitsID && componentID != "beat/metrics-monitoring" && componentID != "http/metrics-monitoring" {
201+
t.Errorf("got unxpected monitoring component ID: %s", componentID)
202+
}
194203
}
195204

196205
}

pkg/testing/tools/estools/elasticsearch.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport
222222
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
223223
}
224224

225-
return performQueryForRawQuery(ctx, queryRaw, indexPattern, client)
225+
return PerformQueryForRawQuery(ctx, queryRaw, indexPattern, client)
226226
}
227227

228228
// GetIndexTemplatesForPattern lists all index templates on the system
@@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
362362
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
363363
}
364364

365-
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
365+
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
366366

367367
}
368368

@@ -434,7 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
434434
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
435435
}
436436

437-
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
437+
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
438438
}
439439

440440
// GetLogsForDataset returns any logs associated with the datastream
@@ -525,7 +525,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I
525525
},
526526
}
527527

528-
return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
528+
return PerformQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
529529
}
530530

531531
// GetLogsForIndexWithContext returns any logs that match the given condition
@@ -536,7 +536,7 @@ func GetLogsForIndexWithContext(ctx context.Context, client elastictransport.Int
536536
},
537537
}
538538

539-
return performQueryForRawQuery(ctx, indexQuery, index, client)
539+
return PerformQueryForRawQuery(ctx, indexQuery, index, client)
540540
}
541541

542542
// GetPing performs a basic ping and returns ES config info
@@ -561,7 +561,8 @@ func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, erro
561561

562562
}
563563

564-
func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
564+
// PerformQueryForRawQuery executes the ES query specified by queryRaw
565+
func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
565566
var buf bytes.Buffer
566567
err := json.NewEncoder(&buf).Encode(queryRaw)
567568
if err != nil {
@@ -576,6 +577,7 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
576577
es.Search.WithTrackTotalHits(true),
577578
es.Search.WithPretty(),
578579
es.Search.WithContext(ctx),
580+
es.Search.WithSize(300),
579581
)
580582
if err != nil {
581583
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
@@ -613,7 +615,7 @@ func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastic
613615
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
614616
}
615617

616-
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
618+
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
617619
}
618620

619621
// GetLogsForDatastream returns any logs associated with the datastream

pkg/testing/tools/kibana.go

+32
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"net/http"
1212
"net/url"
13+
"os"
1314
"time"
1415

1516
"github.com/elastic/elastic-agent-libs/kibana"
@@ -89,3 +90,34 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err
8990

9091
return dashboards, nil
9192
}
93+
94+
// InstallPackageFromDefaultFile allows for a test ideom where a JSON policy file can be loaded, and then updated with variables that are specific to a given test.
95+
// This can allow a single JSON policy file to be reused across multiple tests.
96+
// existingPolicyID should be the ID of an agent policy that was already created with InstallAgentWithPolicy()
97+
func InstallPackageFromDefaultFile(ctx context.Context, client *kibana.Client, packagePolicyName string, packageVersion string, policyJsonPath string, policyUUID string, existingPolicyID string) (kibana.PackagePolicyResponse, error) {
98+
installPackage := kibana.PackagePolicyRequest{}
99+
100+
jsonRaw, err := os.ReadFile(policyJsonPath)
101+
if err != nil {
102+
return kibana.PackagePolicyResponse{}, fmt.Errorf("error reading JSON policy file: %w", err)
103+
}
104+
105+
err = json.Unmarshal(jsonRaw, &installPackage)
106+
if err != nil {
107+
return kibana.PackagePolicyResponse{}, fmt.Errorf("error unmarshaling json: %w", err)
108+
}
109+
110+
installPackage.Package.Version = packageVersion
111+
installPackage.ID = policyUUID
112+
installPackage.PolicyID = existingPolicyID
113+
installPackage.Namespace = "default"
114+
installPackage.Name = fmt.Sprintf("%s-test-%s", packagePolicyName, policyUUID)
115+
installPackage.Vars = map[string]interface{}{}
116+
117+
resp, err := client.InstallFleetPackage(ctx, installPackage)
118+
if err != nil {
119+
return kibana.PackagePolicyResponse{}, fmt.Errorf("error installing fleet package: %w", err)
120+
}
121+
122+
return resp, nil
123+
}

testing/integration/agent_long_running_leak_test.go

+2-24
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package integration
99
import (
1010
"context"
1111
"encoding/json"
12-
"fmt"
1312
"io"
1413
"net"
1514
"net/http"
@@ -122,33 +121,12 @@ func (runner *ExtendedRunner) SetupSuite() {
122121
policyResp, err := tools.InstallAgentWithPolicy(ctx, runner.T(), installOpts, runner.agentFixture, runner.info.KibanaClient, basePolicy)
123122
require.NoError(runner.T(), err)
124123

125-
// install system package
126-
runner.InstallPackage(ctx, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
127-
128-
// install cef
129-
runner.InstallPackage(ctx, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
130-
131-
}
132-
133-
func (runner *ExtendedRunner) InstallPackage(ctx context.Context, name string, version string, cfgFile string, policyUUID string, policyID string) {
134-
installPackage := kibana.PackagePolicyRequest{}
135-
136-
jsonRaw, err := os.ReadFile(cfgFile)
124+
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
137125
require.NoError(runner.T(), err)
138126

139-
err = json.Unmarshal(jsonRaw, &installPackage)
127+
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
140128
require.NoError(runner.T(), err)
141129

142-
installPackage.Package.Version = version
143-
installPackage.ID = policyUUID
144-
installPackage.PolicyID = policyID
145-
installPackage.Namespace = "default"
146-
installPackage.Name = fmt.Sprintf("%s-long-test-%s", name, policyUUID)
147-
installPackage.Vars = map[string]interface{}{}
148-
149-
runner.T().Logf("Installing %s package....", name)
150-
_, err = runner.info.KibanaClient.InstallFleetPackage(ctx, installPackage)
151-
require.NoError(runner.T(), err, "error creating fleet package")
152130
}
153131

154132
func (runner *ExtendedRunner) TestHandleLeak() {

0 commit comments

Comments
 (0)