Skip to content

Commit 2c8877e

Browse files
managed-agent will periodically call dispatcher.Dispatch (#2344)
* Allow fleet-gateway to return empty action lists * Change fix to periodically call the dispatcher * Fix comment * Fix changelog, add unit tests
1 parent 6617e74 commit 2c8877e

File tree

3 files changed

+187
-11
lines changed

3 files changed

+187
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Periodically check for scheduled actions.
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
description: Add a timer that will periodically run the action handling
19+
mechanism that managed agents use to force it to run scheduled actions.
20+
This will allow the agent to handle previously scheduled actions if theres is
21+
no communication with fleet, or a checkin does not return any actions.
22+
23+
# Affected component; a word indicating the component this changeset affects.
24+
component: managed-agent
25+
26+
# PR number; optional; the PR number that added the changeset.
27+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
28+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
29+
# Please provide it if you are adding a fragment for a different PR.
30+
#pr: 1234
31+
32+
# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
33+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
34+
issue: 2343

internal/pkg/agent/application/managed_mode.go

+23-11
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers"
1414
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1515
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
16+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
1617
fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet"
1718
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
1819
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
@@ -21,6 +22,7 @@ import (
2122
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
2223
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
2324
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
25+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
2426
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/fleet"
2527
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/lazy"
2628
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/retrier"
@@ -33,6 +35,9 @@ import (
3335
"github.com/elastic/elastic-agent/pkg/core/logger"
3436
)
3537

38+
// dispatchFlushInterval is the max time between calls to dispatcher.Dispatch
39+
const dispatchFlushInterval = time.Minute * 5
40+
3641
type managedConfigManager struct {
3742
log *logger.Logger
3843
agentInfo *info.AgentInfo
@@ -207,22 +212,29 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
207212
return gateway.Run(ctx)
208213
})
209214

210-
// pass actions collected from gateway to dispatcher
211-
go func() {
212-
for {
213-
select {
214-
case <-ctx.Done():
215-
return
216-
case actions := <-gateway.Actions():
217-
m.dispatcher.Dispatch(ctx, actionAcker, actions...)
218-
}
219-
}
220-
}()
215+
go runDispatcher(ctx, m.dispatcher, gateway, actionAcker, dispatchFlushInterval)
221216

222217
<-ctx.Done()
223218
return gatewayRunner.Err()
224219
}
225220

221+
// runDispatcher passes actions collected from gateway to dispatcher or calls Dispatch with no actions every flushInterval.
222+
func runDispatcher(ctx context.Context, actionDispatcher dispatcher.Dispatcher, fleetGateway gateway.FleetGateway, actionAcker acker.Acker, flushInterval time.Duration) {
223+
t := time.NewTimer(flushInterval)
224+
for {
225+
select {
226+
case <-ctx.Done():
227+
return
228+
case <-t.C: // periodically call the dispatcher to handle scheduled actions.
229+
actionDispatcher.Dispatch(ctx, actionAcker)
230+
t.Reset(flushInterval)
231+
case actions := <-fleetGateway.Actions():
232+
actionDispatcher.Dispatch(ctx, actionAcker, actions...)
233+
t.Reset(flushInterval)
234+
}
235+
}
236+
}
237+
226238
// ActionErrors returns the error channel for actions.
227239
// May return errors for fleet managed errors.
228240
func (m *managedConfigManager) ActionErrors() <-chan error {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package application
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
9+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
10+
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/mock"
14+
)
15+
16+
type mockDispatcher struct {
17+
mock.Mock
18+
}
19+
20+
func (m *mockDispatcher) Dispatch(ctx context.Context, ack acker.Acker, actions ...fleetapi.Action) {
21+
m.Called(ctx, ack, actions)
22+
}
23+
24+
func (m *mockDispatcher) Errors() <-chan error {
25+
args := m.Called()
26+
return args.Get(0).(<-chan error)
27+
}
28+
29+
type mockGateway struct {
30+
mock.Mock
31+
}
32+
33+
func (m *mockGateway) Run(ctx context.Context) error {
34+
args := m.Called(ctx)
35+
return args.Error(0)
36+
}
37+
38+
func (m *mockGateway) Errors() <-chan error {
39+
args := m.Called()
40+
return args.Get(0).(<-chan error)
41+
}
42+
43+
func (m *mockGateway) Actions() <-chan []fleetapi.Action {
44+
args := m.Called()
45+
return args.Get(0).(<-chan []fleetapi.Action)
46+
}
47+
48+
func (m *mockGateway) SetClient(c client.Sender) {
49+
m.Called(c)
50+
}
51+
52+
type mockAcker struct {
53+
mock.Mock
54+
}
55+
56+
func (m *mockAcker) Ack(ctx context.Context, action fleetapi.Action) error {
57+
args := m.Called(ctx, action)
58+
return args.Error(0)
59+
}
60+
61+
func (m *mockAcker) Commit(ctx context.Context) error {
62+
args := m.Called(ctx)
63+
return args.Error(0)
64+
}
65+
66+
func Test_runDispatcher(t *testing.T) {
67+
tests := []struct {
68+
name string
69+
mockGateway func(chan []fleetapi.Action) *mockGateway
70+
mockDispatcher func() *mockDispatcher
71+
interval time.Duration
72+
}{{
73+
name: "dispatcher not called",
74+
mockGateway: func(ch chan []fleetapi.Action) *mockGateway {
75+
gateway := &mockGateway{}
76+
gateway.On("Actions").Return((<-chan []fleetapi.Action)(ch))
77+
return gateway
78+
},
79+
mockDispatcher: func() *mockDispatcher {
80+
dispatcher := &mockDispatcher{}
81+
return dispatcher
82+
},
83+
interval: time.Second,
84+
}, {
85+
name: "gateway actions passed",
86+
mockGateway: func(ch chan []fleetapi.Action) *mockGateway {
87+
ch <- []fleetapi.Action{&fleetapi.ActionUnknown{ActionID: "test"}}
88+
gateway := &mockGateway{}
89+
gateway.On("Actions").Return((<-chan []fleetapi.Action)(ch))
90+
return gateway
91+
},
92+
mockDispatcher: func() *mockDispatcher {
93+
dispatcher := &mockDispatcher{}
94+
dispatcher.On("Dispatch", mock.Anything, mock.Anything, mock.Anything).Once()
95+
return dispatcher
96+
},
97+
interval: time.Second,
98+
}, {
99+
name: "no gateway actions, dispatcher is flushed",
100+
mockGateway: func(ch chan []fleetapi.Action) *mockGateway {
101+
gateway := &mockGateway{}
102+
gateway.On("Actions").Return((<-chan []fleetapi.Action)(ch))
103+
return gateway
104+
},
105+
mockDispatcher: func() *mockDispatcher {
106+
dispatcher := &mockDispatcher{}
107+
dispatcher.On("Dispatch", mock.Anything, mock.Anything, mock.Anything).Once()
108+
return dispatcher
109+
},
110+
interval: time.Millisecond * 60,
111+
}}
112+
113+
for _, tc := range tests {
114+
t.Run(tc.name, func(t *testing.T) {
115+
ch := make(chan []fleetapi.Action, 1)
116+
gateway := tc.mockGateway(ch)
117+
dispatcher := tc.mockDispatcher()
118+
acker := &mockAcker{}
119+
120+
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
121+
defer cancel()
122+
runDispatcher(ctx, dispatcher, gateway, acker, tc.interval)
123+
assert.Empty(t, ch)
124+
125+
gateway.AssertExpectations(t)
126+
dispatcher.AssertExpectations(t)
127+
acker.AssertExpectations(t)
128+
})
129+
}
130+
}

0 commit comments

Comments
 (0)