From f3c2cc89521d2d3040c63c9ce796e1844efdd84f Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 22 Jun 2022 12:46:49 +0200 Subject: [PATCH 1/6] add config reload debouncer prevent multiple reloads in rapid succession. --- beater/beater.go | 66 ++++++++++++++++++++++++++++++++++++++-- beater/debouncer_test.go | 49 +++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 beater/debouncer_test.go diff --git a/beater/beater.go b/beater/beater.go index 98876e0eada..399dcf8bd45 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -221,6 +221,15 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * ctx, reload.ReloadableFunc(reloader.reloadOutput), ) }) + d := &debouncer{ + triggerc: make(chan chan error), + timeout: 500 * time.Millisecond, + fn: reloader.reload, + } + reloader.debouncer = d + g.Go(func() error { + return d.loop(ctx) + }) // Start the manager after all the hooks are initialized // and defined this ensure reloading consistency.. @@ -264,6 +273,59 @@ type reloader struct { outputConfig agentconfig.Namespace fleetConfig *config.Fleet runner *serverRunner + debouncer *debouncer +} + +type debouncer struct { + triggerc chan chan error + timeout time.Duration + fn func() error +} + +func (d *debouncer) trigger() chan error { + res := make(chan error, 1) + d.triggerc <- res + return res +} + +func (d *debouncer) loop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case res := <-d.triggerc: + res <- d.debounce(ctx) + close(res) + } + } +} + +func (d *debouncer) debounce(ctx context.Context) (err error) { + t := time.NewTimer(d.timeout) + callers := []chan error{} + defer func() { + for _, res := range callers { + select { + case res <- err: + default: + } + close(res) + } + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return d.fn() + case res := <-d.triggerc: + callers = append(callers, res) + if !t.Stop() { + <-t.C + } + t.Reset(d.timeout) + } + } } func (r *reloader) stop() { @@ -301,7 +363,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } r.fleetConfig = &integrationConfig.Fleet r.mu.Unlock() - return r.reload() + return <-r.debouncer.trigger() } func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { @@ -314,7 +376,7 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { r.mu.Lock() r.outputConfig = outputConfig r.mu.Unlock() - return r.reload() + return <-r.debouncer.trigger() } func (r *reloader) reload() error { diff --git a/beater/debouncer_test.go b/beater/debouncer_test.go new file mode 100644 index 00000000000..85f8546adbb --- /dev/null +++ b/beater/debouncer_test.go @@ -0,0 +1,49 @@ +package beater + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestDebouncer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fired := make(chan struct{}) + d := &debouncer{ + triggerc: make(chan chan error), + timeout: 50 * time.Millisecond, + fn: func() error { + close(fired) + return errors.New("err") + }, + } + + go d.loop(ctx) + + c1 := d.trigger() + select { + case <-fired: + t.Fatal("didn't debounce") + case <-time.After(30 * time.Millisecond): + } + c2 := d.trigger() + select { + case <-fired: + t.Fatal("didn't debounce") + case <-time.After(30 * time.Millisecond): + } + c3 := d.trigger() + select { + case <-fired: + case <-time.After(time.Second): + t.Fatal("fn did not fire") + } + + assert.Error(t, <-c1) + assert.Error(t, <-c2) + assert.Error(t, <-c3) +} From aca10fb1736323e43c458758e80b1a9e022a239d Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 23 Jun 2022 15:30:43 +0200 Subject: [PATCH 2/6] add license header --- beater/debouncer_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/beater/debouncer_test.go b/beater/debouncer_test.go index 85f8546adbb..9b61f99f5ac 100644 --- a/beater/debouncer_test.go +++ b/beater/debouncer_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beater import ( From be8600d63a13fc38ab6aca9b5fa40df0711a0f8e Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 23 Jun 2022 15:55:08 +0200 Subject: [PATCH 3/6] Add comments explaining the debouncer Provide some context as to how the debouncer is working within the code. --- beater/beater.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/beater/beater.go b/beater/beater.go index 399dcf8bd45..3f362c771d5 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -221,6 +221,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * ctx, reload.ReloadableFunc(reloader.reloadOutput), ) }) + // Create a debouncer to limit the number of repeated calls to + // reloader.reload(). d := &debouncer{ triggerc: make(chan chan error), timeout: 500 * time.Millisecond, @@ -228,6 +230,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * } reloader.debouncer = d g.Go(func() error { + // Start the debouncer loop. This listens and debounces + // requests from reloader to reload the apm-server. return d.loop(ctx) }) @@ -276,18 +280,29 @@ type reloader struct { debouncer *debouncer } +// debouncer wraps a function fn that is to be debounced for the length of +// timeout. Additional calls to fire fn will reset the timer. After timeout +// elapses, fn is fired. type debouncer struct { triggerc chan chan error timeout time.Duration fn func() error } +// trigger sends a request to fire the function fn. a buffered channel which +// will contain the return value of fn is returned to the caller, which they +// are responsible for draining. func (d *debouncer) trigger() chan error { res := make(chan error, 1) d.triggerc <- res return res } +// loop waits for requests to fire the debounced function fn sent on channel +// triggerc. When the first request is received, it starts the debounce() +// method. While debounce() is active, all additional sends on triggerc will be +// received within that function. The return value of debounce() is sent to the +// buffered channel res. func (d *debouncer) loop(ctx context.Context) error { for { select { @@ -300,6 +315,11 @@ func (d *debouncer) loop(ctx context.Context) error { } } +// debounce debounces function fn, so that repeated calls to trigger() will +// only execute fn a single time after timeout has elapsed. Repeated calls to +// trigger() reset the timer. Additionally, each call to trigger() sends a chan +// error on triggerc; the return value of fn is sent to all callers once fn has +// executed. func (d *debouncer) debounce(ctx context.Context) (err error) { t := time.NewTimer(d.timeout) callers := []chan error{} @@ -363,6 +383,8 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } r.fleetConfig = &integrationConfig.Fleet r.mu.Unlock() + // debouncer is wrapping r.reload(), ensuring that a rapid succession + // of config changes will only reload the apm-server once. return <-r.debouncer.trigger() } @@ -376,6 +398,8 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { r.mu.Lock() r.outputConfig = outputConfig r.mu.Unlock() + // debouncer is wrapping r.reload(), ensuring that a rapid succession + // of config changes will only reload the apm-server once. return <-r.debouncer.trigger() } From ce190b062db922658e52ad1a4e0093155071ec61 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 23 Jun 2022 16:13:10 +0200 Subject: [PATCH 4/6] returned chan err is receive only --- beater/beater.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index 3f362c771d5..1cb6497fe81 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -292,7 +292,7 @@ type debouncer struct { // trigger sends a request to fire the function fn. a buffered channel which // will contain the return value of fn is returned to the caller, which they // are responsible for draining. -func (d *debouncer) trigger() chan error { +func (d *debouncer) trigger() <-chan error { res := make(chan error, 1) d.triggerc <- res return res @@ -327,9 +327,9 @@ func (d *debouncer) debounce(ctx context.Context) (err error) { for _, res := range callers { select { case res <- err: + close(res) default: } - close(res) } }() for { From c3a7e9bd24cf2672327c1618bcb2310f44753ef3 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 23 Jun 2022 16:22:26 +0200 Subject: [PATCH 5/6] make chan usage explicit --- beater/beater.go | 6 +++--- beater/debouncer_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index 1cb6497fe81..f248fce869e 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -224,7 +224,7 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * // Create a debouncer to limit the number of repeated calls to // reloader.reload(). d := &debouncer{ - triggerc: make(chan chan error), + triggerc: make(chan chan<- error), timeout: 500 * time.Millisecond, fn: reloader.reload, } @@ -284,7 +284,7 @@ type reloader struct { // timeout. Additional calls to fire fn will reset the timer. After timeout // elapses, fn is fired. type debouncer struct { - triggerc chan chan error + triggerc chan chan<- error timeout time.Duration fn func() error } @@ -322,7 +322,7 @@ func (d *debouncer) loop(ctx context.Context) error { // executed. func (d *debouncer) debounce(ctx context.Context) (err error) { t := time.NewTimer(d.timeout) - callers := []chan error{} + callers := []chan<- error{} defer func() { for _, res := range callers { select { diff --git a/beater/debouncer_test.go b/beater/debouncer_test.go index 9b61f99f5ac..1069b4ea73f 100644 --- a/beater/debouncer_test.go +++ b/beater/debouncer_test.go @@ -31,7 +31,7 @@ func TestDebouncer(t *testing.T) { defer cancel() fired := make(chan struct{}) d := &debouncer{ - triggerc: make(chan chan error), + triggerc: make(chan chan<- error), timeout: 50 * time.Millisecond, fn: func() error { close(fired) From 9cd7a12ab01c77fd633f988a760424895f3f1c7d Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Fri, 24 Jun 2022 06:15:17 +0200 Subject: [PATCH 6/6] merge loop() into trigger() start the debounce() method on the first call to trigger(), using an internal buffered channel to indicate that fn is currently being debounced. --- beater/beater.go | 44 +++++++++++++++++----------------------- beater/debouncer_test.go | 9 ++++---- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index f248fce869e..f872446d540 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -224,16 +224,12 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * // Create a debouncer to limit the number of repeated calls to // reloader.reload(). d := &debouncer{ + active: make(chan struct{}, 1), triggerc: make(chan chan<- error), timeout: 500 * time.Millisecond, fn: reloader.reload, } reloader.debouncer = d - g.Go(func() error { - // Start the debouncer loop. This listens and debounces - // requests from reloader to reload the apm-server. - return d.loop(ctx) - }) // Start the manager after all the hooks are initialized // and defined this ensure reloading consistency.. @@ -287,34 +283,32 @@ type debouncer struct { triggerc chan chan<- error timeout time.Duration fn func() error + + // Used to show execution of fn is currently being debounced. Must be a + // buffered channel with length 1. + active chan struct{} } // trigger sends a request to fire the function fn. a buffered channel which // will contain the return value of fn is returned to the caller, which they // are responsible for draining. -func (d *debouncer) trigger() <-chan error { +func (d *debouncer) trigger(ctx context.Context) <-chan error { res := make(chan error, 1) + select { + case d.active <- struct{}{}: + // Update the internal state to show we're currently debouncing + // fn execution and start the debounce method. + go func() { + d.debounce(ctx) + // release lock on debouncing + <-d.active + }() + default: + } d.triggerc <- res return res } -// loop waits for requests to fire the debounced function fn sent on channel -// triggerc. When the first request is received, it starts the debounce() -// method. While debounce() is active, all additional sends on triggerc will be -// received within that function. The return value of debounce() is sent to the -// buffered channel res. -func (d *debouncer) loop(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case res := <-d.triggerc: - res <- d.debounce(ctx) - close(res) - } - } -} - // debounce debounces function fn, so that repeated calls to trigger() will // only execute fn a single time after timeout has elapsed. Repeated calls to // trigger() reset the timer. Additionally, each call to trigger() sends a chan @@ -385,7 +379,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { r.mu.Unlock() // debouncer is wrapping r.reload(), ensuring that a rapid succession // of config changes will only reload the apm-server once. - return <-r.debouncer.trigger() + return <-r.debouncer.trigger(r.runServerContext) } func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { @@ -400,7 +394,7 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { r.mu.Unlock() // debouncer is wrapping r.reload(), ensuring that a rapid succession // of config changes will only reload the apm-server once. - return <-r.debouncer.trigger() + return <-r.debouncer.trigger(r.runServerContext) } func (r *reloader) reload() error { diff --git a/beater/debouncer_test.go b/beater/debouncer_test.go index 1069b4ea73f..df50145814b 100644 --- a/beater/debouncer_test.go +++ b/beater/debouncer_test.go @@ -31,6 +31,7 @@ func TestDebouncer(t *testing.T) { defer cancel() fired := make(chan struct{}) d := &debouncer{ + active: make(chan struct{}, 1), triggerc: make(chan chan<- error), timeout: 50 * time.Millisecond, fn: func() error { @@ -39,21 +40,19 @@ func TestDebouncer(t *testing.T) { }, } - go d.loop(ctx) - - c1 := d.trigger() + c1 := d.trigger(ctx) select { case <-fired: t.Fatal("didn't debounce") case <-time.After(30 * time.Millisecond): } - c2 := d.trigger() + c2 := d.trigger(ctx) select { case <-fired: t.Fatal("didn't debounce") case <-time.After(30 * time.Millisecond): } - c3 := d.trigger() + c3 := d.trigger(ctx) select { case <-fired: case <-time.After(time.Second):