@@ -36,11 +36,15 @@ import (
36
36
"github.com/elastic/elastic-agent-libs/testing"
37
37
)
38
38
39
// Expvar metric names.
const (
	successesKey           = "success"
	failuresKey            = "failures"
	eventsKey              = "events"
	consecutiveFailuresKey = "consecutive_failures"
)

// failureThresholdKey is the failure threshold config key.
const failureThresholdKey = "failure_threshold"
45
49
46
50
var (
@@ -70,16 +74,18 @@ type metricSetWrapper struct {
70
74
module * Wrapper // Parent Module.
71
75
stats * stats // stats for this MetricSet.
72
76
73
- periodic bool // Set to true if this metricset is a periodic fetcher
77
+ periodic bool // Set to true if this metricset is a periodic fetcher
78
+ failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded
74
79
}
75
80
76
81
// stats bundles common metricset stats.
77
82
type stats struct {
78
- key string // full stats key
79
- ref uint32 // number of modules/metricsets reusing stats instance
80
- success * monitoring.Int // Total success events.
81
- failures * monitoring.Int // Total error events.
82
- events * monitoring.Int // Total events published.
83
+ key string // full stats key
84
+ ref uint32 // number of modules/metricsets reusing stats instance
85
+ success * monitoring.Int // Total success events.
86
+ failures * monitoring.Int // Total error events.
87
+ events * monitoring.Int // Total events published.
88
+ consecutiveFailures * monitoring.Uint // Consecutive failures fetching this metricset
83
89
}
84
90
85
91
// NewWrapper creates a new module and its associated metricsets based on the given configuration.
@@ -106,11 +112,28 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio
106
112
applyOption (wrapper )
107
113
}
108
114
115
+ failureThreshold := uint (1 )
116
+
117
+ var streamHealthSettings struct {
118
+ FailureThreshold * uint `config:"failure_threshold"`
119
+ }
120
+
121
+ err := module .UnpackConfig (& streamHealthSettings )
122
+
123
+ if err != nil {
124
+ return nil , fmt .Errorf ("unpacking raw config: %w" , err )
125
+ }
126
+
127
+ if streamHealthSettings .FailureThreshold != nil {
128
+ failureThreshold = * streamHealthSettings .FailureThreshold
129
+ }
130
+
109
131
for i , metricSet := range metricSets {
110
132
wrapper .metricSets [i ] = & metricSetWrapper {
111
- MetricSet : metricSet ,
112
- module : wrapper ,
113
- stats : getMetricSetStats (wrapper .Name (), metricSet .Name ()),
133
+ MetricSet : metricSet ,
134
+ module : wrapper ,
135
+ stats : getMetricSetStats (wrapper .Name (), metricSet .Name ()),
136
+ failureThreshold : failureThreshold ,
114
137
}
115
138
}
116
139
return wrapper , nil
@@ -254,35 +277,11 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
254
277
case mb.ReportingMetricSetV2Error :
255
278
reporter .StartFetchTimer ()
256
279
err := fetcher .Fetch (reporter .V2 ())
257
- if err != nil {
258
- reporter .V2 ().Error (err )
259
- if errors .As (err , & mb.PartialMetricsError {}) {
260
- // mark module as running if metrics are partially available and display the error message
261
- msw .module .UpdateStatus (status .Running , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
262
- } else {
263
- // mark it as degraded for any other issue encountered
264
- msw .module .UpdateStatus (status .Degraded , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
265
- }
266
- logp .Err ("Error fetching data for metricset %s.%s: %s" , msw .module .Name (), msw .Name (), err )
267
- } else {
268
- msw .module .UpdateStatus (status .Running , "" )
269
- }
280
+ msw .handleFetchError (err , reporter .V2 ())
270
281
case mb.ReportingMetricSetV2WithContext :
271
282
reporter .StartFetchTimer ()
272
283
err := fetcher .Fetch (ctx , reporter .V2 ())
273
- if err != nil {
274
- reporter .V2 ().Error (err )
275
- if errors .As (err , & mb.PartialMetricsError {}) {
276
- // mark module as running if metrics are partially available and display the error message
277
- msw .module .UpdateStatus (status .Running , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
278
- } else {
279
- // mark it as degraded for any other issue encountered
280
- msw .module .UpdateStatus (status .Degraded , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
281
- }
282
- logp .Err ("Error fetching data for metricset %s.%s: %s" , msw .module .Name (), msw .Name (), err )
283
- } else {
284
- msw .module .UpdateStatus (status .Running , "" )
285
- }
284
+ msw .handleFetchError (err , reporter .V2 ())
286
285
default :
287
286
panic (fmt .Sprintf ("unexpected fetcher type for %v" , msw ))
288
287
}
@@ -311,6 +310,31 @@ func (msw *metricSetWrapper) Test(d testing.Driver) {
311
310
})
312
311
}
313
312
313
+ func (msw * metricSetWrapper ) handleFetchError (err error , reporter mb.PushReporterV2 ) {
314
+ switch {
315
+ case err == nil :
316
+ msw .stats .consecutiveFailures .Set (0 )
317
+ msw .module .UpdateStatus (status .Running , "" )
318
+
319
+ case errors .As (err , & mb.PartialMetricsError {}):
320
+ reporter .Error (err )
321
+ msw .stats .consecutiveFailures .Set (0 )
322
+ // mark module as running if metrics are partially available and display the error message
323
+ msw .module .UpdateStatus (status .Running , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
324
+ logp .Err ("Error fetching data for metricset %s.%s: %s" , msw .module .Name (), msw .Name (), err )
325
+
326
+ default :
327
+ reporter .Error (err )
328
+ msw .stats .consecutiveFailures .Inc ()
329
+ if msw .failureThreshold > 0 && msw .stats .consecutiveFailures != nil && uint (msw .stats .consecutiveFailures .Get ()) >= msw .failureThreshold {
330
+ // mark it as degraded for any other issue encountered
331
+ msw .module .UpdateStatus (status .Degraded , fmt .Sprintf ("Error fetching data for metricset %s.%s: %v" , msw .module .Name (), msw .MetricSet .Name (), err ))
332
+ }
333
+ logp .Err ("Error fetching data for metricset %s.%s: %s" , msw .module .Name (), msw .Name (), err )
334
+
335
+ }
336
+ }
337
+
314
338
type reporter interface {
315
339
StartFetchTimer ()
316
340
V1 () mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed
@@ -437,11 +461,12 @@ func getMetricSetStats(module, name string) *stats {
437
461
438
462
reg := monitoring .Default .NewRegistry (key )
439
463
s := & stats {
440
- key : key ,
441
- ref : 1 ,
442
- success : monitoring .NewInt (reg , successesKey ),
443
- failures : monitoring .NewInt (reg , failuresKey ),
444
- events : monitoring .NewInt (reg , eventsKey ),
464
+ key : key ,
465
+ ref : 1 ,
466
+ success : monitoring .NewInt (reg , successesKey ),
467
+ failures : monitoring .NewInt (reg , failuresKey ),
468
+ events : monitoring .NewInt (reg , eventsKey ),
469
+ consecutiveFailures : monitoring .NewUint (reg , consecutiveFailuresKey ),
445
470
}
446
471
447
472
fetches [key ] = s
0 commit comments