Skip to content

Commit 59facd6

Browse files
authored
Stop self monitor output health reporting if output config is not ack… (#3335)
* Stop self monitor output health reporting if output config is not acked by agents * updated changelog * added test to error scenario
1 parent 950eb52 commit 59facd6

File tree

5 files changed

+120
-3
lines changed

5 files changed

+120
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Self monitor stops output health reporting if output config is not acked by agents
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component:
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: 3335
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: 3334

internal/pkg/bulk/engine.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type Bulk interface {
7474
GetBulker(outputName string) Bulk
7575
GetBulkerMap() map[string]Bulk
7676
CancelFn() context.CancelFunc
77+
RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool
7778

7879
ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error)
7980
}
@@ -247,17 +248,25 @@ func (b *Bulker) Client() *elasticsearch.Client {
247248
return client
248249
}
249250

250-
// check if remote output cfg changed
251-
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
251+
func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
252252
curCfg := b.remoteOutputConfigMap[name]
253253

254254
hasChanged := false
255255

256256
// when output config first added, not reporting change
257257
if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) {
258-
zlog.Info().Str("name", name).Msg("remote output configuration has changed")
259258
hasChanged = true
260259
}
260+
return hasChanged
261+
}
262+
263+
// check if remote output cfg changed
264+
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
265+
hasChanged := b.RemoteOutputConfigChanged(zlog, name, newCfg)
266+
if hasChanged {
267+
zlog.Debug().Str("name", name).Msg("remote output configuration has changed")
268+
}
269+
261270
newCfgCopy := make(map[string]interface{})
262271
for k, v := range newCfg {
263272
newCfgCopy[k] = v

internal/pkg/policy/self.go

+12
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,22 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
250250
return state, nil
251251
}
252252

253+
func isOutputCfgOutdated(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger, outputName string) bool {
254+
policy, err := dl.QueryOutputFromPolicy(ctx, bulker, outputName)
255+
if err != nil || policy == nil {
256+
return true
257+
}
258+
hasChanged := bulker.RemoteOutputConfigChanged(zlog, outputName, policy.Data.Outputs[outputName])
259+
return hasChanged
260+
}
261+
253262
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) {
254263
//pinging logic
255264
bulkerMap := bulker.GetBulkerMap()
256265
for outputName, outputBulker := range bulkerMap {
266+
if isOutputCfgOutdated(ctx, bulker, zlog, outputName) {
267+
continue
268+
}
257269
doc := model.OutputHealth{
258270
Output: outputName,
259271
State: client.UnitStateHealthy.String(),

internal/pkg/policy/self_test.go

+60
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,14 @@ func TestSelfMonitor_reportOutputHealthyState(t *testing.T) {
667667
}
668668
return doc.Message == "" && doc.State == client.UnitStateHealthy.String()
669669
}), mock.Anything).Return("", nil)
670+
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
671+
&es.ResultT{
672+
HitsT: es.HitsT{
673+
Hits: []es.HitT{
674+
{Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)},
675+
},
676+
},
677+
}, nil)
670678

671679
reportOutputHealth(ctx, bulker, logger)
672680

@@ -696,6 +704,58 @@ func TestSelfMonitor_reportOutputDegradedState(t *testing.T) {
696704
}
697705
return doc.Message == "remote ES is not reachable due to error: error connecting" && doc.State == client.UnitStateDegraded.String()
698706
}), mock.Anything).Return("", nil)
707+
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
708+
&es.ResultT{
709+
HitsT: es.HitsT{
710+
Hits: []es.HitT{
711+
{Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)},
712+
},
713+
},
714+
}, nil)
715+
716+
reportOutputHealth(ctx, bulker, logger)
717+
718+
bulker.AssertExpectations(t)
719+
outputBulker.AssertExpectations(t)
720+
}
721+
722+
func TestSelfMonitor_reportOutputSkipIfOutdated(t *testing.T) {
723+
ctx, cancel := context.WithCancel(context.Background())
724+
defer cancel()
725+
logger := testlog.SetLogger(t)
726+
727+
bulker := ftesting.NewMockBulk()
728+
bulkerMap := make(map[string]bulk.Bulk)
729+
outputBulker := ftesting.NewMockBulk()
730+
bulkerMap["outdated"] = outputBulker
731+
bulker.On("GetBulkerMap").Return(bulkerMap)
732+
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
733+
&es.ResultT{
734+
HitsT: es.HitsT{
735+
Hits: []es.HitT{
736+
{Source: []byte(`{"data": {"outputs":{"outdated":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)},
737+
},
738+
},
739+
}, nil)
740+
741+
reportOutputHealth(ctx, bulker, logger)
742+
743+
bulker.AssertExpectations(t)
744+
outputBulker.AssertExpectations(t)
745+
}
746+
747+
func TestSelfMonitor_reportOutputSkipIfNotFound(t *testing.T) {
748+
ctx, cancel := context.WithCancel(context.Background())
749+
defer cancel()
750+
logger := testlog.SetLogger(t)
751+
752+
bulker := ftesting.NewMockBulk()
753+
bulkerMap := make(map[string]bulk.Bulk)
754+
outputBulker := ftesting.NewMockBulk()
755+
bulkerMap["outdated"] = outputBulker
756+
bulker.On("GetBulkerMap").Return(bulkerMap)
757+
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
758+
&es.ResultT{}, errors.New("output not found"))
699759

700760
reportOutputHealth(ctx, bulker, logger)
701761

internal/pkg/testing/bulk.go

+4
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,8 @@ func (m *MockBulk) StartTransactionOptions(name, transactionType string, opts ap
148148
return nil
149149
}
150150

151+
func (m *MockBulk) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
152+
return name == "outdated"
153+
}
154+
151155
var _ bulk.Bulk = (*MockBulk)(nil)

0 commit comments

Comments
 (0)