Skip to content

Commit 8cffd34

Browse files
[8.x](backport #3143) cnvm: Delete old snapshots on startup (#3151)
cnvm: Delete old snapshots on startup (#3143) ### Summary of your changes Change CNVM to add a background job cleaning up old snapshots. The logic here is: 1. Background routine is started. Since the amount of leftover snapshots can be quite high (e.g. 40k in our account), 3 cleanup workers are used 2. `IterOwnedSnapshots()` is called which goes over all snapshots, returning an iterator. Snapshots are selected if: 1. They are more than 48 hours old 2. They have a tag with key "Name" and value starting with "elastic-vulnerability" 3. They are "self-owned" 3. Cleanup returns. On context cancelled, no extra grace period is added so the process doesn't block restarts/shutdown. ### Screenshot/Data Deleted 1178 snapshots from `2025-03-25T16:17:08.384Z` to `2025-03-25T16:20:19.504Z` ### Related Issues Closes #3105 Closes https://github.com/elastic/sdh-security-team/issues/1168 ### Checklist - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have added the necessary README/documentation (if appropriate) #### Introducing a new rule? No (cherry picked from commit 8860b9d) Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
1 parent 1b6e026 commit 8cffd34

File tree

7 files changed

+326
-11
lines changed

7 files changed

+326
-11
lines changed

internal/resources/providers/awslib/ec2/ebs_snapshot.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package ec2
2020
import (
2121
"fmt"
2222

23+
"github.com/aws/aws-sdk-go-v2/aws"
2324
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
2425

2526
"github.com/elastic/cloudbeat/internal/resources/fetching"
@@ -56,8 +57,8 @@ func FromSnapshotInfo(snapshot types.SnapshotInfo, region string, awsAccount str
5657
State: snapshot.State,
5758
Region: region,
5859
awsAccount: awsAccount,
59-
VolumeSize: int(*snapshot.VolumeSize),
60-
IsEncrypted: *snapshot.Encrypted,
60+
VolumeSize: int(aws.ToInt32(snapshot.VolumeSize)),
61+
IsEncrypted: aws.ToBool(snapshot.Encrypted),
6162
}
6263
}
6364

@@ -67,8 +68,8 @@ func FromSnapshot(snapshot types.Snapshot, region string, awsAccount string, ins
6768
State: snapshot.State,
6869
Region: region,
6970
awsAccount: awsAccount,
70-
VolumeSize: int(*snapshot.VolumeSize),
71+
VolumeSize: int(aws.ToInt32(snapshot.VolumeSize)),
7172
Instance: ins,
72-
IsEncrypted: *snapshot.Encrypted,
73+
IsEncrypted: aws.ToBool(snapshot.Encrypted),
7374
}
7475
}

internal/resources/providers/awslib/ec2/provider.go

+70-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ package ec2
2020
import (
2121
"context"
2222
"fmt"
23+
"iter"
24+
"strings"
25+
"time"
2326

2427
"github.com/aws/aws-sdk-go-v2/aws"
2528
"github.com/aws/aws-sdk-go-v2/aws/retry"
@@ -37,12 +40,24 @@ var (
3740
subnetMainAssociationFilterName = "association.main"
3841
)
3942

43+
const (
44+
snapshotPrefix = "elastic-vulnerability"
45+
)
46+
4047
type Provider struct {
4148
log *clog.Logger
4249
clients map[string]Client
4350
awsAccountID string
4451
}
4552

53+
func NewProviderFromClients(log *clog.Logger, awsAccountID string, clients map[string]Client) *Provider {
54+
return &Provider{
55+
log: log,
56+
clients: clients,
57+
awsAccountID: awsAccountID,
58+
}
59+
}
60+
4661
type Client interface {
4762
CreateSnapshots(ctx context.Context, params *ec2.CreateSnapshotsInput, optFns ...func(*ec2.Options)) (*ec2.CreateSnapshotsOutput, error)
4863
DeleteSnapshot(ctx context.Context, params *ec2.DeleteSnapshotInput, optFns ...func(*ec2.Options)) (*ec2.DeleteSnapshotOutput, error)
@@ -78,7 +93,7 @@ func (p *Provider) CreateSnapshots(ctx context.Context, ins *Ec2Instance) ([]EBS
7893
{
7994
ResourceType: "snapshot",
8095
Tags: []types.Tag{
81-
{Key: aws.String("Name"), Value: aws.String(fmt.Sprintf("elastic-vulnerability-%s", *ins.InstanceId))},
96+
{Key: aws.String("Name"), Value: aws.String(fmt.Sprintf("%s-%s", snapshotPrefix, *ins.InstanceId))},
8297
{Key: aws.String("Workload"), Value: aws.String("Cloudbeat Vulnerability Snapshot")},
8398
},
8499
},
@@ -311,6 +326,60 @@ func (p *Provider) DescribeSnapshots(ctx context.Context, snapshot EBSSnapshot)
311326
return result, nil
312327
}
313328

329+
// IterOwnedSnapshots will iterate over the snapshots owned by cloudbeat (snapshotPrefix) that are older than the
330+
// specified before time. A snapshot will be yielded if:
331+
// - It has a tag with key "Name" and value starting with snapshotPrefix
332+
// - It is older than the specified before time
333+
// - It is "owned" by the current account (owner ID is "self")
334+
func (p *Provider) IterOwnedSnapshots(ctx context.Context, before time.Time) iter.Seq[EBSSnapshot] {
335+
return func(yield func(EBSSnapshot) bool) {
336+
_, err := awslib.MultiRegionFetch(ctx, p.clients, func(ctx context.Context, region string, c Client) ([]awslib.AwsResource, error) {
337+
input := &ec2.DescribeSnapshotsInput{
338+
Filters: []types.Filter{
339+
{
340+
Name: aws.String("tag:Name"),
341+
Values: []string{fmt.Sprintf("%s-*", snapshotPrefix)},
342+
},
343+
},
344+
OwnerIds: []string{"self"},
345+
}
346+
paginator := ec2.NewDescribeSnapshotsPaginator(c, input)
347+
for paginator.HasMorePages() {
348+
output, err := paginator.NextPage(ctx)
349+
if err != nil {
350+
return nil, err
351+
}
352+
for _, snap := range output.Snapshots {
353+
if filterSnap(snap, before) {
354+
p.log.Infof("Found old snapshot %s", *snap.SnapshotId)
355+
ebsSnap := FromSnapshot(snap, region, p.awsAccountID, Ec2Instance{})
356+
if !yield(ebsSnap) {
357+
return nil, nil
358+
}
359+
}
360+
}
361+
}
362+
return nil, nil
363+
})
364+
if err != nil {
365+
p.log.Errorf("Error listing owned snapshots: %v", err)
366+
}
367+
}
368+
}
369+
370+
func filterSnap(snap types.Snapshot, before time.Time) bool {
371+
if aws.ToTime(snap.StartTime).After(before) {
372+
return false
373+
}
374+
375+
for _, tag := range snap.Tags {
376+
if aws.ToString(tag.Key) == "Name" {
377+
return strings.HasPrefix(aws.ToString(tag.Value), snapshotPrefix)
378+
}
379+
}
380+
return false
381+
}
382+
314383
func (p *Provider) DescribeSubnets(ctx context.Context) ([]awslib.AwsResource, error) {
315384
subnets, err := awslib.MultiRegionFetch(ctx, p.clients, func(ctx context.Context, region string, c Client) ([]awslib.AwsResource, error) {
316385
input := &ec2.DescribeSubnetsInput{}

internal/vulnerability/mock_snapshot_creator_deleter.go

+53
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/vulnerability/snapshot.go

+70-5
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@ package vulnerability
1919

2020
import (
2121
"context"
22+
"iter"
2223
"sync"
2324
"time"
2425

2526
"github.com/elastic/cloudbeat/internal/infra/clog"
2627
"github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2"
2728
)
2829

30+
const (
31+
backgroundDeleteWorkers = 3
32+
backgroundDeleteTimeout = 2 * 24 * time.Hour
33+
)
34+
2935
type snapshotCreatorDeleter interface {
3036
CreateSnapshots(ctx context.Context, ins *ec2.Ec2Instance) ([]ec2.EBSSnapshot, error)
3137
DeleteSnapshot(ctx context.Context, snapshot ec2.EBSSnapshot) error
38+
IterOwnedSnapshots(ctx context.Context, before time.Time) iter.Seq[ec2.EBSSnapshot]
3239
}
3340

3441
type SnapshotManager struct {
@@ -64,7 +71,7 @@ func (s *SnapshotManager) CreateSnapshots(ctx context.Context, ins *ec2.Ec2Insta
6471

6572
func (s *SnapshotManager) DeleteSnapshot(ctx context.Context, snapshot ec2.EBSSnapshot) {
6673
runWithGrace(ctx, shutdownGracePeriod, func(ctx context.Context) {
67-
s.delete(ctx, snapshot)
74+
s.delete(ctx, snapshot, "DeleteSnapshot")
6875
})
6976

7077
s.lock.Lock()
@@ -82,17 +89,45 @@ func (s *SnapshotManager) Cleanup(ctx context.Context) {
8289
wg.Add(1)
8390
go func() {
8491
defer wg.Done()
85-
s.delete(ctx, snap)
92+
s.delete(ctx, snap, "Cleanup")
8693
}()
8794
}
8895
})
96+
clear(s.snapshots)
8997
}
9098

91-
func (s *SnapshotManager) delete(ctx context.Context, snapshot ec2.EBSSnapshot) {
92-
s.logger.Infof("VulnerabilityScanner.manager.DeleteSnapshot %s", snapshot.SnapshotId)
99+
func (s *SnapshotManager) DeleteOldSnapshots(ctx context.Context) {
100+
var wg sync.WaitGroup
101+
defer wg.Wait()
102+
103+
ch := newContextualChan[ec2.EBSSnapshot]()
104+
defer ch.Close()
105+
106+
wg.Add(backgroundDeleteWorkers)
107+
for range backgroundDeleteWorkers {
108+
go func() {
109+
defer wg.Done()
110+
for {
111+
snap, ok := ch.Read(ctx)
112+
if !ok {
113+
return
114+
}
115+
s.delete(ctx, snap, "DeleteOldSnapshots")
116+
}
117+
}()
118+
}
119+
for snapshot := range s.provider.IterOwnedSnapshots(ctx, time.Now().Add(-backgroundDeleteTimeout)) {
120+
if !ch.Write(ctx, snapshot) {
121+
return
122+
}
123+
}
124+
}
125+
126+
func (s *SnapshotManager) delete(ctx context.Context, snapshot ec2.EBSSnapshot, message string) {
127+
s.logger.Infof("VulnerabilityScanner.manager.%s %s", message, snapshot.SnapshotId)
93128
err := s.provider.DeleteSnapshot(ctx, snapshot)
94129
if err != nil {
95-
s.logger.Errorf("VulnerabilityScanner.manager.DeleteSnapshot %s error: %s", snapshot.SnapshotId, err)
130+
s.logger.Errorf("VulnerabilityScanner.manager.%s %s error: %s", message, snapshot.SnapshotId, err)
96131
}
97132
}
98133

@@ -109,3 +144,33 @@ func runWithGrace(ctx context.Context, grace time.Duration, f func(ctx context.C
109144
defer stop() // if the callback finishes in time, stop the AfterFunc
110145
f(newCtx) // finally, call the actual callback!
111146
}
147+
148+
type contextualChan[T any] struct {
149+
ch chan T
150+
}
151+
152+
func newContextualChan[T any]() contextualChan[T] {
153+
return contextualChan[T]{ch: make(chan T)}
154+
}
155+
156+
func (s contextualChan[T]) Write(ctx context.Context, t T) bool {
157+
select {
158+
case <-ctx.Done():
159+
return false
160+
case s.ch <- t:
161+
return true
162+
}
163+
}
164+
165+
func (s contextualChan[T]) Read(ctx context.Context) (T, bool) {
166+
select {
167+
case t, ok := <-s.ch:
168+
return t, ok
169+
case <-ctx.Done():
170+
return *new(T), false
171+
}
172+
}
173+
174+
func (s contextualChan[T]) Close() {
175+
close(s.ch)
176+
}

0 commit comments

Comments
 (0)