Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cnvm: Delete old snapshots on startup #3143

Merged
merged 8 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/resources/providers/awslib/ec2/ebs_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package ec2
import (
"fmt"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"

"github.com/elastic/cloudbeat/internal/resources/fetching"
Expand Down Expand Up @@ -56,8 +57,8 @@ func FromSnapshotInfo(snapshot types.SnapshotInfo, region string, awsAccount str
State: snapshot.State,
Region: region,
awsAccount: awsAccount,
VolumeSize: int(*snapshot.VolumeSize),
IsEncrypted: *snapshot.Encrypted,
VolumeSize: int(aws.ToInt32(snapshot.VolumeSize)),
IsEncrypted: aws.ToBool(snapshot.Encrypted),
}
}

Expand All @@ -67,8 +68,8 @@ func FromSnapshot(snapshot types.Snapshot, region string, awsAccount string, ins
State: snapshot.State,
Region: region,
awsAccount: awsAccount,
VolumeSize: int(*snapshot.VolumeSize),
VolumeSize: int(aws.ToInt32(snapshot.VolumeSize)),
Instance: ins,
IsEncrypted: *snapshot.Encrypted,
IsEncrypted: aws.ToBool(snapshot.Encrypted),
}
}
71 changes: 70 additions & 1 deletion internal/resources/providers/awslib/ec2/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package ec2
import (
"context"
"fmt"
"iter"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
Expand All @@ -37,12 +40,24 @@ var (
subnetMainAssociationFilterName = "association.main"
)

const (
// snapshotPrefix is the leading part of the "Name" tag value given to
// snapshots created by this provider ("<prefix>-<instance id>"); the same
// prefix is used to recognise those snapshots again when listing them.
snapshotPrefix = "elastic-vulnerability"
)

// Provider groups the EC2 clients, the AWS account ID and the logger used by
// the EC2 operations defined on it.
type Provider struct {
// log receives informational and error messages emitted by the provider.
log *clog.Logger
// clients holds one Client per map key.
// NOTE(review): presumably keyed by AWS region name — confirm against callers.
clients map[string]Client
// awsAccountID is attached to the EBSSnapshot values the provider builds.
awsAccountID string
}

// NewProviderFromClients returns a Provider that uses the supplied logger,
// AWS account ID and pre-built client map as-is; no clients are created here.
func NewProviderFromClients(log *clog.Logger, awsAccountID string, clients map[string]Client) *Provider {
	provider := new(Provider)
	provider.log = log
	provider.clients = clients
	provider.awsAccountID = awsAccountID
	return provider
}

type Client interface {
CreateSnapshots(ctx context.Context, params *ec2.CreateSnapshotsInput, optFns ...func(*ec2.Options)) (*ec2.CreateSnapshotsOutput, error)
DeleteSnapshot(ctx context.Context, params *ec2.DeleteSnapshotInput, optFns ...func(*ec2.Options)) (*ec2.DeleteSnapshotOutput, error)
Expand Down Expand Up @@ -78,7 +93,7 @@ func (p *Provider) CreateSnapshots(ctx context.Context, ins *Ec2Instance) ([]EBS
{
ResourceType: "snapshot",
Tags: []types.Tag{
{Key: aws.String("Name"), Value: aws.String(fmt.Sprintf("elastic-vulnerability-%s", *ins.InstanceId))},
{Key: aws.String("Name"), Value: aws.String(fmt.Sprintf("%s-%s", snapshotPrefix, *ins.InstanceId))},
{Key: aws.String("Workload"), Value: aws.String("Cloudbeat Vulnerability Snapshot")},
},
},
Expand Down Expand Up @@ -311,6 +326,60 @@ func (p *Provider) DescribeSnapshots(ctx context.Context, snapshot EBSSnapshot)
return result, nil
}

// IterOwnedSnapshots returns an iterator over the snapshots owned by cloudbeat
// (identified by snapshotPrefix) that are older than the given time. A snapshot
// is yielded if:
//   - It has a tag with key "Name" and a value starting with snapshotPrefix
//   - Its start time is not after before
//   - It is "owned" by the current account (owner ID is "self")
//
// Listing errors are logged and end the iteration; they are not reported to
// the consumer of the iterator. Yielded snapshots carry an empty Ec2Instance.
//
// NOTE(review): yield is invoked from inside the MultiRegionFetch callback. If
// MultiRegionFetch runs that callback concurrently for several regions, yield
// could be called from multiple goroutines, or again after it has returned
// false, which the iter.Seq contract does not allow — confirm
// MultiRegionFetch's execution model.
func (p *Provider) IterOwnedSnapshots(ctx context.Context, before time.Time) iter.Seq[EBSSnapshot] {
	return func(yield func(EBSSnapshot) bool) {
		_, err := awslib.MultiRegionFetch(ctx, p.clients, func(ctx context.Context, region string, c Client) ([]awslib.AwsResource, error) {
			// Narrow the listing server-side to our own, name-tagged snapshots;
			// the age check can only be done client-side in filterSnap.
			input := &ec2.DescribeSnapshotsInput{
				Filters: []types.Filter{
					{
						Name:   aws.String("tag:Name"),
						Values: []string{fmt.Sprintf("%s-*", snapshotPrefix)},
					},
				},
				OwnerIds: []string{"self"},
			}
			paginator := ec2.NewDescribeSnapshotsPaginator(c, input)
			for paginator.HasMorePages() {
				output, err := paginator.NextPage(ctx)
				if err != nil {
					return nil, err
				}
				for _, snap := range output.Snapshots {
					if !filterSnap(snap, before) {
						continue
					}
					// SnapshotId is a pointer in the SDK type; aws.ToString
					// avoids a panic should it ever be nil.
					p.log.Infof("Found old snapshot %s", aws.ToString(snap.SnapshotId))
					if !yield(FromSnapshot(snap, region, p.awsAccountID, Ec2Instance{})) {
						// The consumer stopped iterating; stop paging this region.
						return nil, nil
					}
				}
			}
			return nil, nil
		})
		if err != nil {
			p.log.Errorf("Error listing owned snapshots: %v", err)
		}
	}
}

func filterSnap(snap types.Snapshot, before time.Time) bool {
if aws.ToTime(snap.StartTime).After(before) {
return false
}

for _, tag := range snap.Tags {
if aws.ToString(tag.Key) == "Name" {
return strings.HasPrefix(aws.ToString(tag.Value), snapshotPrefix)
}
}
Comment on lines +375 to +379
Copy link
Member

@romulets romulets Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are already querying the snapshots here:

                    Filters: []types.Filter{
					{
						Name:   aws.String("tag:Name"),
						Values: []string{fmt.Sprintf("%s-*", snapshotPrefix)},
					},
				},

Do we need this piece of filtering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to filter for time anyway so the name check is there just as a sanity check.

return false
}

func (p *Provider) DescribeSubnets(ctx context.Context) ([]awslib.AwsResource, error) {
subnets, err := awslib.MultiRegionFetch(ctx, p.clients, func(ctx context.Context, region string, c Client) ([]awslib.AwsResource, error) {
input := &ec2.DescribeSubnetsInput{}
Expand Down
53 changes: 53 additions & 0 deletions internal/vulnerability/mock_snapshot_creator_deleter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 70 additions & 5 deletions internal/vulnerability/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ package vulnerability

import (
"context"
"iter"
"sync"
"time"

"github.com/elastic/cloudbeat/internal/infra/clog"
"github.com/elastic/cloudbeat/internal/resources/providers/awslib/ec2"
)

const (
// backgroundDeleteWorkers is the number of goroutines DeleteOldSnapshots
// starts to delete snapshots in parallel.
backgroundDeleteWorkers = 3
// backgroundDeleteTimeout is the age threshold used by DeleteOldSnapshots:
// it asks the provider for snapshots older than now minus this duration.
backgroundDeleteTimeout = 2 * 24 * time.Hour
)

// snapshotCreatorDeleter is the set of snapshot operations needed from an EC2
// provider: creating, deleting and listing snapshots.
type snapshotCreatorDeleter interface {
// CreateSnapshots creates snapshots for the given instance and returns them.
CreateSnapshots(ctx context.Context, ins *ec2.Ec2Instance) ([]ec2.EBSSnapshot, error)
// DeleteSnapshot deletes the given snapshot.
DeleteSnapshot(ctx context.Context, snapshot ec2.EBSSnapshot) error
// IterOwnedSnapshots returns an iterator over snapshots selected relative
// to the before time.
// NOTE(review): presumably only snapshots older than before that belong to
// this tool — confirm against the implementation.
IterOwnedSnapshots(ctx context.Context, before time.Time) iter.Seq[ec2.EBSSnapshot]
}

type SnapshotManager struct {
Expand Down Expand Up @@ -64,7 +71,7 @@ func (s *SnapshotManager) CreateSnapshots(ctx context.Context, ins *ec2.Ec2Insta

func (s *SnapshotManager) DeleteSnapshot(ctx context.Context, snapshot ec2.EBSSnapshot) {
runWithGrace(ctx, shutdownGracePeriod, func(ctx context.Context) {
s.delete(ctx, snapshot)
s.delete(ctx, snapshot, "DeleteSnapshot")
})

s.lock.Lock()
Expand All @@ -82,17 +89,45 @@ func (s *SnapshotManager) Cleanup(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
s.delete(ctx, snap)
s.delete(ctx, snap, "Cleanup")
}()
}
})
clear(s.snapshots)
}

func (s *SnapshotManager) delete(ctx context.Context, snapshot ec2.EBSSnapshot) {
s.logger.Infof("VulnerabilityScanner.manager.DeleteSnapshot %s", snapshot.SnapshotId)
// DeleteOldSnapshots deletes every snapshot the provider reports as older than
// backgroundDeleteTimeout. Deletions are spread over backgroundDeleteWorkers
// goroutines fed through a context-aware channel. The call returns once the
// listing is exhausted (or ctx is cancelled) and all workers have finished.
func (s *SnapshotManager) DeleteOldSnapshots(ctx context.Context) {
	queue := newContextualChan[ec2.EBSSnapshot]()

	var workers sync.WaitGroup
	worker := func() {
		defer workers.Done()
		// Drain the queue until it is closed or ctx is cancelled.
		for snap, ok := queue.Read(ctx); ok; snap, ok = queue.Read(ctx) {
			s.delete(ctx, snap, "DeleteOldSnapshots")
		}
	}

	// Deferred calls run last-in-first-out: the queue is closed first so the
	// workers can exit, and only then do we wait for them.
	defer workers.Wait()
	defer queue.Close()

	workers.Add(backgroundDeleteWorkers)
	for i := 0; i < backgroundDeleteWorkers; i++ {
		go worker()
	}

	cutoff := time.Now().Add(-backgroundDeleteTimeout)
	for snapshot := range s.provider.IterOwnedSnapshots(ctx, cutoff) {
		if !queue.Write(ctx, snapshot) {
			// ctx was cancelled before a worker could take the snapshot.
			return
		}
	}
}

// delete removes a single snapshot through the provider. message names the
// calling operation and is embedded in the log lines. A failed deletion is
// logged and not returned to the caller.
func (s *SnapshotManager) delete(ctx context.Context, snapshot ec2.EBSSnapshot, message string) {
s.logger.Infof("VulnerabilityScanner.manager.%s %s", message, snapshot.SnapshotId)
err := s.provider.DeleteSnapshot(ctx, snapshot)
if err != nil {
// NOTE(review): the two Errorf calls below look like the removed and added
// sides of a diff hunk; presumably only the second one is intended — confirm.
s.logger.Errorf("VulnerabilityScanner.manager.DeleteSnapshot %s error: %s", snapshot.SnapshotId, err)
s.logger.Errorf("VulnerabilityScanner.manager.%s %s error: %s", message, snapshot.SnapshotId, err)
}
}

Expand All @@ -109,3 +144,33 @@ func runWithGrace(ctx context.Context, grace time.Duration, f func(ctx context.C
defer stop() // if the callback finishes in time, stop the AfterFunc
f(newCtx) // finally, call the actual callback!
}

// contextualChan wraps a channel of T so that it is only reachable through
// methods defined on this type rather than directly.
type contextualChan[T any] struct {
// ch is the wrapped channel.
ch chan T
}
Comment on lines +148 to +150
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ just sharing thoughts - not suggesting to change it ]
I wonder which of the two — a wrapper struct or plain generic functions — is leaner in this case.

E.g., instead of a wrapper, just having something like this:

func SendToChannel[T any](ctx context.Context, ch chan<- T, t T) bool
func ReadFromChannel[T any](ctx context.Context, ch <-chan T) (T, bool)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to completely avoid exposing the channel to prevent misusing it. Both can be valid approaches if direct access to the channel is needed.


// newContextualChan returns a contextualChan backed by a fresh unbuffered
// channel of T.
func newContextualChan[T any]() contextualChan[T] {
	var wrapped contextualChan[T]
	wrapped.ch = make(chan T)
	return wrapped
}

// Write sends t on the channel, blocking until a receiver takes it or ctx is
// done. It reports true when the value was sent and false when ctx ended
// first.
func (c contextualChan[T]) Write(ctx context.Context, t T) bool {
	delivered := false
	select {
	case c.ch <- t:
		delivered = true
	case <-ctx.Done():
		// Context ended before the value could be handed over.
	}
	return delivered
}

// Read receives the next value from the channel, blocking until one is
// available, the channel is closed, or ctx is done. The boolean is false when
// the channel was closed or ctx ended first; the value is then the zero T.
func (c contextualChan[T]) Read(ctx context.Context) (T, bool) {
	var zero T
	select {
	case <-ctx.Done():
		return zero, false
	case value, open := <-c.ch:
		return value, open
	}
}

// Close closes the wrapped channel, so that pending and later reads report
// ok == false once it is drained.
func (c contextualChan[T]) Close() {
	close(c.ch)
}
Loading