Skip to content

Commit 7f83ddd

Browse files
authored
Revert "Add metrics-monitoring beats to resource monitoring (#4326)" (#4451)
This reverts commit 4594088.
1 parent e6001d8 commit 7f83ddd

File tree

8 files changed

+59
-1059
lines changed

8 files changed

+59
-1059
lines changed

changelog/fragments/1708628530-add-monitoring-beats-to-metrics.yaml

-32
This file was deleted.

internal/pkg/agent/application/monitoring/v1_monitor.go

+18-31
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,6 @@ const (
5151
defaultMonitoringNamespace = "default"
5252
agentName = "elastic-agent"
5353

54-
monitoringMetricsUnitID = "metrics-monitoring"
55-
monitoringFilesUnitsID = "filestream-monitoring"
56-
5754
windowsOS = "windows"
5855

5956
// metricset execution period used for the monitoring metrics inputs
@@ -304,7 +301,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
304301

305302
streams := []interface{}{
306303
map[string]interface{}{
307-
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
304+
idKey: "filestream-monitoring-agent",
308305
"type": "filestream",
309306
"paths": []interface{}{
310307
filepath.Join(logsDrop, agentName+"-*.ndjson"),
@@ -442,7 +439,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
442439
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.InputSpec.BinaryName, "-", "_"), "/", "_") // conform with index naming policy
443440
dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName)
444441
streams = append(streams, map[string]interface{}{
445-
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, comp.ID),
442+
idKey: fmt.Sprintf("filestream-monitoring-%s", comp.ID),
446443
"type": "filestream",
447444
"paths": []interface{}{
448445
comp.InputSpec.Spec.Service.Log.Path,
@@ -495,8 +492,8 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
495492

496493
inputs := []interface{}{
497494
map[string]interface{}{
498-
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
499-
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
495+
idKey: "filestream-monitoring-agent",
496+
"name": "filestream-monitoring-agent",
500497
"type": "filestream",
501498
useOutputKey: monitoringOutput,
502499
"streams": streams,
@@ -525,13 +522,14 @@ func (b *BeatsMonitor) monitoringNamespace() string {
525522
}
526523

527524
func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error {
525+
528526
metricsCollectionIntervalString := metricsCollectionInterval.String()
529527
monitoringNamespace := b.monitoringNamespace()
530528
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
531529
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
532530
streams := []interface{}{
533531
map[string]interface{}{
534-
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
532+
idKey: "metrics-monitoring-agent",
535533
"data_stream": map[string]interface{}{
536534
"type": "metrics",
537535
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
@@ -608,18 +606,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
608606
},
609607
},
610608
}
611-
612-
//create a new map with the monitoring beats included
613-
componentListWithMonitoring := map[string]string{
614-
fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat",
615-
fmt.Sprintf("http/%s", monitoringMetricsUnitID): "metricbeat",
616-
monitoringFilesUnitsID: "filebeat",
617-
}
618-
for k, v := range componentIDToBinary {
619-
componentListWithMonitoring[k] = v
620-
}
621-
622-
for unit, binaryName := range componentListWithMonitoring {
609+
for unit, binaryName := range componentIDToBinary {
623610
if !isSupportedMetricsBinary(binaryName) {
624611
continue
625612
}
@@ -629,7 +616,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
629616

630617
if isSupportedBeatsBinary(binaryName) {
631618
beatsStreams = append(beatsStreams, map[string]interface{}{
632-
idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + name,
619+
idKey: "metrics-monitoring-" + name,
633620
"data_stream": map[string]interface{}{
634621
"type": "metrics",
635622
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -691,7 +678,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
691678
}
692679

693680
streams = append(streams, map[string]interface{}{
694-
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
681+
idKey: "metrics-monitoring-" + name + "-1",
695682
"data_stream": map[string]interface{}{
696683
"type": "metrics",
697684
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
@@ -761,7 +748,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
761748
if strings.EqualFold(name, "filebeat") {
762749
fbDataStreamName := "filebeat_input"
763750
streams = append(streams, map[string]interface{}{
764-
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
751+
idKey: "metrics-monitoring-" + name + "-1",
765752
"data_stream": map[string]interface{}{
766753
"type": "metrics",
767754
"dataset": fmt.Sprintf("elastic_agent.%s", fbDataStreamName),
@@ -845,7 +832,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
845832
// note: this doesn't fetch anything from the /state endpoint, as it doesn't report much beyond name/version,
846833
// the equivalent of the beat /state metrics end up in /shipper
847834
shipperHTTPStreams = append(shipperHTTPStreams, map[string]interface{}{
848-
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
835+
idKey: "metrics-monitoring-shipper",
849836
"data_stream": map[string]interface{}{
850837
"type": "metrics",
851838
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -859,7 +846,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
859846
"processors": createProcessorsForJSONInput(name, comp.ID, monitoringNamespace, b.agentInfo),
860847
},
861848
map[string]interface{}{
862-
idKey: fmt.Sprintf("%s-shipper-stats", monitoringMetricsUnitID),
849+
idKey: "metrics-monitoring-shipper-stats",
863850
"data_stream": map[string]interface{}{
864851
"type": "metrics",
865852
"dataset": fmt.Sprintf("elastic_agent.%s", name),
@@ -877,8 +864,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
877864

878865
inputs := []interface{}{
879866
map[string]interface{}{
880-
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
881-
"name": fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
867+
idKey: "metrics-monitoring-beats",
868+
"name": "metrics-monitoring-beats",
882869
"type": "beat/metrics",
883870
useOutputKey: monitoringOutput,
884871
"data_stream": map[string]interface{}{
@@ -887,8 +874,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
887874
"streams": beatsStreams,
888875
},
889876
map[string]interface{}{
890-
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
891-
"name": fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
877+
idKey: "metrics-monitoring-agent",
878+
"name": "metrics-monitoring-agent",
892879
"type": "http/metrics",
893880
useOutputKey: monitoringOutput,
894881
"data_stream": map[string]interface{}{
@@ -901,8 +888,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
901888
// if we have shipper data, inject the extra inputs
902889
if len(shipperHTTPStreams) > 0 {
903890
inputs = append(inputs, map[string]interface{}{
904-
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
905-
"name": fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
891+
idKey: "metrics-monitoring-shipper",
892+
"name": "metrics-monitoring-shipper",
906893
"type": "http/metrics",
907894
useOutputKey: monitoringOutput,
908895
"data_stream": map[string]interface{}{

internal/pkg/agent/application/monitoring/v1_monitor_test.go

+10-19
Original file line numberDiff line numberDiff line change
@@ -165,16 +165,16 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
165165
if _, exists := processor["add_fields"]; !exists {
166166
continue
167167
}
168-
streamProc := Processor{}
169-
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &streamProc); err != nil {
168+
p := Processor{}
169+
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &p); err != nil {
170170
t.Errorf("could not decode processor config: %q, err: %s", "foo", err)
171171
}
172-
if streamProc.AddFields.Target != "component" {
172+
if p.AddFields.Target != "component" {
173173
continue
174174
}
175175

176-
binary := streamProc.AddFields.Fields.Binary
177-
componentID := streamProc.AddFields.Fields.ID
176+
binary := p.AddFields.Fields.Binary
177+
componentID := p.AddFields.Fields.ID
178178

179179
// The elastic-Agent is a special case, handle it first
180180
if strings.Contains(streamID, "monitoring-agent") {
@@ -186,20 +186,11 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
186186
}
187187
continue
188188
}
189-
if !strings.Contains(componentID, "monitoring") {
190-
if binary != "filebeat" {
191-
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
192-
}
193-
if componentID != "filestream-default" {
194-
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
195-
}
196-
} else {
197-
if binary != "filebeat" && binary != "metricbeat" {
198-
t.Errorf("expected monitoring compoent to be metricbeat or filebeat, got %s", binary)
199-
}
200-
if componentID != monitoringFilesUnitsID && componentID != "beat/metrics-monitoring" && componentID != "http/metrics-monitoring" {
201-
t.Errorf("got unxpected monitoring component ID: %s", componentID)
202-
}
189+
if binary != "filebeat" {
190+
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
191+
}
192+
if componentID != "filestream-default" {
193+
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
203194
}
204195

205196
}

pkg/testing/tools/estools/elasticsearch.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport
222222
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
223223
}
224224

225-
return PerformQueryForRawQuery(ctx, queryRaw, indexPattern, client)
225+
return performQueryForRawQuery(ctx, queryRaw, indexPattern, client)
226226
}
227227

228228
// GetIndexTemplatesForPattern lists all index templates on the system
@@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
362362
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
363363
}
364364

365-
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
365+
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
366366

367367
}
368368

@@ -434,7 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
434434
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
435435
}
436436

437-
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
437+
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
438438
}
439439

440440
// GetLogsForDataset returns any logs associated with the datastream
@@ -525,7 +525,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I
525525
},
526526
}
527527

528-
return PerformQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
528+
return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
529529
}
530530

531531
// GetLogsForIndexWithContext returns any logs that match the given condition
@@ -536,7 +536,7 @@ func GetLogsForIndexWithContext(ctx context.Context, client elastictransport.Int
536536
},
537537
}
538538

539-
return PerformQueryForRawQuery(ctx, indexQuery, index, client)
539+
return performQueryForRawQuery(ctx, indexQuery, index, client)
540540
}
541541

542542
// GetPing performs a basic ping and returns ES config info
@@ -561,8 +561,7 @@ func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, erro
561561

562562
}
563563

564-
// PerformQueryForRawQuery executes the ES query specified by queryRaw
565-
func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
564+
func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
566565
var buf bytes.Buffer
567566
err := json.NewEncoder(&buf).Encode(queryRaw)
568567
if err != nil {
@@ -577,7 +576,6 @@ func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
577576
es.Search.WithTrackTotalHits(true),
578577
es.Search.WithPretty(),
579578
es.Search.WithContext(ctx),
580-
es.Search.WithSize(300),
581579
)
582580
if err != nil {
583581
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
@@ -615,7 +613,7 @@ func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastic
615613
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
616614
}
617615

618-
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
616+
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
619617
}
620618

621619
// GetLogsForDatastream returns any logs associated with the datastream

pkg/testing/tools/kibana.go

-32
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"fmt"
1111
"net/http"
1212
"net/url"
13-
"os"
1413
"time"
1514

1615
"github.com/elastic/elastic-agent-libs/kibana"
@@ -90,34 +89,3 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err
9089

9190
return dashboards, nil
9291
}
93-
94-
// InstallPackageFromDefaultFile allows for a test ideom where a JSON policy file can be loaded, and then updated with variables that are specific to a given test.
95-
// This can allow a single JSON policy file to be reused across multiple tests.
96-
// existingPolicyID should be the ID of an agent policy that was already created with InstallAgentWithPolicy()
97-
func InstallPackageFromDefaultFile(ctx context.Context, client *kibana.Client, packagePolicyName string, packageVersion string, policyJsonPath string, policyUUID string, existingPolicyID string) (kibana.PackagePolicyResponse, error) {
98-
installPackage := kibana.PackagePolicyRequest{}
99-
100-
jsonRaw, err := os.ReadFile(policyJsonPath)
101-
if err != nil {
102-
return kibana.PackagePolicyResponse{}, fmt.Errorf("error reading JSON policy file: %w", err)
103-
}
104-
105-
err = json.Unmarshal(jsonRaw, &installPackage)
106-
if err != nil {
107-
return kibana.PackagePolicyResponse{}, fmt.Errorf("error unmarshaling json: %w", err)
108-
}
109-
110-
installPackage.Package.Version = packageVersion
111-
installPackage.ID = policyUUID
112-
installPackage.PolicyID = existingPolicyID
113-
installPackage.Namespace = "default"
114-
installPackage.Name = fmt.Sprintf("%s-test-%s", packagePolicyName, policyUUID)
115-
installPackage.Vars = map[string]interface{}{}
116-
117-
resp, err := client.InstallFleetPackage(ctx, installPackage)
118-
if err != nil {
119-
return kibana.PackagePolicyResponse{}, fmt.Errorf("error installing fleet package: %w", err)
120-
}
121-
122-
return resp, nil
123-
}

testing/integration/agent_long_running_leak_test.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package integration
99
import (
1010
"context"
1111
"encoding/json"
12+
"fmt"
1213
"io"
1314
"net"
1415
"net/http"
@@ -121,12 +122,33 @@ func (runner *ExtendedRunner) SetupSuite() {
121122
policyResp, err := tools.InstallAgentWithPolicy(ctx, runner.T(), installOpts, runner.agentFixture, runner.info.KibanaClient, basePolicy)
122123
require.NoError(runner.T(), err)
123124

124-
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
125+
// install system package
126+
runner.InstallPackage(ctx, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
127+
128+
// install cef
129+
runner.InstallPackage(ctx, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
130+
131+
}
132+
133+
func (runner *ExtendedRunner) InstallPackage(ctx context.Context, name string, version string, cfgFile string, policyUUID string, policyID string) {
134+
installPackage := kibana.PackagePolicyRequest{}
135+
136+
jsonRaw, err := os.ReadFile(cfgFile)
125137
require.NoError(runner.T(), err)
126138

127-
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
139+
err = json.Unmarshal(jsonRaw, &installPackage)
128140
require.NoError(runner.T(), err)
129141

142+
installPackage.Package.Version = version
143+
installPackage.ID = policyUUID
144+
installPackage.PolicyID = policyID
145+
installPackage.Namespace = "default"
146+
installPackage.Name = fmt.Sprintf("%s-long-test-%s", name, policyUUID)
147+
installPackage.Vars = map[string]interface{}{}
148+
149+
runner.T().Logf("Installing %s package....", name)
150+
_, err = runner.info.KibanaClient.InstallFleetPackage(ctx, installPackage)
151+
require.NoError(runner.T(), err, "error creating fleet package")
130152
}
131153

132154
func (runner *ExtendedRunner) TestHandleLeak() {

0 commit comments

Comments
 (0)