Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws/sqs: prevent ackLoop starvation #3103

Merged
merged 16 commits into from
Jan 7, 2025
Merged

aws/sqs: prevent ackLoop starvation #3103

merged 16 commits into from
Jan 7, 2025

Conversation

rockwotj
Copy link
Collaborator

@rockwotj rockwotj commented Jan 4, 2025

The ackLoop can be starved, we add a max outstanding limit to ensure that stuff gets acked properly and fix the refresh behavior in general.

Making this a batch input might help too. My theory is that:

  • Polling returns 10 messages it grabs the lock
  • Acking also grabs the lock, but is only able to process 1 message at a time

So if there is a backlog and the locking is fair, the poll loop is going to produce a lot more values than the ack loop can and we end up with unbounded memory usage for the inflight messages. So a batch input where 10 messages are acked at a time would probably help with lock fairness. Just having the cond var in there seems to help a ton, but there is now an explicit mechanism to ensure that we don't have unbounded memory usage.

We fix all this by moving the refresh out to another loop and only try to refresh 10 messages at once (which is the API limit for requests at once). The only improvement from here is to make more requests in parallel, but I think with limiting the refreshes we're good from needing to do that.

@rockwotj rockwotj requested review from Jeffail and mihaitodor January 4, 2025 05:11
Just adding the enforcement of this limit seems to alievate some lock
contention and now the ackLoop doesn't get stuck and stop acking stuff,
the limit here should help too in cases where there is actual backup.

The property name was inspired from the gcp pubsub input.
If the advanced delete message prop was ever `false`, the in flight
tracker would just accumulate memory until OOM. Much better to just
noop the actual delete, because now when there are nacks we will reset
the visibility faster.
From testing I am getting false positive log messages and am not sure
why the atomic still has a value unless there is some sneaky copy in
there somewhere. Just move to checking in the original map when there
are failures, which should hopefully be rare anyways.
Duh, we *don't* want to refresh messages if the deadline is *greater* than
half the timeout

Sigh 🤦
@rockwotj rockwotj requested a review from ooesili January 6, 2025 16:35
Comment on lines -249 to -254
if timeoutStr, exists := m.Attributes[sqsiAttributeNameVisibilityTimeout]; exists {
// Might as well keep the queue timeout setting refreshed as we
// consume new data.
if tmpTimeoutSeconds, err := strconv.Atoi(timeoutStr); err == nil {
handle.timeoutSeconds = tmpTimeoutSeconds
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this attribute is not mentioned in the docs and doesn't seem to be present in my testing...

Most of this is just cleanup, the only real fix is correcting the timeout
to match the queue.
if !a.conf.DeleteMessage {
return nil
}
const maxBatchSize = 10
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API limit here on the amazon side is 10, so currently if you MaxNumberOfMessages > 10 then you can't delete or reset visibility for anything...

Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice job! 🏆 Just left some small nitpicky questions, but feel free to :shipit:

internal/impl/aws/input_sqs.go Show resolved Hide resolved
internal/impl/aws/input_sqs.go Show resolved Hide resolved
internal/impl/aws/input_sqs.go Show resolved Hide resolved
internal/impl/aws/input_sqs.go Outdated Show resolved Hide resolved
Prevents errors in the batch API, even if it's unlikely. Also fix a log
statement.
@rockwotj rockwotj merged commit 6801337 into main Jan 7, 2025
4 checks passed
@rockwotj rockwotj deleted the sqs branch January 7, 2025 15:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants