Skip to content

Commit 729636a

Browse files
authored
Create non-blocking broadcaster helper and use it to manage Coordinator state notifications (#2849)
1 parent 6b6bb42 commit 729636a

37 files changed

+2722
-10273
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_application.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"fmt"
1010
"time"
1111

12-
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator/state"
13-
1412
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1513

1614
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
@@ -171,7 +169,7 @@ func readMapString(m map[string]interface{}, key string, def string) string {
171169
return def
172170
}
173171

174-
func findUnitFromInputType(state state.State, inputType string) (component.Component, component.Unit, bool) {
172+
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
175173
for _, comp := range state.Components {
176174
for _, unit := range comp.Component.Units {
177175
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {

internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go

-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
5454
go func() {
5555
h.log.Infof("starting upgrade to version %s in background", action.Version)
5656
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false); err != nil {
57-
5857
h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
5958
// If context is cancelled in getAsyncContext, the actions are acked there
6059
if !errors.Is(asyncCtx.Err(), context.Canceled) {

internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go

+64-21
Original file line numberDiff line numberDiff line change
@@ -51,59 +51,102 @@ func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
5151
}
5252

5353
func TestUpgradeHandler(t *testing.T) {
54+
// Create a cancellable context that will shut down the coordinator after
55+
// the test.
56+
ctx, cancel := context.WithCancel(context.Background())
57+
defer cancel()
58+
5459
log, _ := logger.New("", false)
55-
ack := noopacker.New()
5660
agentInfo, _ := info.NewAgentInfo(true)
5761
msgChan := make(chan string)
58-
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
59-
specs := component.RuntimeSpecs{}
60-
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)
62+
63+
// Create and start the coordinator
64+
c := coordinator.New(
65+
log,
66+
configuration.DefaultConfiguration(),
67+
logger.DefaultLogLevel,
68+
agentInfo,
69+
component.RuntimeSpecs{},
70+
nil,
71+
&mockUpgradeManager{msgChan: msgChan},
72+
nil, nil, nil, nil, nil, false)
73+
//nolint:errcheck // We don't need the termination state of the Coordinator
74+
go c.Run(ctx)
75+
6176
u := NewUpgrade(log, c)
62-
ctx := context.Background()
6377
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
78+
ack := noopacker.New()
6479
err := u.Handle(ctx, &a, ack)
6580
require.NoError(t, err)
6681
msg := <-msgChan
6782
require.Equal(t, "completed 8.3.0", msg)
6883
}
6984

7085
func TestUpgradeHandlerSameVersion(t *testing.T) {
86+
// Create a cancellable context that will shut down the coordinator after
87+
// the test.
88+
ctx, cancel := context.WithCancel(context.Background())
89+
defer cancel()
90+
7191
log, _ := logger.New("", false)
72-
ack := noopacker.New()
7392
agentInfo, _ := info.NewAgentInfo(true)
7493
msgChan := make(chan string)
75-
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
76-
specs := component.RuntimeSpecs{}
77-
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)
94+
95+
// Create and start the Coordinator
96+
c := coordinator.New(
97+
log,
98+
configuration.DefaultConfiguration(),
99+
logger.DefaultLogLevel,
100+
agentInfo,
101+
component.RuntimeSpecs{},
102+
nil,
103+
&mockUpgradeManager{msgChan: msgChan},
104+
nil, nil, nil, nil, nil, false)
105+
//nolint:errcheck // We don't need the termination state of the Coordinator
106+
go c.Run(ctx)
107+
78108
u := NewUpgrade(log, c)
79-
ctx1 := context.Background()
80-
ctx2 := context.Background()
81109
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
82-
err1 := u.Handle(ctx1, &a, ack)
83-
err2 := u.Handle(ctx2, &a, ack)
110+
ack := noopacker.New()
111+
err1 := u.Handle(ctx, &a, ack)
112+
err2 := u.Handle(ctx, &a, ack)
84113
require.NoError(t, err1)
85114
require.NoError(t, err2)
86115
msg := <-msgChan
87116
require.Equal(t, "completed 8.3.0", msg)
88117
}
89118

90119
func TestUpgradeHandlerNewVersion(t *testing.T) {
120+
// Create a cancellable context that will shut down the coordinator after
121+
// the test.
122+
ctx, cancel := context.WithCancel(context.Background())
123+
defer cancel()
124+
91125
log, _ := logger.New("", false)
92-
ack := noopacker.New()
93126
agentInfo, _ := info.NewAgentInfo(true)
94127
msgChan := make(chan string)
95-
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
96-
specs := component.RuntimeSpecs{}
97-
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)
128+
129+
// Create and start the Coordinator
130+
c := coordinator.New(
131+
log,
132+
configuration.DefaultConfiguration(),
133+
logger.DefaultLogLevel,
134+
agentInfo,
135+
component.RuntimeSpecs{},
136+
nil,
137+
&mockUpgradeManager{msgChan: msgChan},
138+
nil, nil, nil, nil, nil, false)
139+
//nolint:errcheck // We don't need the termination state of the Coordinator
140+
go c.Run(ctx)
141+
98142
u := NewUpgrade(log, c)
99-
ctx1 := context.Background()
100-
ctx2 := context.Background()
101143
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
102144
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
103-
err1 := u.Handle(ctx1, &a1, ack)
145+
ack := noopacker.New()
146+
err1 := u.Handle(ctx, &a1, ack)
104147
require.NoError(t, err1)
105148
time.Sleep(1 * time.Second)
106-
err2 := u.Handle(ctx2, &a2, ack)
149+
err2 := u.Handle(ctx, &a2, ack)
107150
require.NoError(t, err2)
108151
msg1 := <-msgChan
109152
require.Equal(t, "canceled 8.2.0", msg1)

0 commit comments

Comments
 (0)