Skip to content

Commit d600673

Browse files
authored
Refactor action policy change handler (#4563)
* Refactor PolicyChangeHandler.Handle() in validate, store, apply runtime steps * Save original handler config patched with new Fleet client config
1 parent 430dcfa commit d600673

File tree

2 files changed

+140
-72
lines changed

2 files changed

+140
-72
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_policy_change.go

+133-65
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (h *PolicyChangeHandler) Handle(ctx context.Context, a fleetapi.Action, ack
104104
}
105105

106106
h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
107-
err = h.handleFleetServerHosts(ctx, c)
107+
err = h.handlePolicyChange(ctx, c)
108108
if err != nil {
109109
return err
110110
}
@@ -118,104 +118,172 @@ func (h *PolicyChangeHandler) Watch() <-chan coordinator.ConfigChange {
118118
return h.ch
119119
}
120120

121-
func (h *PolicyChangeHandler) handleFleetServerHosts(ctx context.Context, c *config.Config) (err error) {
121+
func (h *PolicyChangeHandler) validateFleetServerHosts(ctx context.Context, cfg *configuration.Configuration) (*remote.Config, error) {
122122
// do not update fleet-server host from policy; no setters provided with local Fleet Server
123123
if len(h.setters) == 0 {
124-
return nil
125-
}
126-
data, err := c.ToMapStr()
127-
if err != nil {
128-
return errors.New(err, "could not convert the configuration from the policy", errors.TypeConfig)
129-
}
130-
if _, ok := data["fleet"]; !ok {
131-
// no fleet information in the configuration (skip checking client)
132-
return nil
133-
}
134-
135-
cfg, err := configuration.NewFromConfig(c)
136-
if err != nil {
137-
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
124+
return nil, nil
138125
}
139126

140127
if clientEqual(h.config.Fleet.Client, cfg.Fleet.Client) {
141128
// already the same hosts
142-
return nil
129+
return nil, nil
143130
}
144131

145-
// only set protocol/hosts as that is all Fleet currently sends
146-
prevProtocol := h.config.Fleet.Client.Protocol
147-
prevPath := h.config.Fleet.Client.Path
148-
prevHost := h.config.Fleet.Client.Host
149-
prevHosts := h.config.Fleet.Client.Hosts
150-
prevProxy := h.config.Fleet.Client.Transport.Proxy
151-
h.config.Fleet.Client.Protocol = cfg.Fleet.Client.Protocol
152-
h.config.Fleet.Client.Path = cfg.Fleet.Client.Path
153-
h.config.Fleet.Client.Host = cfg.Fleet.Client.Host
154-
h.config.Fleet.Client.Hosts = cfg.Fleet.Client.Hosts
132+
// make a copy the current client config and apply the changes in place on this copy
133+
newFleetClientConfig := h.config.Fleet.Client
134+
updateFleetConfig(h.log, cfg.Fleet.Client, &newFleetClientConfig)
155135

156-
// Empty proxies from fleet are ignored. That way a proxy set by --proxy-url
157-
// it won't be overridden by an absent or empty proxy from fleet-server.
158-
// However, if there is a proxy sent by fleet-server, it'll take precedence.
159-
// Therefore, it's not possible to remove a proxy once it's set.
160-
if cfg.Fleet.Client.Transport.Proxy.URL == nil ||
161-
cfg.Fleet.Client.Transport.Proxy.URL.String() == "" {
162-
h.log.Debug("proxy from fleet is empty or null, the proxy will not be changed")
163-
} else {
164-
h.config.Fleet.Client.Transport.Proxy = cfg.Fleet.Client.Transport.Proxy
165-
h.log.Debug("received proxy from fleet, applying it")
166-
}
167-
168-
// rollback on failure
169-
defer func() {
170-
if err != nil {
171-
h.config.Fleet.Client.Protocol = prevProtocol
172-
h.config.Fleet.Client.Path = prevPath
173-
h.config.Fleet.Client.Host = prevHost
174-
h.config.Fleet.Client.Hosts = prevHosts
175-
h.config.Fleet.Client.Transport.Proxy = prevProxy
176-
}
177-
}()
136+
// Test new config
137+
err := testFleetConfig(ctx, h.log, newFleetClientConfig, h.config.Fleet.AccessAPIKey)
138+
if err != nil {
139+
return nil, fmt.Errorf("validating fleet client config: %w", err)
140+
}
178141

179-
client, err := client.NewAuthWithConfig(
180-
h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Client)
142+
return &newFleetClientConfig, nil
143+
}
144+
145+
func testFleetConfig(ctx context.Context, log *logger.Logger, clientConfig remote.Config, apiKey string) error {
146+
fleetClient, err := client.NewAuthWithConfig(
147+
log, apiKey, clientConfig)
181148
if err != nil {
182149
return errors.New(
183150
err, "fail to create API client with updated config",
184151
errors.TypeConfig,
185152
errors.M("hosts", append(
186-
h.config.Fleet.Client.Hosts, h.config.Fleet.Client.Host)))
153+
clientConfig.Hosts, clientConfig.Host)))
187154
}
188155

189156
ctx, cancel := context.WithTimeout(ctx, apiStatusTimeout)
190157
defer cancel()
191158

192-
resp, err := client.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
159+
// TODO: a HEAD should be enough as we need to test only the connectivity part
160+
resp, err := fleetClient.Send(ctx, http.MethodGet, "/api/status", nil, nil, nil)
193161
if err != nil {
194162
return errors.New(
195163
err, "fail to communicate with Fleet Server API client hosts",
196-
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Client.Hosts))
164+
errors.TypeNetwork, errors.M("hosts", clientConfig.Hosts))
197165
}
198166

199167
// discard body for proper cancellation and connection reuse
200168
_, _ = io.Copy(io.Discard, resp.Body)
201169
resp.Body.Close()
202170

203-
reader, err := fleetToReader(h.agentInfo, h.config)
171+
return nil
172+
}
173+
174+
// updateFleetConfig copies the relevant Fleet client settings from src on dst. The destination struct is modified in-place
175+
func updateFleetConfig(log *logger.Logger, src remote.Config, dst *remote.Config) {
176+
dst.Protocol = src.Protocol
177+
dst.Path = src.Path
178+
dst.Host = src.Host
179+
dst.Hosts = src.Hosts
180+
181+
// Empty proxies from fleet are ignored. That way a proxy set by --proxy-url
182+
// it won't be overridden by an absent or empty proxy from fleet-server.
183+
// However, if there is a proxy sent by fleet-server, it'll take precedence.
184+
// Therefore, it's not possible to remove a proxy once it's set.
185+
186+
if src.Transport.Proxy.URL == nil ||
187+
src.Transport.Proxy.URL.String() == "" {
188+
log.Debug("proxy from fleet is empty or null, the proxy will not be changed")
189+
} else {
190+
// copy the proxy struct
191+
dst.Transport.Proxy = src.Transport.Proxy
192+
193+
// replace in dst the attributes that are passed by reference within the proxy struct
194+
195+
// Headers map
196+
dst.Transport.Proxy.Headers = map[string]string{}
197+
for k, v := range src.Transport.Proxy.Headers {
198+
dst.Transport.Proxy.Headers[k] = v
199+
}
200+
201+
// Proxy URL
202+
urlCopy := *src.Transport.Proxy.URL
203+
dst.Transport.Proxy.URL = &urlCopy
204+
205+
log.Debug("received proxy from fleet, applying it")
206+
}
207+
}
208+
209+
func (h *PolicyChangeHandler) handlePolicyChange(ctx context.Context, c *config.Config) (err error) {
210+
cfg, err := configuration.NewFromConfig(c)
204211
if err != nil {
205-
return errors.New(
206-
err, "fail to persist new Fleet Server API client hosts",
207-
errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Client.Hosts))
212+
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
213+
}
214+
215+
// validate Fleet connectivity with the new configuration
216+
var validatedConfig *remote.Config
217+
validatedConfig, err = h.validateFleetServerHosts(ctx, cfg)
218+
if err != nil {
219+
return fmt.Errorf("error validating Fleet client config: %w", err)
220+
}
221+
222+
if validatedConfig != nil {
223+
// there's a change in the fleet client settings
224+
backupFleetClientCfg := h.config.Fleet.Client
225+
// rollback in case of error
226+
defer func() {
227+
if err != nil {
228+
h.config.Fleet.Client = backupFleetClientCfg
229+
}
230+
}()
231+
232+
// modify runtime handler config before saving
233+
h.config.Fleet.Client = *validatedConfig
234+
}
235+
236+
// persist configuration
237+
err = saveConfig(h.agentInfo, h.config, h.store)
238+
if err != nil {
239+
return fmt.Errorf("saving FleetClientConfig: %w", err)
240+
}
241+
242+
// apply the new configuration to the current clients
243+
err = h.applyFleetClientConfig(validatedConfig)
244+
if err != nil {
245+
return fmt.Errorf("applying FleetClientConfig: %w", err)
246+
}
247+
248+
return nil
249+
}
250+
251+
func (h *PolicyChangeHandler) applyFleetClientConfig(validatedConfig *remote.Config) error {
252+
if validatedConfig == nil || len(h.setters) == 0 {
253+
// nothing to do for fleet hosts
254+
return nil
208255
}
209256

210-
err = h.store.Save(reader)
257+
// the config has already been validated, no need for error handling
258+
fleetClient, err := client.NewAuthWithConfig(
259+
h.log, h.config.Fleet.AccessAPIKey, *validatedConfig)
260+
if err != nil {
261+
return fmt.Errorf("creating new fleet client with updated config: %w", err)
262+
}
263+
for _, setter := range h.setters {
264+
setter.SetClient(fleetClient)
265+
}
266+
267+
return nil
268+
}
269+
270+
func saveConfig(agentInfo info.Agent, validatedConfig *configuration.Configuration, store storage.Store) error {
271+
if validatedConfig == nil {
272+
// nothing to do for fleet hosts
273+
return nil
274+
}
275+
reader, err := fleetToReader(agentInfo.AgentID(), agentInfo.Headers(), validatedConfig)
211276
if err != nil {
212277
return errors.New(
213278
err, "fail to persist new Fleet Server API client hosts",
214-
errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Client.Hosts))
279+
errors.TypeUnexpected, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
215280
}
216281

217-
for _, setter := range h.setters {
218-
setter.SetClient(client)
282+
err = store.Save(reader)
283+
if err != nil {
284+
return errors.New(
285+
err, "fail to persist new Fleet Server API client hosts",
286+
errors.TypeFilesystem, errors.M("hosts", validatedConfig.Fleet.Client.Hosts))
219287
}
220288
return nil
221289
}
@@ -264,12 +332,12 @@ func clientEqual(k1 remote.Config, k2 remote.Config) bool {
264332
return true
265333
}
266334

267-
func fleetToReader(agentInfo info.Agent, cfg *configuration.Configuration) (io.Reader, error) {
335+
func fleetToReader(agentID string, headers map[string]string, cfg *configuration.Configuration) (io.Reader, error) {
268336
configToStore := map[string]interface{}{
269337
"fleet": cfg.Fleet,
270338
"agent": map[string]interface{}{
271-
"id": agentInfo.AgentID(),
272-
"headers": agentInfo.Headers(),
339+
"id": agentID,
340+
"headers": headers,
273341
"logging.level": cfg.Settings.LoggingConfig.Level,
274342
"monitoring.http": cfg.Settings.MonitoringConfig.HTTP,
275343
"monitoring.pprof": cfg.Settings.MonitoringConfig.Pprof,

internal/pkg/agent/application/actions/handlers/handler_action_policy_change_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestPolicyAcked(t *testing.T) {
9191
})
9292
}
9393

94-
func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
94+
func TestPolicyChangeHandler_handlePolicyChange_FleetClientSettings(t *testing.T) {
9595
mockProxy := httptest.NewServer(
9696
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
9797
_, err := w.Write(nil)
@@ -158,7 +158,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
158158
"fleet.proxy_url": "http://some.proxy",
159159
})
160160

161-
err := h.handleFleetServerHosts(context.Background(), cfg)
161+
err := h.handlePolicyChange(context.Background(), cfg)
162162
require.Error(t, err) // it needs to fail to rollback
163163

164164
assert.Equal(t, 0, setterCalledCount)
@@ -214,7 +214,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
214214
"fleet.proxy_url": "http://some.proxy",
215215
})
216216

217-
err := h.handleFleetServerHosts(context.Background(), cfg)
217+
err := h.handlePolicyChange(context.Background(), cfg)
218218
require.Error(t, err) // it needs to fail to rollback
219219

220220
assert.Equal(t, 0, setterCalledCount)
@@ -260,7 +260,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
260260
map[string]interface{}{
261261
"fleet.host": fleetServer.URL})
262262

263-
err := h.handleFleetServerHosts(context.Background(), cfg)
263+
err := h.handlePolicyChange(context.Background(), cfg)
264264
require.NoError(t, err)
265265

266266
assert.Equal(t, 1, setterCalledCount)
@@ -301,7 +301,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
301301
map[string]interface{}{
302302
"fleet.hosts": wantHosts})
303303

304-
err := h.handleFleetServerHosts(context.Background(), cfg)
304+
err := h.handlePolicyChange(context.Background(), cfg)
305305
require.NoError(t, err)
306306

307307
assert.Equal(t, 1, setterCalledCount)
@@ -347,7 +347,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
347347
"fleet.proxy_url": mockProxy.URL,
348348
"fleet.host": fleetServer.URL})
349349

350-
err := h.handleFleetServerHosts(context.Background(), cfg)
350+
err := h.handlePolicyChange(context.Background(), cfg)
351351
require.NoError(t, err)
352352

353353
assert.Equal(t, 1, setterCalledCount)
@@ -400,7 +400,7 @@ func TestPolicyChangeHandler_handleFleetServerHosts(t *testing.T) {
400400
"fleet.proxy_url": "",
401401
"fleet.host": fleetServer.URL})
402402

403-
err = h.handleFleetServerHosts(context.Background(), cfg)
403+
err = h.handlePolicyChange(context.Background(), cfg)
404404
require.NoError(t, err)
405405

406406
assert.Equal(t, 1, setterCalledCount)

0 commit comments

Comments
 (0)