aws/sqs: prevent ackLoop starvation #3103
Conversation
Just adding the enforcement of this limit seems to alleviate some lock contention, and now the ackLoop doesn't get stuck and stop acking messages. The limit here should help too in cases where there is an actual backup. The property name was inspired by the gcp pubsub input.
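Roughly, the enforcement looks like this (a minimal sketch with illustrative names, not the actual code): the poll loop blocks on a cond var until the outstanding count drops below the limit, so the ack loop can always catch up.

```go
package inflight

import "sync"

type inFlightTracker struct {
	mu          sync.Mutex
	cond        *sync.Cond
	outstanding int
	limit       int // the new max-outstanding property
}

func newInFlightTracker(limit int) *inFlightTracker {
	t := &inFlightTracker{limit: limit}
	t.cond = sync.NewCond(&t.mu)
	return t
}

// acquire blocks the poll loop while the limit is reached.
func (t *inFlightTracker) acquire() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for t.outstanding >= t.limit {
		t.cond.Wait()
	}
	t.outstanding++
}

// release is called by the ack loop once a message is acked or nacked,
// waking the poll loop if it was blocked on the limit.
func (t *inFlightTracker) release() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.outstanding--
	t.cond.Signal()
}
```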
If the advanced delete-message prop was ever `false`, the in-flight tracker would just accumulate memory until OOM. It's much better to just no-op the actual delete, because now when there are nacks we reset the visibility faster.
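The nack path then boils down to something like this (a sketch assuming aws-sdk-go-v2; names are illustrative): set the visibility timeout to 0 so the message becomes receivable again immediately instead of holding its handle forever.

```go
package sketch

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// nackMessage makes a nacked message receivable again immediately rather
// than keeping its handle in the in-flight tracker.
func nackMessage(ctx context.Context, client *sqs.Client, queueURL, receiptHandle string) error {
	_, err := client.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(queueURL),
		ReceiptHandle:     aws.String(receiptHandle),
		VisibilityTimeout: 0, // visible to other consumers right away
	})
	return err
}
```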
From testing I am getting false-positive log messages and am not sure why the atomic still has a value, unless there is some sneaky copy in there somewhere. Just move to checking the original map when there are failures, which should hopefully be rare anyway.
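For what it's worth, one way a "sneaky copy" can happen in Go (purely a guess at the cause, with illustrative names): ranging over a map of struct values copies the struct, including its atomic field, so a load on the copy sees a stale snapshot.

```go
package sketch

import "sync/atomic"

type handle struct {
	pending atomic.Int64
}

func staleRead(handles map[string]handle) int64 {
	var total int64
	for _, h := range handles { // h is a copy; its atomic value is frozen at copy time
		total += h.pending.Load() // may disagree with the live value in the map
	}
	return total
}
```

go vet's copylocks check flags exactly this kind of copy, which is one way to hunt it down.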
Duh, we *don't* want to refresh messages if the deadline is *greater* than half the timeout. Sigh 🤦
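In other words, the corrected check looks like this (illustrative sketch, not the exact code): only refresh when less than half the timeout remains before the deadline.

```go
package sketch

import "time"

// needsRefresh reports whether a message's visibility should be extended:
// refresh once the remaining window shrinks below half the timeout, and
// skip it while the deadline is still comfortably far away.
func needsRefresh(deadline time.Time, timeout time.Duration, now time.Time) bool {
	return deadline.Sub(now) < timeout/2
}
```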
```go
if timeoutStr, exists := m.Attributes[sqsiAttributeNameVisibilityTimeout]; exists {
	// Might as well keep the queue timeout setting refreshed as we
	// consume new data.
	if tmpTimeoutSeconds, err := strconv.Atoi(timeoutStr); err == nil {
		handle.timeoutSeconds = tmpTimeoutSeconds
	}
}
```
FYI this attribute is not mentioned in the docs and doesn't seem to be present in my testing...
Most of this is just cleanup; the only real fix is correcting the timeout to match the queue.
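If the per-message attribute really isn't returned, the queue-level setting can be fetched instead; a sketch assuming aws-sdk-go-v2 (VisibilityTimeout is a queue attribute, not a per-message one):

```go
package sketch

import (
	"context"
	"strconv"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// queueVisibilityTimeout reads the queue-level VisibilityTimeout in seconds.
func queueVisibilityTimeout(ctx context.Context, client *sqs.Client, queueURL string) (int, error) {
	out, err := client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
		QueueUrl:       aws.String(queueURL),
		AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameVisibilityTimeout},
	})
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(out.Attributes[string(types.QueueAttributeNameVisibilityTimeout)])
}
```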
```go
if !a.conf.DeleteMessage {
	return nil
}

const maxBatchSize = 10
```
The API limit here on the Amazon side is 10, so currently if you set MaxNumberOfMessages > 10 then you can't delete or reset visibility for anything...
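Chunking the batch calls avoids that; a sketch of what the delete side could look like (assuming aws-sdk-go-v2, illustrative names):

```go
package sketch

import (
	"context"
	"strconv"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

const maxBatchSize = 10 // DeleteMessageBatch / ChangeMessageVisibilityBatch limit

// deleteInChunks splits receipt handles into API-sized batches so that a
// MaxNumberOfMessages above 10 can't produce an oversized batch request.
func deleteInChunks(ctx context.Context, client *sqs.Client, queueURL string, handles []string) error {
	for start := 0; start < len(handles); start += maxBatchSize {
		end := start + maxBatchSize
		if end > len(handles) {
			end = len(handles)
		}
		entries := make([]types.DeleteMessageBatchRequestEntry, 0, end-start)
		for i, h := range handles[start:end] {
			entries = append(entries, types.DeleteMessageBatchRequestEntry{
				Id:            aws.String(strconv.Itoa(i)), // unique within the batch
				ReceiptHandle: aws.String(h),
			})
		}
		if _, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries,
		}); err != nil {
			return err
		}
	}
	return nil
}
```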
Nice job! 🏆 Just left some small nitpicky questions, but feel free to
Prevents errors in the batch API, even if they're unlikely. Also fixes a log statement.
The ackLoop can be starved, so we add a max-outstanding limit to ensure that messages get acked properly, and we fix the refresh behavior in general.
Making this a batch input might help too. My theory is that if there is a backlog and the locking is fair, the poll loop is going to produce far more values than the ack loop can consume, and we end up with unbounded memory usage for the in-flight messages. So a batch input where 10 messages are acked at a time would probably help with lock fairness. Just having the cond var in there seems to help a ton, but there is now an explicit mechanism to ensure that we don't have unbounded memory usage.
We fix all this by moving the refresh out to another loop and only trying to refresh 10 messages at once (the API limit per request). The only improvement from here would be to make more requests in parallel, but I think with the refreshes limited we don't need to do that. A sketch of the shape of that loop is below.
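Something like this (a sketch assuming aws-sdk-go-v2; `nextBatch` is a hypothetical helper that hands back the handles whose deadlines are within half the timeout):

```go
package sketch

import (
	"context"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// refreshLoop periodically extends visibility for up to 10 handles per
// request, which is the batch API limit.
func refreshLoop(ctx context.Context, client *sqs.Client, queueURL string, timeoutSeconds int32, nextBatch func() []string) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		handles := nextBatch()
		if len(handles) > 10 {
			handles = handles[:10] // one API-sized batch per tick
		}
		if len(handles) == 0 {
			continue
		}
		entries := make([]types.ChangeMessageVisibilityBatchRequestEntry, 0, len(handles))
		for i, h := range handles {
			entries = append(entries, types.ChangeMessageVisibilityBatchRequestEntry{
				Id:                aws.String(strconv.Itoa(i)),
				ReceiptHandle:     aws.String(h),
				VisibilityTimeout: timeoutSeconds, // push the deadline out again
			})
		}
		// Errors are ignored in this sketch; the real loop would log them
		// and let unrefreshed messages simply become visible again.
		client.ChangeMessageVisibilityBatch(ctx, &sqs.ChangeMessageVisibilityBatchInput{
			QueueUrl: aws.String(queueURL),
			Entries:  entries,
		})
	}
}
```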