Skip to content

Commit 87a5ca0

Browse files
committed
Fix flakiness in TestUpgradeHandler*
Fix the flakiness from TestUpgradeHandler* tests by re-working the mockUpgradeManager, now it accepts a function for its Upgrade method and their implementation is goroutine safe
1 parent a7befe4 commit 87a5ca0

File tree

1 file changed

+118
-43
lines changed

1 file changed

+118
-43
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go

+118-43
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package handlers
77
import (
88
"context"
99
"errors"
10+
"sync/atomic"
1011
"testing"
12+
"time"
1113

1214
"github.com/stretchr/testify/require"
1315

@@ -27,9 +29,15 @@ import (
2729
)
2830

2931
type mockUpgradeManager struct {
30-
msgChan chan string
31-
successChan chan string
32-
errChan chan error
32+
UpgradeFn func(
33+
ctx context.Context,
34+
version string,
35+
sourceURI string,
36+
action *fleetapi.ActionUpgrade,
37+
details *details.Details,
38+
skipVerifyOverride bool,
39+
skipDefaultPgp bool,
40+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error)
3341
}
3442

3543
func (u *mockUpgradeManager) Upgradeable() bool {
@@ -40,17 +48,25 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
4048
return nil
4149
}
4250

43-
func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
44-
select {
45-
case msg := <-u.successChan:
46-
u.msgChan <- msg
47-
return nil, nil
48-
case err := <-u.errChan:
49-
u.msgChan <- err.Error()
50-
return nil, ctx.Err()
51-
case <-ctx.Done():
52-
return nil, ctx.Err()
53-
}
51+
func (u *mockUpgradeManager) Upgrade(
52+
ctx context.Context,
53+
version string,
54+
sourceURI string,
55+
action *fleetapi.ActionUpgrade,
56+
details *details.Details,
57+
skipVerifyOverride bool,
58+
skipDefaultPgp bool,
59+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
60+
61+
return u.UpgradeFn(
62+
ctx,
63+
version,
64+
sourceURI,
65+
action,
66+
details,
67+
skipVerifyOverride,
68+
skipDefaultPgp,
69+
pgpBytes...)
5470
}
5571

5672
func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
@@ -70,8 +86,7 @@ func TestUpgradeHandler(t *testing.T) {
7086
log, _ := logger.New("", false)
7187

7288
agentInfo := &info.AgentInfo{}
73-
msgChan := make(chan string)
74-
completedChan := make(chan string)
89+
upgradeCalledChan := make(chan struct{})
7590

7691
// Create and start the coordinator
7792
c := coordinator.New(
@@ -81,7 +96,21 @@ func TestUpgradeHandler(t *testing.T) {
8196
agentInfo,
8297
component.RuntimeSpecs{},
8398
nil,
84-
&mockUpgradeManager{msgChan: msgChan, successChan: completedChan},
99+
&mockUpgradeManager{
100+
UpgradeFn: func(
101+
ctx context.Context,
102+
version string,
103+
sourceURI string,
104+
action *fleetapi.ActionUpgrade,
105+
details *details.Details,
106+
skipVerifyOverride bool,
107+
skipDefaultPgp bool,
108+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
109+
110+
upgradeCalledChan <- struct{}{}
111+
return nil, nil
112+
},
113+
},
85114
nil, nil, nil, nil, nil, false)
86115
//nolint:errcheck // We don't need the termination state of the Coordinator
87116
go c.Run(ctx)
@@ -91,11 +120,14 @@ func TestUpgradeHandler(t *testing.T) {
91120
Version: "8.3.0", SourceURI: "http://localhost"}}
92121
ack := noopacker.New()
93122
err := u.Handle(ctx, &a, ack)
94-
// indicate that upgrade is completed
95-
close(completedChan)
96123
require.NoError(t, err)
97-
msg := <-msgChan
98-
require.Equal(t, "completed 8.3.0", msg)
124+
125+
// Make sure this test does not dead lock or wait for too long
126+
select {
127+
case <-time.Tick(50 * time.Millisecond):
128+
t.Fatal("mockUpgradeManager.Upgrade was not called")
129+
case <-upgradeCalledChan:
130+
}
99131
}
100132

101133
func TestUpgradeHandlerSameVersion(t *testing.T) {
@@ -109,19 +141,37 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
109141
log, _ := logger.New("", false)
110142

111143
agentInfo := &info.AgentInfo{}
112-
msgChan := make(chan string)
113-
successChan := make(chan string)
114-
errChan := make(chan error)
144+
upgradeCalledChan := make(chan struct{})
115145

116146
// Create and start the Coordinator
147+
upgradeCalled := atomic.Bool{}
117148
c := coordinator.New(
118149
log,
119150
configuration.DefaultConfiguration(),
120151
logger.DefaultLogLevel,
121152
agentInfo,
122153
component.RuntimeSpecs{},
123154
nil,
124-
&mockUpgradeManager{msgChan: msgChan, successChan: successChan, errChan: errChan},
155+
&mockUpgradeManager{
156+
UpgradeFn: func(
157+
ctx context.Context,
158+
version string,
159+
sourceURI string,
160+
action *fleetapi.ActionUpgrade,
161+
details *details.Details,
162+
skipVerifyOverride bool,
163+
skipDefaultPgp bool,
164+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
165+
166+
if upgradeCalled.CompareAndSwap(false, true) {
167+
upgradeCalledChan <- struct{}{}
168+
return nil, nil
169+
}
170+
err := errors.New("mockUpgradeManager.Upgrade called more than once")
171+
t.Error(err.Error())
172+
return nil, err
173+
},
174+
},
125175
nil, nil, nil, nil, nil, false)
126176
//nolint:errcheck // We don't need the termination state of the Coordinator
127177
go c.Run(ctx)
@@ -135,11 +185,12 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
135185
require.NoError(t, err1)
136186
require.NoError(t, err2)
137187

138-
successChan <- "completed 8.3.0"
139-
require.Equal(t, "completed 8.3.0", <-msgChan)
140-
errChan <- errors.New("duplicated update, not finishing it?")
141-
require.Equal(t, "duplicated update, not finishing it?", <-msgChan)
142-
188+
// Make sure this test does not dead lock or wait for too long
189+
select {
190+
case <-time.Tick(50 * time.Millisecond):
191+
t.Fatal("mockUpgradeManager.Upgrade was not called")
192+
case <-upgradeCalledChan:
193+
}
143194
}
144195

145196
func TestUpgradeHandlerNewVersion(t *testing.T) {
@@ -149,11 +200,9 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
149200
defer cancel()
150201

151202
log, _ := logger.New("", false)
203+
upgradeCalledChan := make(chan string)
152204

153205
agentInfo := &info.AgentInfo{}
154-
msgChan := make(chan string)
155-
completedChan := make(chan string)
156-
errorChan := make(chan error)
157206

158207
// Create and start the Coordinator
159208
c := coordinator.New(
@@ -163,7 +212,27 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
163212
agentInfo,
164213
component.RuntimeSpecs{},
165214
nil,
166-
&mockUpgradeManager{msgChan: msgChan, successChan: completedChan, errChan: errorChan},
215+
&mockUpgradeManager{
216+
UpgradeFn: func(
217+
ctx context.Context,
218+
version string,
219+
sourceURI string,
220+
action *fleetapi.ActionUpgrade,
221+
details *details.Details,
222+
skipVerifyOverride bool,
223+
skipDefaultPgp bool,
224+
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
225+
226+
defer func() {
227+
upgradeCalledChan <- version
228+
}()
229+
if version == "8.2.0" {
230+
return nil, errors.New("upgrade to 8.2.0 will always fail")
231+
}
232+
233+
return nil, nil
234+
},
235+
},
167236
nil, nil, nil, nil, nil, false)
168237
//nolint:errcheck // We don't need the termination state of the Coordinator
169238
go c.Run(ctx)
@@ -175,18 +244,24 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
175244
Version: "8.5.0", SourceURI: "http://localhost"}}
176245
ack := noopacker.New()
177246

247+
checkMsg := func(c <-chan string, expected, errMsg string) {
248+
t.Helper()
249+
// Make sure this test does not dead lock or wait for too long
250+
// For some reason < 1s sometimes makes the test fail.
251+
select {
252+
case <-time.Tick(1300 * time.Millisecond):
253+
t.Fatal("timed out waiting for Upgrade to return")
254+
case msg := <-c:
255+
require.Equal(t, expected, msg, errMsg)
256+
}
257+
}
258+
178259
// Send both upgrade actions, a1 will error before a2 succeeds
179260
err1 := u.Handle(ctx, &a1, ack)
180261
require.NoError(t, err1)
262+
checkMsg(upgradeCalledChan, "8.2.0", "first call must be with version 8.2.0")
263+
181264
err2 := u.Handle(ctx, &a2, ack)
182265
require.NoError(t, err2)
183-
184-
// Send an error so the first action is "cancelled"
185-
errorChan <- errors.New("cancelled 8.2.0")
186-
// Wait for the mockUpgradeHandler to receive and "process" the error
187-
require.Equal(t, "cancelled 8.2.0", <-msgChan, "mockUpgradeHandler.Upgrade did not receive the expected error")
188-
189-
// Send a success so the second action succeeds
190-
completedChan <- "completed 8.5.0"
191-
require.Equal(t, "completed 8.5.0", <-msgChan, "mockUpgradeHandler.Upgrade did not receive the success signal")
266+
checkMsg(upgradeCalledChan, "8.5.0", "second call to Upgrade must be with version 8.5.0")
192267
}

0 commit comments

Comments
 (0)