Skip to content

Commit fe0d427

Browse files
authored
Implement basic version negotiation with Fleet Server (#3383)
* Initial implementation of elastic api version in roundtrip and fleet client * signal Fleet warning on elastic-agent status
1 parent 342cda7 commit fe0d427

File tree

12 files changed

+205
-33
lines changed

12 files changed

+205
-33
lines changed

.mockery.yaml

+1-5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,4 @@ inpackage: False
22
testonly: False
33
with-expecter: True
44
keeptree: True
5-
case: underscore
6-
note: |
7-
Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
8-
// or more contributor license agreements. Licensed under the Elastic License;
9-
// you may not use this file except in compliance with the Elastic License.
5+
case: underscore

internal/pkg/agent/application/coordinator/coordinator.go

+4
Original file line numberDiff line numberDiff line change
@@ -855,8 +855,12 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
855855

856856
case configErr := <-c.managerChans.configManagerError:
857857
if c.isManaged {
858+
var wErr *WarningError
858859
if configErr == nil {
859860
c.setFleetState(agentclient.Healthy, "Connected")
861+
} else if errors.As(configErr, &wErr) {
862+
// we received a warning from Fleet, set state to degraded and the warning as state string
863+
c.setFleetState(agentclient.Degraded, wErr.Error())
860864
} else {
861865
c.setFleetState(agentclient.Failed, configErr.Error())
862866
}

internal/pkg/agent/application/coordinator/coordinator_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,20 @@ func TestCoordinator_State_ConfigError_Managed(t *testing.T) {
222222
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
223223
}, 3*time.Second, 10*time.Millisecond)
224224

225+
// report a warning
226+
cfgMgr.ReportError(ctx, NewWarningError("some msg from Fleet"))
227+
assert.Eventually(t, func() bool {
228+
state := coord.State()
229+
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Degraded && state.FleetMessage == "some msg from Fleet"
230+
}, 3*time.Second, 10*time.Millisecond)
231+
232+
// recover from warning error
233+
cfgMgr.ReportError(ctx, nil)
234+
assert.Eventually(t, func() bool {
235+
state := coord.State()
236+
return state.State == agentclient.Healthy && state.Message == "Running" && state.FleetState == agentclient.Healthy && state.FleetMessage == "Connected"
237+
}, 3*time.Second, 10*time.Millisecond)
238+
225239
cancel()
226240
err = <-coordCh
227241
require.NoError(t, err)

internal/pkg/agent/application/gateway/gateway.go internal/pkg/agent/application/coordinator/gateway.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// or more contributor license agreements. Licensed under the Elastic License;
33
// you may not use this file except in compliance with the Elastic License.
44

5-
package gateway
5+
package coordinator
66

77
import (
88
"context"
@@ -28,3 +28,16 @@ type FleetGateway interface {
2828
// SetClient sets the client for the gateway.
2929
SetClient(client.Sender)
3030
}
31+
32+
// WarningError is emitted when we receive a warning in the Fleet response
33+
type WarningError struct {
34+
msg string
35+
}
36+
37+
func (w WarningError) Error() string {
38+
return w.msg
39+
}
40+
41+
func NewWarningError(warningMsg string) *WarningError {
42+
return &WarningError{msg: warningMsg}
43+
}

internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

+18-14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
1414
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
15-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
1615
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
1716
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
1817
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
@@ -66,7 +65,7 @@ type stateStore interface {
6665
Actions() []fleetapi.Action
6766
}
6867

69-
type fleetGateway struct {
68+
type FleetGateway struct {
7069
log *logger.Logger
7170
client client.Sender
7271
scheduler scheduler.Scheduler
@@ -89,7 +88,7 @@ func New(
8988
acker acker.Acker,
9089
stateFetcher func() coordinator.State,
9190
stateStore stateStore,
92-
) (gateway.FleetGateway, error) {
91+
) (*FleetGateway, error) {
9392

9493
scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
9594
return newFleetGatewayWithScheduler(
@@ -113,8 +112,8 @@ func newFleetGatewayWithScheduler(
113112
acker acker.Acker,
114113
stateFetcher func() coordinator.State,
115114
stateStore stateStore,
116-
) (gateway.FleetGateway, error) {
117-
return &fleetGateway{
115+
) (*FleetGateway, error) {
116+
return &FleetGateway{
118117
log: log,
119118
client: client,
120119
settings: settings,
@@ -128,11 +127,11 @@ func newFleetGatewayWithScheduler(
128127
}, nil
129128
}
130129

131-
func (f *fleetGateway) Actions() <-chan []fleetapi.Action {
130+
func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
132131
return f.actionCh
133132
}
134133

135-
func (f *fleetGateway) Run(ctx context.Context) error {
134+
func (f *FleetGateway) Run(ctx context.Context) error {
136135
// Backoff implementation doesn't support the use of a context [cancellation] as the shutdown mechanism.
137136
// So we keep a done channel that will be closed when the current context is shutdown.
138137
done := make(chan struct{})
@@ -174,11 +173,11 @@ func (f *fleetGateway) Run(ctx context.Context) error {
174173
}
175174

176175
// Errors returns the channel to watch for reported errors.
177-
func (f *fleetGateway) Errors() <-chan error {
176+
func (f *FleetGateway) Errors() <-chan error {
178177
return f.errCh
179178
}
180179

181-
func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
180+
func (f *FleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*fleetapi.CheckinResponse, error) {
182181
bo.Reset()
183182

184183
// Guard if the context is stopped by a out of bound call,
@@ -222,7 +221,12 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
222221
}
223222

224223
f.checkinFailCounter = 0
225-
f.errCh <- nil
224+
if resp.FleetWarning != "" {
225+
f.errCh <- coordinator.NewWarningError(resp.FleetWarning)
226+
} else {
227+
f.errCh <- nil
228+
}
229+
226230
// Request was successful, return the collected actions.
227231
return resp, nil
228232
}
@@ -232,7 +236,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
232236
return nil, ctx.Err()
233237
}
234238

235-
func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
239+
func (f *FleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
236240
if components == nil {
237241
return nil
238242
}
@@ -307,7 +311,7 @@ func (f *fleetGateway) convertToCheckinComponents(components []runtime.Component
307311
return checkinComponents
308312
}
309313

310-
func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
314+
func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, time.Duration, error) {
311315
ecsMeta, err := info.Metadata(ctx, f.log)
312316
if err != nil {
313317
f.log.Error(errors.New("failed to load metadata", err))
@@ -367,15 +371,15 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
367371
}
368372

369373
// shouldUnenroll checks if the max number of trying an invalid key is reached
370-
func (f *fleetGateway) shouldUnenroll() bool {
374+
func (f *FleetGateway) shouldUnenroll() bool {
371375
return f.unauthCounter > maxUnauthCounter
372376
}
373377

374378
func isUnauth(err error) bool {
375379
return errors.Is(err, client.ErrInvalidAPIKey)
376380
}
377381

378-
func (f *fleetGateway) SetClient(c client.Sender) {
382+
func (f *FleetGateway) SetClient(c client.Sender) {
379383
f.client = c
380384
}
381385

internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"gotest.tools/assert"
2424

2525
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
26-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
2726
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2827
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
2928
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
@@ -70,7 +69,7 @@ func newTestingClient() *testingClient {
7069
return &testingClient{received: make(chan struct{}, 1)}
7170
}
7271

73-
type withGatewayFunc func(*testing.T, gateway.FleetGateway, *testingClient, *scheduler.Stepper)
72+
type withGatewayFunc func(*testing.T, coordinator.FleetGateway, *testingClient, *scheduler.Stepper)
7473

7574
func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) {
7675
return func(t *testing.T) {
@@ -128,7 +127,7 @@ func TestFleetGateway(t *testing.T) {
128127

129128
t.Run("send no event and receive no action", withGateway(agentInfo, settings, func(
130129
t *testing.T,
131-
gateway gateway.FleetGateway,
130+
gateway coordinator.FleetGateway,
132131
client *testingClient,
133132
scheduler *scheduler.Stepper,
134133
) {
@@ -160,7 +159,7 @@ func TestFleetGateway(t *testing.T) {
160159

161160
t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func(
162161
t *testing.T,
163-
gateway gateway.FleetGateway,
162+
gateway coordinator.FleetGateway,
164163
client *testingClient,
165164
scheduler *scheduler.Stepper,
166165
) {
@@ -321,7 +320,7 @@ func TestRetriesOnFailures(t *testing.T) {
321320
t.Run("When the gateway fails to communicate with the checkin API we will retry",
322321
withGateway(agentInfo, settings, func(
323322
t *testing.T,
324-
gateway gateway.FleetGateway,
323+
gateway coordinator.FleetGateway,
325324
client *testingClient,
326325
scheduler *scheduler.Stepper,
327326
) {
@@ -369,7 +368,7 @@ func TestRetriesOnFailures(t *testing.T) {
369368
Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
370369
}, func(
371370
t *testing.T,
372-
gateway gateway.FleetGateway,
371+
gateway coordinator.FleetGateway,
373372
client *testingClient,
374373
scheduler *scheduler.Stepper,
375374
) {
@@ -404,7 +403,7 @@ func emptyStateFetcher() coordinator.State {
404403
return coordinator.State{}
405404
}
406405

407-
func runFleetGateway(ctx context.Context, g gateway.FleetGateway) <-chan error {
406+
func runFleetGateway(ctx context.Context, g coordinator.FleetGateway) <-chan error {
408407
done := make(chan bool)
409408
errCh := make(chan error, 1)
410409
go func() {

internal/pkg/agent/application/managed_mode.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers"
1414
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1515
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
16-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
1716
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
1817
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
1918
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
@@ -223,7 +222,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
223222
}
224223

225224
// runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval.
226-
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
225+
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway coordinator.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
227226
t := time.NewTimer(flushInterval)
228227
for {
229228
select {

internal/pkg/fleetapi/checkin_cmd.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ func (e *CheckinRequest) Validate() error {
7777
// CheckinResponse is the response send back from the server which contains all the action that
7878
// need to be executed or proxy to running processes.
7979
type CheckinResponse struct {
80-
AckToken string `json:"ack_token"`
81-
Actions Actions `json:"actions"`
80+
AckToken string `json:"ack_token"`
81+
Actions Actions `json:"actions"`
82+
FleetWarning string `json:"-"`
8283
}
8384

8485
// Validate validates the response send from the server.
@@ -140,6 +141,7 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe
140141
}
141142

142143
checkinResponse := &CheckinResponse{}
144+
checkinResponse.FleetWarning = resp.Header.Get("Warning")
143145
decoder := json.NewDecoder(bytes.NewReader(rs))
144146
if err := decoder.Decode(checkinResponse); err != nil {
145147
return nil, sendDuration, errors.New(err,

internal/pkg/fleetapi/client/client.go

+6
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@ type Sender interface {
3434
URI() string
3535
}
3636

37+
// Default value for Elastic-Api-Version header when sending requests to Fleet (that's the only version we have at the time of writing)
38+
const defaultFleetApiVersion = "2023-06-01"
39+
3740
var baseRoundTrippers = func(rt http.RoundTripper) (http.RoundTripper, error) {
3841
rt = NewFleetUserAgentRoundTripper(rt, release.Version())
42+
43+
rt = NewElasticApiVersionRoundTripper(rt, defaultFleetApiVersion)
44+
3945
return rt, nil
4046
}
4147

0 commit comments

Comments
 (0)