-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(2.11) JetStream API routed queue changes #6342
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Neil Twigg <neil@nats.io>
Signed-off-by: Neil Twigg <neil@nats.io>
One concern I have with LIFO is - if the System is under managable, but constant pressure, causing the queue to be more or less at constant size, the old requests might never be handled. |
With the LIFO approach it does mean that we prioritise more recent requests, yes, but consider the alternative: if the system builds up requests because of a pause or because it is consistently struggling, then we have a head-of-line blocking problem. The current FIFO approach will be pulling off the oldest requests and handling them after the clients have given up waiting, which in turn means that all requests behind them will also be handled after the clients have given up. The result is that we've affected a really large number of clients because none of the work got completed in time. The LIFO approach turns it on its head and continues to serve the most recent requests as soon as possible, which gets a number of "happy" clients out of the way as soon as possible. (They were never head-of-line blocked and therefore do not need to come back and retry later, which just compounds the problem and makes everything worse.) We then optimistically try to satisfy the older requests if we have the resources available to do so. If we don't then that's unavoidable, but that's no worse than FIFO today. At least with LIFO, we prioritised getting some work done that clients will still be waiting for and won't need to retry. Also worth noting this is only affecting the routed JS API requests, i.e. ones that have to be routed to the metaleader for completion. For requests that can be handled inline from a client connection, this doesn't change anything. It also doesn't change anything for a system that is operating normally. |
This updates the JS API workers to use `popOne`, so that if the queue stacks up at all, the recovery is fairer across workers, rather than single workers stealing the entire queue. This was extracted from #6342. Signed-off-by: Neil Twigg <neil@nats.io>
I was going to propose soon changes to IPQueue with a lockless implementation, so that is putting a wrench in it since I don't think (but did not really look into it) that it would be "doable" with my implementation. |
Not that it should stop you to go with this approach if that the only or best way. I will either be able to add this new API to the lockless implementation or just give up on lockless implementation ;-) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly this new API makes the lockless implementation that I was working on moot, because I can't do a popLast() I think. But regardless, see some comments I have.
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa}) | ||
limit := atomic.LoadInt64(&js.queueLimit) | ||
if pending >= int(limit) { | ||
if _, ok := s.jsAPIRoutedReqs.popOne(); ok { | ||
// If we were able to take one of the oldest items off the queue, then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the code is no longer properly updating the js.apiInflight
that is bumped above and is normally decremented when processing elements in processJSAPIRoutedRequests()
.
You have introduced quite recently ability to limit the ipQueue with making the push() fail when above that limit, why not using this instead of some arbitrary limit leading to a drain?
|
||
// It's likely not possible to get to this point, but if for some reason we have got here, | ||
// then something is wrong for us to be both over the limit but unable to pull entries, so | ||
// throw everything away and hope we recover from it. | ||
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending) | ||
s.jsAPIRoutedReqs.drain() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly the apiInflight
will be out of whack when reaching this code...
// Only pop one item at a time here, otherwise if the system is recovering | ||
// from queue buildup, then one worker will pull off all the tasks and the | ||
// others will be starved of work. | ||
for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to say that this seems wrong: picture a push("some create operation")
followed by push("some delete of that created object")
. With popOneLast()
, you would possibly get the "delete" first, then the "create". But I realize that a change was made some time ago to make processJSAPIRoutedRequests()
be executed from several go routines, which then already completely put the "ordering" out of the window. So I assume that if that was done it means that those actions are independent of each other and order does not matter. You would want to verify that this is the case. Again, if there is possibly ordering issue, then popOneLast()
would be wrong, but running processJSAPIRoutedRequests()
from more than 1 go routine would be wrong too.
// the use of drain(). In short, the caller should always check the | ||
// boolean return value to ensure that the value is genuine and not a | ||
// default empty value. | ||
func (q *ipQueue[T]) popOneLast() (T, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: popLast()
may be enough.
Adding a test for this new API would be a good thing, for instance making sure that we can do popOne() and popLast() mixed for instance.
This PR makes three changes:
Signed-off-by: Neil Twigg neil@nats.io