Skip to content

Commit b579076

Browse files
committed
fix: update affiliate state correctly when they get deleted
This is client-side only fix, server side had no issues. Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
1 parent 49e53b1 commit b579076

File tree

2 files changed

+142
-1
lines changed

2 files changed

+142
-1
lines changed

pkg/client/client.go

+7
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,13 @@ func (client *Client) parseReply(logger *zap.Logger, reply watchReply) {
365365
continue
366366
}
367367

368+
if reply.resp.Deleted {
369+
// affiliate was deleted server-side
370+
delete(client.discoveredAffiliates, affiliate.Id)
371+
372+
continue
373+
}
374+
368375
if len(affiliate.Data) == 0 {
369376
// no affiliate data (yet?), skip it
370377
continue

pkg/client/client_test.go

+135-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func setupServer(t *testing.T) (address string) {
6262
return lis.Addr().String()
6363
}
6464

65-
//nolint:gocognit
65+
//nolint:gocognit,gocyclo,cyclop
6666
func TestClient(t *testing.T) {
6767
t.Parallel()
6868

@@ -232,6 +232,140 @@ func TestClient(t *testing.T) {
232232
}
233233
})
234234

235+
t.Run("AffiliateExpire", func(t *testing.T) {
236+
t.Parallel()
237+
238+
clusterID := "cluster_2"
239+
240+
key := make([]byte, 32)
241+
_, err := io.ReadFull(rand.Reader, key)
242+
require.NoError(t, err)
243+
244+
cipher, err := aes.NewCipher(key)
245+
require.NoError(t, err)
246+
247+
ctx, cancel := context.WithCancel(context.Background())
248+
defer cancel()
249+
250+
affiliate1 := "af_1"
251+
affiliate2 := "af_2"
252+
253+
client1, err := client.NewClient(client.Options{
254+
Cipher: cipher,
255+
Endpoint: endpoint,
256+
ClusterID: clusterID,
257+
AffiliateID: affiliate1,
258+
TTL: time.Second,
259+
Insecure: true,
260+
})
261+
require.NoError(t, err)
262+
263+
client2, err := client.NewClient(client.Options{
264+
Cipher: cipher,
265+
Endpoint: endpoint,
266+
ClusterID: clusterID,
267+
AffiliateID: affiliate2,
268+
TTL: time.Minute,
269+
Insecure: true,
270+
})
271+
require.NoError(t, err)
272+
273+
notify1 := make(chan struct{}, 1)
274+
notify2 := make(chan struct{}, 1)
275+
276+
eg, ctx := errgroup.WithContext(ctx)
277+
278+
ctx1, cancel1 := context.WithCancel(ctx)
279+
defer cancel1()
280+
281+
ctx2, cancel2 := context.WithCancel(ctx)
282+
defer cancel2()
283+
284+
eg.Go(func() error {
285+
return client1.Run(ctx1, logger, notify1)
286+
})
287+
288+
eg.Go(func() error {
289+
return client2.Run(ctx2, logger, notify2)
290+
})
291+
292+
select {
293+
case <-notify1:
294+
case <-time.After(2 * time.Second):
295+
require.Fail(t, "no initial snapshot update")
296+
}
297+
298+
assert.Empty(t, client1.GetAffiliates())
299+
300+
select {
301+
case <-notify2:
302+
case <-time.After(2 * time.Second):
303+
require.Fail(t, "no initial snapshot update")
304+
}
305+
306+
assert.Empty(t, client2.GetAffiliates())
307+
308+
// client1 publishes an affiliate with short TTL
309+
affiliate1PB := &client.Affiliate{
310+
Affiliate: &clientpb.Affiliate{
311+
NodeId: affiliate1,
312+
Addresses: [][]byte{{1, 2, 3}},
313+
Hostname: "host1",
314+
Nodename: "node1",
315+
MachineType: "controlplane",
316+
},
317+
}
318+
319+
require.NoError(t, client1.SetLocalData(affiliate1PB, nil))
320+
321+
// client2 should see the update from client1
322+
for {
323+
t.Logf("client2 affiliates = %d", len(client2.GetAffiliates()))
324+
325+
if len(client2.GetAffiliates()) == 1 {
326+
break
327+
}
328+
329+
select {
330+
case <-notify2:
331+
case <-time.After(2 * time.Second):
332+
t.Logf("client2 affiliates on timeout = %d", len(client2.GetAffiliates()))
333+
334+
require.Fail(t, "no incremental update")
335+
}
336+
}
337+
338+
require.Len(t, client2.GetAffiliates(), 1)
339+
340+
assert.Equal(t, []*client.Affiliate{affiliate1PB}, client2.GetAffiliates())
341+
342+
// stop client1
343+
cancel1()
344+
345+
for {
346+
t.Logf("client2 affiliates = %d", len(client2.GetAffiliates()))
347+
348+
if len(client2.GetAffiliates()) == 0 {
349+
break
350+
}
351+
352+
select {
353+
case <-notify2:
354+
case <-time.After(2 * time.Second):
355+
require.Fail(t, "no expiration")
356+
}
357+
}
358+
359+
require.Len(t, client2.GetAffiliates(), 0)
360+
361+
cancel()
362+
363+
err = eg.Wait()
364+
if err != nil && !errors.Is(err, context.Canceled) {
365+
assert.NoError(t, err)
366+
}
367+
})
368+
235369
t.Run("Cluster1", func(t *testing.T) {
236370
clusterSimulator(t, endpoint, logger, 5)
237371
})

0 commit comments

Comments
 (0)