Skip to content

Commit 11d7418

Browse files
Adding more resilient defaults to ES client (#4515)
--------- Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>
1 parent 2e760e1 commit 11d7418

11 files changed

+222
-27
lines changed

NOTICE.txt

+30
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,36 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
3939
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
4040

4141

42+
--------------------------------------------------------------------------------
43+
Dependency : github.com/cenkalti/backoff/v4
44+
Version: v4.3.0
45+
Licence type (autodetected): MIT
46+
--------------------------------------------------------------------------------
47+
48+
Contents of probable licence file $GOMODCACHE/github.com/cenkalti/backoff/v4@v4.3.0/LICENSE:
49+
50+
The MIT License (MIT)
51+
52+
Copyright (c) 2014 Cenk Altı
53+
54+
Permission is hereby granted, free of charge, to any person obtaining a copy of
55+
this software and associated documentation files (the "Software"), to deal in
56+
the Software without restriction, including without limitation the rights to
57+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
58+
the Software, and to permit persons to whom the Software is furnished to do so,
59+
subject to the following conditions:
60+
61+
The above copyright notice and this permission notice shall be included in all
62+
copies or substantial portions of the Software.
63+
64+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
65+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
66+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
67+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
68+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
69+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
70+
71+
4272
--------------------------------------------------------------------------------
4373
Dependency : github.com/dgraph-io/ristretto
4474
Version: v0.2.0

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24
44

55
require (
66
github.com/Pallinder/go-randomdata v1.2.0
7+
github.com/cenkalti/backoff/v4 v4.3.0
78
github.com/dgraph-io/ristretto v0.2.0
89
github.com/docker/go-units v0.5.0
910
github.com/elastic/elastic-agent-client/v7 v7.17.1

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgI
1010
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
1111
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
1212
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
13+
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
14+
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
1315
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
1416
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
1517
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=

internal/pkg/api/handleFileDelivery.go

+8-10
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,18 @@ import (
2020
)
2121

2222
type FileDeliveryT struct {
23-
bulker bulk.Bulk
24-
cache cache.Cache
25-
chunkClient *elasticsearch.Client
26-
deliverer *delivery.Deliverer
27-
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
23+
bulker bulk.Bulk
24+
cache cache.Cache
25+
deliverer *delivery.Deliverer
26+
authAgent func(*http.Request, *string, bulk.Bulk, cache.Cache) (*model.Agent, error) // injectable for testing purposes
2827
}
2928

3029
func NewFileDeliveryT(cfg *config.Server, bulker bulk.Bulk, chunkClient *elasticsearch.Client, cache cache.Cache) *FileDeliveryT {
3130
return &FileDeliveryT{
32-
chunkClient: chunkClient,
33-
bulker: bulker,
34-
cache: cache,
35-
deliverer: delivery.New(chunkClient, bulker, maxFileSize),
36-
authAgent: authAgent,
31+
bulker: bulker,
32+
cache: cache,
33+
deliverer: delivery.New(chunkClient, bulker, maxFileSize),
34+
authAgent: authAgent,
3735
}
3836
}
3937

internal/pkg/api/handleFileDelivery_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -403,10 +403,9 @@ func prepareFileDeliveryMock(t *testing.T) (http.Handler, apiServer, *MockTransp
403403

404404
si := apiServer{
405405
ft: &FileDeliveryT{
406-
bulker: fakebulk,
407-
chunkClient: mockES,
408-
cache: c,
409-
deliverer: delivery.New(mockES, fakebulk, maxFileSize),
406+
bulker: fakebulk,
407+
cache: c,
408+
deliverer: delivery.New(mockES, fakebulk, maxFileSize),
410409
authAgent: func(r *http.Request, id *string, bulker bulk.Bulk, c cache.Cache) (*model.Agent, error) {
411410
return &model.Agent{
412411
ESDocument: model.ESDocument{

internal/pkg/es/client.go

+88
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,57 @@ package es
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"net/http"
1112
"runtime"
13+
"syscall"
14+
"time"
1215

1316
"go.elastic.co/apm/module/apmelasticsearch/v2"
1417

1518
"github.com/elastic/fleet-server/v7/internal/pkg/build"
1619
"github.com/elastic/fleet-server/v7/internal/pkg/config"
1720
"github.com/rs/zerolog"
1821

22+
backoff "github.com/cenkalti/backoff/v4"
1923
"github.com/elastic/go-elasticsearch/v8"
2024
)
2125

26+
// Defaults used by applyDefaultOptions when configuring request retries.
const (
	// initialRetryBackoff is the InitialInterval of the default exponential backoff.
	initialRetryBackoff = 500 * time.Millisecond
	// maxRetryBackoff is the MaxInterval of the default exponential backoff.
	maxRetryBackoff = 10 * time.Second
	// randomizationFactor is the RandomizationFactor of the default exponential backoff.
	randomizationFactor = 0.5
	// defaultMaxRetries is the retry count applied through WithMaxRetries.
	defaultMaxRetries = 5
)
32+
2233
type ConfigOption func(config *elasticsearch.Config)
2334

35+
func applyDefaultOptions(escfg *elasticsearch.Config) {
36+
exp := backoff.NewExponentialBackOff()
37+
exp.InitialInterval = initialRetryBackoff
38+
exp.RandomizationFactor = randomizationFactor
39+
exp.MaxInterval = maxRetryBackoff
40+
41+
opts := []ConfigOption{
42+
WithRetryOnErrs(syscall.ECONNREFUSED, syscall.ECONNRESET), // server may be restarting
43+
44+
WithRetryOnStatus(http.StatusTooManyRequests),
45+
WithRetryOnStatus(http.StatusRequestTimeout),
46+
WithRetryOnStatus(http.StatusTooEarly),
47+
WithRetryOnStatus(http.StatusBadGateway),
48+
WithRetryOnStatus(http.StatusServiceUnavailable),
49+
WithRetryOnStatus(http.StatusGatewayTimeout),
50+
51+
WithBackoff(exp),
52+
WithMaxRetries(defaultMaxRetries),
53+
}
54+
55+
for _, opt := range opts {
56+
opt(escfg)
57+
}
58+
}
59+
2460
func NewClient(ctx context.Context, cfg *config.Config, longPoll bool, opts ...ConfigOption) (*elasticsearch.Client, error) {
2561
escfg, err := cfg.Output.Elasticsearch.ToESConfig(longPoll)
2662
if err != nil {
@@ -29,6 +65,9 @@ func NewClient(ctx context.Context, cfg *config.Config, longPoll bool, opts ...C
2965
addr := cfg.Output.Elasticsearch.Hosts
3066
mcph := cfg.Output.Elasticsearch.MaxConnPerHost
3167

68+
// apply default config
69+
applyDefaultOptions(&escfg)
70+
3271
// Apply configuration options
3372
for _, opt := range opts {
3473
opt(&escfg)
@@ -78,6 +117,55 @@ func InstrumentRoundTripper() ConfigOption {
78117
}
79118
}
80119

120+
func WithRetryOnErrs(errs ...error) ConfigOption {
121+
return func(config *elasticsearch.Config) {
122+
config.RetryOnError = func(_ *http.Request, err error) bool {
123+
for _, e := range errs {
124+
if errors.Is(err, e) {
125+
return true
126+
}
127+
}
128+
return false
129+
}
130+
}
131+
}
132+
133+
func WithMaxRetries(retries int) ConfigOption {
134+
return func(config *elasticsearch.Config) {
135+
config.MaxRetries = retries
136+
}
137+
}
138+
139+
func WithRetryOnStatus(status int) ConfigOption {
140+
return func(config *elasticsearch.Config) {
141+
for _, s := range config.RetryOnStatus {
142+
// check for duplicities
143+
if s == status {
144+
return
145+
}
146+
}
147+
148+
config.RetryOnStatus = append(config.RetryOnStatus, status)
149+
}
150+
}
151+
152+
func WithBackoff(exp *backoff.ExponentialBackOff) ConfigOption {
153+
return func(config *elasticsearch.Config) {
154+
if exp == nil {
155+
// no retry backoff
156+
config.RetryBackoff = nil
157+
return
158+
}
159+
160+
config.RetryBackoff = func(attempt int) time.Duration {
161+
if attempt == 1 {
162+
exp.Reset()
163+
}
164+
return exp.NextBackOff()
165+
}
166+
}
167+
}
168+
81169
func userAgent(name string, bi build.Info) string {
82170
return fmt.Sprintf("Elastic-%s/%s (%s; %s; %s; %s)",
83171
name,

internal/pkg/server/agent_integration_test.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestAgent(t *testing.T) {
6767
lg := testlog.SetLogger(t)
6868
zerolog.DefaultContextLogger = &lg
6969

70-
ctx, cancel := context.WithCancel(context.Background())
70+
ctx, cancel := context.WithCancel(t.Context())
7171
defer cancel()
7272
ctx = lg.WithContext(ctx)
7373

@@ -158,7 +158,7 @@ func TestAgent(t *testing.T) {
158158
return fmt.Errorf("should be reported as healthy; instead its %s", state)
159159
}
160160
return nil
161-
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(120))
161+
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(220))
162162

163163
assert.Equal(t, zerolog.InfoLevel, zerolog.GlobalLevel(), "expected log level info got: %s", zerolog.GlobalLevel())
164164

@@ -215,7 +215,7 @@ func TestAgent(t *testing.T) {
215215
return fmt.Errorf("should be reported as healthy; instead its %s", state)
216216
}
217217
return nil
218-
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(120))
218+
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(160))
219219
assert.Equal(t, zerolog.DebugLevel, zerolog.GlobalLevel(), "expected log level debug got: %s", zerolog.GlobalLevel())
220220

221221
t.Log("Test stop")
@@ -232,7 +232,7 @@ func TestAgent(t *testing.T) {
232232
return fmt.Errorf("should be reported as stopped; instead its %s", state)
233233
}
234234
return nil
235-
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(120))
235+
}, ftesting.RetrySleep(100*time.Millisecond), ftesting.RetryCount(160))
236236

237237
// stop the agent and wait for go routine to exit
238238
cancel()
@@ -241,7 +241,7 @@ func TestAgent(t *testing.T) {
241241

242242
func TestAgentAPM(t *testing.T) {
243243
lg := testlog.SetLogger(t)
244-
ctx, cancel := context.WithCancel(context.Background())
244+
ctx, cancel := context.WithCancel(t.Context())
245245
defer cancel()
246246
ctx = lg.WithContext(ctx)
247247

@@ -469,7 +469,11 @@ func (s *StubV2Control) Start(opt ...grpc.ServerOption) error {
469469
if err != nil {
470470
return err
471471
}
472-
s.port = lis.Addr().(*net.TCPAddr).Port
472+
tcpAddr, ok := lis.Addr().(*net.TCPAddr)
473+
if !ok {
474+
return errors.New("failed to convert to *net.TCPAddr")
475+
}
476+
s.port = tcpAddr.Port
473477
srv := grpc.NewServer(opt...)
474478
s.server = srv
475479
proto.RegisterElasticAgentServer(s.server, s)

testing/e2e/scaffold/scaffold.go

+24-4
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func (s *Scaffold) IsFleetServerPortFree() bool {
155155
func (s *Scaffold) FleetServerStatusOK(ctx context.Context, url string) {
156156
s.FleetServerStatusCondition(ctx, url, func(resp *http.Response) bool {
157157
return resp.StatusCode == http.StatusOK
158-
})
158+
}, true)
159159
}
160160

161161
// FleetServerStatusIs will poll fleet-server's status endpoint every second and return when it returns the expected state.
@@ -172,18 +172,38 @@ func (s *Scaffold) FleetServerStatusIs(ctx context.Context, url string, state cl
172172
s.Require().NoError(err)
173173

174174
return status.Status == state.String()
175-
})
175+
}, true)
176+
}
177+
178+
// FleetServerStatusIs will poll fleet-server's status endpoint every second and return when it returns the expected state.
179+
// If the passed context terminates before a 200 is returned the current test will be marked as failed.
180+
func (s *Scaffold) FleetServerStatusNeverBecomes(ctx context.Context, url string, state client.UnitState) {
181+
s.FleetServerStatusCondition(ctx, url, func(resp *http.Response) bool {
182+
var status struct {
183+
Status string `json:"status"`
184+
}
185+
d, err := io.ReadAll(resp.Body)
186+
s.Require().NoError(err)
187+
188+
err = json.Unmarshal(d, &status)
189+
s.Require().NoError(err)
190+
191+
s.NotEqual(state.String(), status.Status)
192+
return false
193+
}, false)
176194
}
177195

178196
// FleetServerStatusCondition will poll fleet-server's status till the response satisfies the given
179197
// condition.
180198
// If the passed context terminates first, the current test is marked as failed only when failOnDone is true.
181-
func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, condition func(resp *http.Response) bool) {
199+
func (s *Scaffold) FleetServerStatusCondition(ctx context.Context, url string, condition func(resp *http.Response) bool, failOnDone bool) {
182200
timer := time.NewTimer(time.Second)
183201
for {
184202
select {
185203
case <-ctx.Done():
186-
s.Require().NoError(ctx.Err(), "context expired before status endpoint returned 200")
204+
if failOnDone {
205+
s.Require().NoError(ctx.Err(), "context expired before status endpoint returned 200")
206+
}
187207
return
188208
case <-timer.C:
189209
// ping /api/status

testing/e2e/stand_alone_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,59 @@ func (suite *StandAloneSuite) TestWithElasticsearchConnectionFailures() {
145145
cmd.Wait()
146146
}
147147

148+
// TestWithElasticsearchConnectionFlakyness checks the behaviour of stand alone Fleet Server
149+
// when Elasticsearch is not reachable portion of the time.
150+
func (suite *StandAloneSuite) TestWithElasticsearchConnectionFlakyness() {
151+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
152+
153+
proxy, err := suite.StartToxiproxy(ctx).CreateProxy("es", "localhost:0", suite.ESHosts)
154+
suite.Require().NoError(err)
155+
156+
// Create a config file from a template in the test temp dir
157+
dir := suite.T().TempDir()
158+
tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-http.tpl"))
159+
suite.Require().NoError(err)
160+
f, err := os.Create(filepath.Join(dir, "config.yml"))
161+
suite.Require().NoError(err)
162+
err = tpl.Execute(f, map[string]string{
163+
"Hosts": "http://" + proxy.Listen,
164+
"ServiceToken": suite.ServiceToken,
165+
})
166+
f.Close()
167+
suite.Require().NoError(err)
168+
169+
// Run the fleet-server binary
170+
cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml"))
171+
cmd.Cancel = func() error {
172+
return cmd.Process.Signal(syscall.SIGTERM)
173+
}
174+
cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath}
175+
err = cmd.Start()
176+
suite.Require().NoError(err)
177+
178+
// Wait to check that it is healthy.
179+
suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy)
180+
181+
// Provoke timeouts and wait for the healthcheck to fail.
182+
_, err = proxy.AddToxic("force_timeout", "timeout", "upstream", 0.4, toxiproxy.Attributes{}) // we have 5 retries, test with failure 4 out of 10 should be ok
183+
suite.Require().NoError(err)
184+
185+
// wait for unit state degraded
186+
timeoutCtx, tCancel := context.WithTimeout(ctx, 30*time.Second)
187+
suite.FleetServerStatusNeverBecomes(timeoutCtx, "http://localhost:8220", client.UnitStateDegraded)
188+
189+
// test should not fail at this point
190+
tCancel()
191+
192+
// Recover the network and wait for the healthcheck to be healthy again.
193+
err = proxy.RemoveToxic("force_timeout")
194+
suite.Require().NoError(err)
195+
suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy)
196+
197+
cancel()
198+
cmd.Wait()
199+
}
200+
148201
// TestWithSecretFiles tests starting an HTTPS server using a service-token file, public/private keys + passphrase file.
149202
func (suite *StandAloneSuite) TestWithSecretFiles() {
150203
// Create a service token file in the temp test dir

testing/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ require (
2525
github.com/Microsoft/go-winio v0.6.1 // indirect
2626
github.com/Microsoft/hcsshim v0.11.4 // indirect
2727
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
28-
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
28+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2929
github.com/containerd/containerd v1.7.15 // indirect
3030
github.com/containerd/log v0.1.0 // indirect
3131
github.com/cpuguy83/dockercfg v0.3.1 // indirect

0 commit comments

Comments
 (0)