@@ -6,6 +6,8 @@ package handlers
6
6
7
7
import (
8
8
"context"
9
+ "errors"
10
+ "sync/atomic"
9
11
"testing"
10
12
"time"
11
13
@@ -26,7 +28,19 @@ import (
26
28
)
27
29
28
30
type mockUpgradeManager struct {
31
+ << << << < HEAD
29
32
msgChan chan string
33
+ == == == =
34
+ UpgradeFn func (
35
+ ctx context.Context ,
36
+ version string ,
37
+ sourceURI string ,
38
+ action * fleetapi.ActionUpgrade ,
39
+ details * details.Details ,
40
+ skipVerifyOverride bool ,
41
+ skipDefaultPgp bool ,
42
+ pgpBytes ... string ) (reexec.ShutdownCallbackFn , error )
43
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
30
44
}
31
45
32
46
func (u * mockUpgradeManager ) Upgradeable () bool {
@@ -37,6 +51,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
37
51
return nil
38
52
}
39
53
54
+ << << << < HEAD
40
55
func (u * mockUpgradeManager ) Upgrade (ctx context.Context , version string , sourceURI string , action * fleetapi.ActionUpgrade , details * details.Details , skipVerifyOverride bool , skipDefaultPgp bool , pgpBytes ... string ) (_ reexec.ShutdownCallbackFn , err error ) {
41
56
select {
42
57
case <- time .After (2 * time .Second ):
@@ -46,6 +61,27 @@ func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, source
46
61
u .msgChan <- "canceled " + version
47
62
return nil , ctx .Err ()
48
63
}
64
+ == == == =
65
+ func (u * mockUpgradeManager ) Upgrade (
66
+ ctx context .Context ,
67
+ version string ,
68
+ sourceURI string ,
69
+ action * fleetapi .ActionUpgrade ,
70
+ details * details .Details ,
71
+ skipVerifyOverride bool ,
72
+ skipDefaultPgp bool ,
73
+ pgpBytes ... string ) (reexec .ShutdownCallbackFn , error ) {
74
+
75
+ return u .UpgradeFn (
76
+ ctx ,
77
+ version ,
78
+ sourceURI ,
79
+ action ,
80
+ details ,
81
+ skipVerifyOverride ,
82
+ skipDefaultPgp ,
83
+ pgpBytes ... )
84
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
49
85
}
50
86
51
87
func (u * mockUpgradeManager ) Ack (ctx context .Context , acker acker .Acker ) error {
@@ -65,7 +101,11 @@ func TestUpgradeHandler(t *testing.T) {
65
101
log , _ := logger .New ("" , false )
66
102
67
103
agentInfo := & info.AgentInfo {}
104
+ << << << < HEAD
68
105
msgChan := make (chan string )
106
+ == == == =
107
+ upgradeCalledChan := make (chan struct {})
108
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
69
109
70
110
// Create and start the coordinator
71
111
c := coordinator .New (
@@ -75,7 +115,25 @@ func TestUpgradeHandler(t *testing.T) {
75
115
agentInfo ,
76
116
component.RuntimeSpecs {},
77
117
nil ,
118
+ << << << < HEAD
78
119
& mockUpgradeManager {msgChan : msgChan },
120
+ == == == =
121
+ & mockUpgradeManager {
122
+ UpgradeFn : func (
123
+ ctx context.Context ,
124
+ version string ,
125
+ sourceURI string ,
126
+ action * fleetapi.ActionUpgrade ,
127
+ details * details.Details ,
128
+ skipVerifyOverride bool ,
129
+ skipDefaultPgp bool ,
130
+ pgpBytes ... string ) (reexec.ShutdownCallbackFn , error ) {
131
+
132
+ upgradeCalledChan <- struct {}{}
133
+ return nil , nil
134
+ },
135
+ },
136
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
79
137
nil , nil , nil , nil , nil , false )
80
138
//nolint:errcheck // We don't need the termination state of the Coordinator
81
139
go c .Run (ctx )
@@ -86,8 +144,13 @@ func TestUpgradeHandler(t *testing.T) {
86
144
ack := noopacker .New ()
87
145
err := u .Handle (ctx , & a , ack )
88
146
require .NoError (t , err )
89
- msg := <- msgChan
90
- require .Equal (t , "completed 8.3.0" , msg )
147
+
148
+ // Make sure this test does not dead lock or wait for too long
149
+ select {
150
+ case <- time .Tick (50 * time .Millisecond ):
151
+ t .Fatal ("mockUpgradeManager.Upgrade was not called" )
152
+ case <- upgradeCalledChan :
153
+ }
91
154
}
92
155
93
156
func TestUpgradeHandlerSameVersion (t * testing .T ) {
@@ -99,17 +162,45 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
99
162
log , _ := logger .New ("" , false )
100
163
101
164
agentInfo := & info.AgentInfo {}
165
+ << << << < HEAD
102
166
msgChan := make (chan string )
167
+ == == == =
168
+ upgradeCalledChan := make (chan struct {})
169
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
103
170
104
171
// Create and start the Coordinator
172
+ upgradeCalled := atomic.Bool {}
105
173
c := coordinator .New (
106
174
log ,
107
175
configuration .DefaultConfiguration (),
108
176
logger .DefaultLogLevel ,
109
177
agentInfo ,
110
178
component.RuntimeSpecs {},
111
179
nil ,
180
+ << << << < HEAD
112
181
& mockUpgradeManager {msgChan : msgChan },
182
+ == == == =
183
+ & mockUpgradeManager {
184
+ UpgradeFn : func (
185
+ ctx context.Context ,
186
+ version string ,
187
+ sourceURI string ,
188
+ action * fleetapi.ActionUpgrade ,
189
+ details * details.Details ,
190
+ skipVerifyOverride bool ,
191
+ skipDefaultPgp bool ,
192
+ pgpBytes ... string ) (reexec.ShutdownCallbackFn , error ) {
193
+
194
+ if upgradeCalled .CompareAndSwap (false , true ) {
195
+ upgradeCalledChan <- struct {}{}
196
+ return nil , nil
197
+ }
198
+ err := errors .New ("mockUpgradeManager.Upgrade called more than once" )
199
+ t .Error (err .Error ())
200
+ return nil , err
201
+ },
202
+ },
203
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
113
204
nil , nil , nil , nil , nil , false )
114
205
//nolint:errcheck // We don't need the termination state of the Coordinator
115
206
go c .Run (ctx )
@@ -122,8 +213,18 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
122
213
err2 := u .Handle (ctx , & a , ack )
123
214
require .NoError (t , err1 )
124
215
require .NoError (t , err2 )
216
+ << << << < HEAD
125
217
msg := <- msgChan
126
218
require .Equal (t , "completed 8.3.0" , msg )
219
+ == == == =
220
+
221
+ // Make sure this test does not dead lock or wait for too long
222
+ select {
223
+ case <- time .Tick (50 * time .Millisecond ):
224
+ t .Fatal ("mockUpgradeManager.Upgrade was not called" )
225
+ case <- upgradeCalledChan :
226
+ }
227
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
127
228
}
128
229
129
230
func TestUpgradeHandlerNewVersion (t * testing.T ) {
@@ -133,9 +234,13 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
133
234
defer cancel ()
134
235
135
236
log , _ := logger .New ("" , false )
237
+ upgradeCalledChan := make (chan string )
136
238
137
239
agentInfo := & info.AgentInfo {}
240
+ << << << < HEAD
138
241
msgChan := make (chan string )
242
+ == == == =
243
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
139
244
140
245
// Create and start the Coordinator
141
246
c := coordinator .New (
@@ -145,7 +250,31 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
145
250
agentInfo ,
146
251
component.RuntimeSpecs {},
147
252
nil ,
253
+ << << << < HEAD
148
254
& mockUpgradeManager {msgChan : msgChan },
255
+ == == == =
256
+ & mockUpgradeManager {
257
+ UpgradeFn : func (
258
+ ctx context.Context ,
259
+ version string ,
260
+ sourceURI string ,
261
+ action * fleetapi.ActionUpgrade ,
262
+ details * details.Details ,
263
+ skipVerifyOverride bool ,
264
+ skipDefaultPgp bool ,
265
+ pgpBytes ... string ) (reexec.ShutdownCallbackFn , error ) {
266
+
267
+ defer func () {
268
+ upgradeCalledChan <- version
269
+ }()
270
+ if version == "8.2.0" {
271
+ return nil , errors .New ("upgrade to 8.2.0 will always fail" )
272
+ }
273
+
274
+ return nil , nil
275
+ },
276
+ },
277
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
149
278
nil , nil , nil , nil , nil , false )
150
279
//nolint:errcheck // We don't need the termination state of the Coordinator
151
280
go c .Run (ctx )
@@ -156,13 +285,35 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
156
285
a2 := fleetapi.ActionUpgrade {Data : fleetapi.ActionUpgradeData {
157
286
Version : "8.5.0" , SourceURI : "http://localhost" }}
158
287
ack := noopacker .New ()
288
+
289
+ checkMsg := func (c <- chan string , expected , errMsg string ) {
290
+ t .Helper ()
291
+ // Make sure this test does not dead lock or wait for too long
292
+ // For some reason < 1s sometimes makes the test fail.
293
+ select {
294
+ case <- time .Tick (1300 * time .Millisecond ):
295
+ t .Fatal ("timed out waiting for Upgrade to return" )
296
+ case msg := <- c :
297
+ require .Equal (t , expected , msg , errMsg )
298
+ }
299
+ }
300
+
301
+ // Send both upgrade actions, a1 will error before a2 succeeds
159
302
err1 := u .Handle (ctx , & a1 , ack )
160
303
require .NoError (t , err1 )
304
+ << << << < HEAD
161
305
time .Sleep (1 * time .Second )
162
306
err2 := u .Handle (ctx , & a2 , ack )
163
307
require .NoError (t , err2 )
164
308
msg1 := <- msgChan
165
309
require .Equal (t , "canceled 8.2.0" , msg1 )
166
310
msg2 := <- msgChan
167
311
require .Equal (t , "completed 8.5.0" , msg2 )
312
+ == == == =
313
+ checkMsg (upgradeCalledChan , "8.2.0" , "first call must be with version 8.2.0" )
314
+
315
+ err2 := u .Handle (ctx , & a2 , ack )
316
+ require .NoError (t , err2 )
317
+ checkMsg (upgradeCalledChan , "8.5.0" , "second call to Upgrade must be with version 8.5.0" )
318
+ >> >> >> > 1242e7186 a ([Integration Test Framework ] fix createTempDir and flaky tests (#5409 ))
168
319
}
0 commit comments