Performance improvements (quick wins) #3087
Conversation
This pull request does not have a backport label. Could you fix it @moukoublen? 🙏
Force-pushed from 5e99ce8 to 850d5bf
internal/flavors/publisher.go (Outdated)
-	*events = nil
+	*events = (*events)[:0] // reuse the capacity and set len to 0.
+
+	// if for some reason capacity exceeds 4*threshold, drop the slice and create a new one. (it will never get here, just a precaution)
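For context, the reuse-vs-reset logic in the diff above could look roughly like this when written out (a minimal sketch; threshold, Event and resetEvents are illustrative names, not the actual cloudbeat code):

package publisher

// threshold is a hypothetical flush threshold; the real value lives in the publisher configuration.
const threshold = 75

// Event stands in for beat.Event so the sketch stays self-contained.
type Event struct{}

// resetEvents keeps the backing array for the common case, but drops it if it
// has grown far beyond the threshold (e.g. after one huge batch).
func resetEvents(events *[]Event) {
	if cap(*events) > 4*threshold {
		// precaution: capacity ballooned, start over with a fresh slice.
		*events = make([]Event, 0, threshold*3/2)
		return
	}
	// common case: reuse the capacity and set len to 0.
	*events = (*events)[:0]
}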
Therefore you are expecting the capacity to grow?
The flow is the following:
- Init with a capacity of 1.5x the threshold
- Append events once the channel receives them
- If the capacity is 4x higher than the threshold, reset the capacity
The question is: how would the capacity grow, if publish is a blocking action once the threshold has been met?
Our flow is pretty much (at a high level) here:
cloudbeat/internal/flavors/benchmark/builder/benchmark.go
Lines 52 to 56 in 5eea2c3
func (b *basebenchmark) Run(ctx context.Context) (<-chan []beat.Event, error) {
	b.manager.Run()
	findingsCh := pipeline.Step(ctx, b.log, b.resourceCh, b.evaluator.Eval)
	eventsCh := pipeline.Step(ctx, b.log, findingsCh, b.transformer.CreateBeatEvents)
	return eventsCh, nil
and here
cloudbeat/internal/flavors/posture.go
Lines 96 to 104 in 5eea2c3
func (bt *posture) Run(*beat.Beat) error {
	bt.log.Info("posture is running! Hit CTRL-C to stop it")
	eventsCh, err := bt.benchmark.Run(bt.ctx)
	if err != nil {
		return err
	}
	bt.publisher.HandleEvents(bt.ctx, eventsCh)
	bt.log.Warn("Posture has finished running")
resources -> findings (plural) per resource -> create (plural) beat events -> publish.
So the channel in this case receives multiple events ([]beat.Event):
case event, ok := <-ch:
	// ...
	eventsToSend = append(eventsToSend, event...)
So in theory, if a resource produces multiple findings (more than the threshold), the capacity will grow. And because we re-use the slice, we will keep using the "extended" capacity forever (not necessarily a bad thing), but it's good to have a fallback.
So if we have a single resource that produced an enormous number of findings (if), let's reset the slice in that specific case.
And in general, since we receive batches (slices), we can potentially grow beyond the threshold at any given point in time.
I don't think it's a very likely scenario to grow that big (> 4x), but it seems good to have it covered.
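To make that growth concrete, here is a tiny standalone example (hypothetical numbers, with int standing in for beat.Event):

package main

import "fmt"

func main() {
	threshold := 75
	eventsToSend := make([]int, 0, threshold*3/2) // initial capacity: 112

	// One resource that yields an unusually large batch of findings.
	batch := make([]int, 400)
	eventsToSend = append(eventsToSend, batch...)

	// append had to grow the backing array, and truncating with [:0] would
	// keep that larger capacity around for every later iteration.
	fmt.Println(cap(eventsToSend), cap(eventsToSend) > 4*threshold) // e.g. 400+, true
}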
I don't think it's a very likely scenario to grow that big (> 4x), but it seems good to have it covered.
I actually think it's extremely likely. That's just 4*75 = 300 events. Since our fetchers don't stream events right now, any resource type with more than 300 objects will trigger it in my understanding.
I think it's not batched per resource type but per single resource.
So the case event, ok := <-ch: basically means (if I am not mistaken) rule findings for a single resource id.
This is a high-level view of how data flows through cloudbeat currently (if I am not mistaken):
chan fetching.ResourceInfo <- fetchers fetch (batched or streamed) and send each asset (a single resource per asset).
chan fetching.ResourceInfo -> basebenchmark, step 1, receives (a single resource) and runs opa eval on each one.
chan evaluator.EventData <- basebenchmark, step 1, opa eval sends multiple findings per single resource, batched together into a single item.
chan evaluator.EventData -> basebenchmark, step 2, receives and converts each resource's findings (multiple findings per single resource) into beat.Events.
chan []beat.Event <- basebenchmark, step 2, sends multiple events (all findings for a single resource).
chan []beat.Event -> publisher receives from the channel and publishes to elasticsearch.
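Roughly, each of those arrows is one pipeline step: a goroutine that reads from one channel, transforms the item, and forwards the result on the next channel. A generic sketch of such a step (hypothetical signature; the real pipeline.Step in cloudbeat also takes a logger and may differ in details):

package pipeline

import "context"

// Step consumes items from in, applies fn, and emits the results on the
// returned channel until in is closed or the context is cancelled.
func Step[In, Out any](ctx context.Context, in <-chan In, fn func(context.Context, In) (Out, error)) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-in:
				if !ok {
					return
				}
				res, err := fn(ctx, item)
				if err != nil {
					continue // a real step would log and skip the item
				}
				select {
				case out <- res:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}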
It's quite important to add tests that cover cases with events exceeding 4x the threshold.
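A test along those lines could be as simple as the following sketch (hypothetical names, assuming a reset helper like the one sketched earlier in this thread):

package publisher_test

import "testing"

const threshold = 75 // hypothetical flush threshold

// resetEvents mirrors the reuse/reset behaviour under review (hypothetical helper).
func resetEvents(events *[]int) {
	if cap(*events) > 4*threshold {
		*events = make([]int, 0, threshold*3/2)
		return
	}
	*events = (*events)[:0]
}

func TestResetEventsDropsOversizedCapacity(t *testing.T) {
	events := make([]int, 0, threshold*3/2)
	events = append(events, make([]int, 4*threshold+1)...) // push capacity beyond 4x the threshold

	resetEvents(&events)

	if cap(events) > 4*threshold {
		t.Fatalf("expected capacity to be dropped, still have cap=%d", cap(events))
	}
	if len(events) != 0 {
		t.Fatalf("expected empty buffer after reset, got len=%d", len(events))
	}
}

func TestResetEventsReusesNormalCapacity(t *testing.T) {
	events := make([]int, 0, threshold*3/2)
	events = append(events, make([]int, threshold)...) // stay within the threshold

	before := cap(events)
	resetEvents(&events)

	if cap(events) != before || len(events) != 0 {
		t.Fatalf("expected len 0 and reused capacity %d, got len=%d cap=%d", before, len(events), cap(events))
	}
}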
resetting
Would be nice (outside of this PR) to set up some kind of continuous benchmarking tool to compare our performance / memory usage via Go benchmarks: https://github.com/benchmark-action/github-action-benchmark
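For what it's worth, github-action-benchmark can consume standard go test -bench output, so a continuous job would only need regular Go benchmarks, e.g. something along these lines (illustrative names only):

package publisher_test

import "testing"

// BenchmarkAppendAndReset is an illustrative benchmark that a continuous
// benchmarking job could track over time.
func BenchmarkAppendAndReset(b *testing.B) {
	const threshold = 75
	events := make([]int, 0, threshold*3/2)
	batch := make([]int, threshold)

	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		events = append(events, batch...)
		events = events[:0] // reuse the capacity between iterations
	}
}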
Force-pushed from 790ca64 to 0208d1d
The interval doesn't play a role in this test, so let's not mislead by putting it in the table.
Force-pushed from fbdf0a4 to 2e305b4
Co-authored-by: Orestis Floros <orestisflo@gmail.com>
Force-pushed from 2e305b4 to 95f0211