diff --git a/beater/beater.go b/beater/beater.go index 98876e0eada..f872446d540 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -221,6 +221,15 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * ctx, reload.ReloadableFunc(reloader.reloadOutput), ) }) + // Create a debouncer to limit the number of repeated calls to + // reloader.reload(). + d := &debouncer{ + active: make(chan struct{}, 1), + triggerc: make(chan chan<- error), + timeout: 500 * time.Millisecond, + fn: reloader.reload, + } + reloader.debouncer = d // Start the manager after all the hooks are initialized // and defined this ensure reloading consistency.. @@ -264,6 +273,73 @@ type reloader struct { outputConfig agentconfig.Namespace fleetConfig *config.Fleet runner *serverRunner + debouncer *debouncer +} + +// debouncer wraps a function fn that is to be debounced for the length of +// timeout. Additional calls to fire fn will reset the timer. After timeout +// elapses, fn is fired. +type debouncer struct { + triggerc chan chan<- error + timeout time.Duration + fn func() error + + // Used to show execution of fn is currently being debounced. Must be a + // buffered channel with length 1. + active chan struct{} +} + +// trigger sends a request to fire the function fn. a buffered channel which +// will contain the return value of fn is returned to the caller, which they +// are responsible for draining. +func (d *debouncer) trigger(ctx context.Context) <-chan error { + res := make(chan error, 1) + select { + case d.active <- struct{}{}: + // Update the internal state to show we're currently debouncing + // fn execution and start the debounce method. + go func() { + d.debounce(ctx) + // release lock on debouncing + <-d.active + }() + default: + } + d.triggerc <- res + return res +} + +// debounce debounces function fn, so that repeated calls to trigger() will +// only execute fn a single time after timeout has elapsed. Repeated calls to +// trigger() reset the timer. Additionally, each call to trigger() sends a chan +// error on triggerc; the return value of fn is sent to all callers once fn has +// executed. +func (d *debouncer) debounce(ctx context.Context) (err error) { + t := time.NewTimer(d.timeout) + callers := []chan<- error{} + defer func() { + for _, res := range callers { + select { + case res <- err: + close(res) + default: + } + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return d.fn() + case res := <-d.triggerc: + callers = append(callers, res) + if !t.Stop() { + <-t.C + } + t.Reset(d.timeout) + } + } } func (r *reloader) stop() { @@ -301,7 +377,9 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } r.fleetConfig = &integrationConfig.Fleet r.mu.Unlock() - return r.reload() + // debouncer is wrapping r.reload(), ensuring that a rapid succession + // of config changes will only reload the apm-server once. + return <-r.debouncer.trigger(r.runServerContext) } func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { @@ -314,7 +392,9 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { r.mu.Lock() r.outputConfig = outputConfig r.mu.Unlock() - return r.reload() + // debouncer is wrapping r.reload(), ensuring that a rapid succession + // of config changes will only reload the apm-server once. + return <-r.debouncer.trigger(r.runServerContext) } func (r *reloader) reload() error { diff --git a/beater/debouncer_test.go b/beater/debouncer_test.go new file mode 100644 index 00000000000..df50145814b --- /dev/null +++ b/beater/debouncer_test.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestDebouncer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fired := make(chan struct{}) + d := &debouncer{ + active: make(chan struct{}, 1), + triggerc: make(chan chan<- error), + timeout: 50 * time.Millisecond, + fn: func() error { + close(fired) + return errors.New("err") + }, + } + + c1 := d.trigger(ctx) + select { + case <-fired: + t.Fatal("didn't debounce") + case <-time.After(30 * time.Millisecond): + } + c2 := d.trigger(ctx) + select { + case <-fired: + t.Fatal("didn't debounce") + case <-time.After(30 * time.Millisecond): + } + c3 := d.trigger(ctx) + select { + case <-fired: + case <-time.After(time.Second): + t.Fatal("fn did not fire") + } + + assert.Error(t, <-c1) + assert.Error(t, <-c2) + assert.Error(t, <-c3) +}