(2.11) JetStream API routed queue changes #6342

Open
wants to merge 2 commits into
base: main
37 changes: 37 additions & 0 deletions server/ipqueue.go
@@ -207,6 +207,43 @@ func (q *ipQueue[T]) popOne() (T, bool) {
	return e, true
}

// Returns the last element from the queue, if any. See comment above
// regarding calling after being notified that there is something and
// the use of drain(). In short, the caller should always check the
// boolean return value to ensure that the value is genuine and not a
// default empty value.
func (q *ipQueue[T]) popOneLast() (T, bool) {
Review comment (Member):

nit: popLast() may be enough as a name.

It would be good to add a test for this new API, for instance one that makes sure popOne() and popLast() calls can be mixed.

	q.Lock()
	l := len(q.elts) - q.pos
	if l == 0 {
		q.Unlock()
		var empty T
		return empty, false
	}
	e := q.elts[len(q.elts)-1]
	q.elts = q.elts[:len(q.elts)-1]
	if l--; l > 0 {
		if q.calc != nil {
			q.sz -= q.calc(e)
		}
		// We need to re-signal
		select {
		case q.ch <- struct{}{}:
		default:
		}
	} else {
		// We have just emptied the queue, so we can reuse unless it is too big.
		if cap(q.elts) <= q.mrs {
			q.elts = q.elts[:0]
		} else {
			q.elts = nil
		}
		q.pos, q.sz = 0, 0
	}
	q.Unlock()
	return e, true
}

// After a pop(), the slice can be recycled for the next push() when
// a first element is added to the queue.
// This will also decrement the "in progress" count with the length
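
The test requested in the review comment above, mixing popOne() and popLast(), could look roughly like the sketch below. It is a simplified standalone model and not the server's ipQueue: `miniQueue`, its fields, and `mixedPops` are hypothetical names, and locking, size accounting, and channel signalling are deliberately left out.

```go
package main

// miniQueue is a hypothetical, single-goroutine stand-in for ipQueue[T].
// It keeps only the two fields needed to show how head and tail pops interact.
type miniQueue[T any] struct {
	elts []T
	pos  int // index of the next element popOne would return
}

func (q *miniQueue[T]) push(e T) { q.elts = append(q.elts, e) }

// popOne removes and returns the oldest element, if any.
func (q *miniQueue[T]) popOne() (T, bool) {
	var empty T
	if len(q.elts)-q.pos == 0 {
		return empty, false
	}
	e := q.elts[q.pos]
	q.pos++
	q.resetIfEmpty()
	return e, true
}

// popLast removes and returns the newest element, if any.
func (q *miniQueue[T]) popLast() (T, bool) {
	var empty T
	if len(q.elts)-q.pos == 0 {
		return empty, false
	}
	e := q.elts[len(q.elts)-1]
	q.elts = q.elts[:len(q.elts)-1]
	q.resetIfEmpty()
	return e, true
}

// resetIfEmpty rewinds the slice and position once nothing is left.
func (q *miniQueue[T]) resetIfEmpty() {
	if len(q.elts)-q.pos == 0 {
		q.elts, q.pos = q.elts[:0], 0
	}
}

// mixedPops pushes 1..5, then alternates head and tail pops until empty.
func mixedPops() []int {
	q := &miniQueue[int]{}
	for i := 1; i <= 5; i++ {
		q.push(i)
	}
	var got []int
	for {
		e, ok := q.popOne()
		if !ok {
			break
		}
		got = append(got, e)
		if e, ok = q.popLast(); ok {
			got = append(got, e)
		}
	}
	return got
}
```

With five elements pushed, alternating pops should yield 1, 5, 2, 4, 3, after which both pop functions report an empty queue. A real test against ipQueue would also want to check the pending count and size bookkeeping after each pop.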
28 changes: 25 additions & 3 deletions server/jetstream_api.go
@@ -890,9 +890,30 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
	// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
	// header from the msg body. No other references are needed.
	// Check pending and warn if getting backed up.
retry:
	pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
	limit := atomic.LoadInt64(&js.queueLimit)
	if pending >= int(limit) {
		if _, ok := s.jsAPIRoutedReqs.popOne(); ok {
			// If we were able to take one of the oldest items off the queue, then
Review comment (Member):

I think the code no longer properly updates js.apiInflight, which is bumped above and is normally decremented when processing elements in processJSAPIRoutedRequests().

You quite recently introduced the ability to limit the ipQueue by making push() fail when above that limit. Why not use that instead of some arbitrary limit leading to a drain?

			// retry the insert.
			s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping oldest request")
			s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
				TypedEvent: TypedEvent{
					Type: JSAPILimitReachedAdvisoryType,
					ID: nuid.Next(),
					Time: time.Now().UTC(),
				},
				Server: s.Name(),
				Domain: js.config.Domain,
				Dropped: 1,
			})
			goto retry
		}

		// It's likely not possible to get to this point, but if for some reason we have got here,
		// then something is wrong for us to be both over the limit but unable to pull entries, so
		// throw everything away and hope we recover from it.
		s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
		s.jsAPIRoutedReqs.drain()
Review comment (Member):

Clearly the apiInflight will be out of whack when reaching this code...
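
The two review comments in this hunk point at one idea: let the queue itself reject a push when it is full, and keep the in-flight counter balanced on every path. The sketch below illustrates that idea in isolation. It is not the server's code: `boundedQueue`, `dispatcher`, `errQueueFull`, and all method names are hypothetical, and the real ipQueue limit API may look different. Note that this variant drops the newest request rather than the oldest, which is a behavioural difference from the PR.

```go
package main

import (
	"errors"
	"sync"
	"sync/atomic"
)

// errQueueFull is a hypothetical error returned when the limit is reached.
var errQueueFull = errors.New("queue limit reached")

// boundedQueue is a hypothetical queue that rejects pushes at its limit.
type boundedQueue[T any] struct {
	mu    sync.Mutex
	elts  []T
	limit int
}

// push appends e unless the queue is already at its limit.
func (q *boundedQueue[T]) push(e T) (int, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.elts) >= q.limit {
		return len(q.elts), errQueueFull
	}
	q.elts = append(q.elts, e)
	return len(q.elts), nil
}

// popOne removes and returns the oldest element, if any.
func (q *boundedQueue[T]) popOne() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	var empty T
	if len(q.elts) == 0 {
		return empty, false
	}
	e := q.elts[0]
	q.elts = q.elts[1:]
	return e, true
}

// dispatcher pairs the queue with an in-flight counter, playing the role
// that js.apiInflight plays in the discussion above.
type dispatcher struct {
	inflight int64
	queue    boundedQueue[string]
}

// enqueue bumps the counter before pushing and undoes the bump when the
// push is rejected, so the counter only covers requests actually queued.
func (d *dispatcher) enqueue(req string) error {
	atomic.AddInt64(&d.inflight, 1)
	if _, err := d.queue.push(req); err != nil {
		atomic.AddInt64(&d.inflight, -1)
		return err
	}
	return nil
}

// processOne handles one queued request and decrements the counter.
func (d *dispatcher) processOne() bool {
	if _, ok := d.queue.popOne(); !ok {
		return false
	}
	atomic.AddInt64(&d.inflight, -1)
	return true
}

// inFlight reports the current counter value.
func (d *dispatcher) inFlight() int64 { return atomic.LoadInt64(&d.inflight) }
```

The point of the sketch is the invariant: every increment of the counter is matched by exactly one decrement, either when the request is processed or when it is rejected. Any path that discards queued requests, such as a drain, would need to subtract the number of discarded entries to preserve it.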


@@ -922,8 +943,10 @@ func (s *Server) processJSAPIRoutedRequests() {
	for {
		select {
		case <-queue.ch:
			reqs := queue.pop()
			for _, r := range reqs {
			// Only pop one item at a time here, otherwise if the system is recovering
			// from queue buildup, then one worker will pull off all the tasks and the
			// others will be starved of work.
			for r, ok := queue.popOneLast(); ok && r != nil; r, ok = queue.popOneLast() {
Review comment (Member):

I was going to say that this seems wrong: picture a push("some create operation") followed by a push("some delete of that created object"). With popOneLast(), you could get the "delete" first, then the "create". But I realize that a change was made some time ago to run processJSAPIRoutedRequests() from several goroutines, which already threw ordering out of the window. So I assume that, if that was done, those actions are independent of each other and order does not matter. You would want to verify that this is the case. Again, if there is a possible ordering issue, then popOneLast() would be wrong, but running processJSAPIRoutedRequests() from more than one goroutine would be wrong too.

				client.pa = r.pa
				start := time.Now()
				r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
@@ -932,7 +955,6 @@ func (s *Server) processJSAPIRoutedRequests() {
				}
				atomic.AddInt64(&js.apiInflight, -1)
			}
			queue.recycle(&reqs)
		case <-s.quitCh:
			return
		}
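
The ordering concern raised in the review comment above can be illustrated with a small standalone sketch. The `op` type and the helper names are hypothetical and unrelated to the server's request types. The sketch only shows that consuming a dependent create/delete pair from the tail of a queue leaves a different end state than consuming it from the head.

```go
package main

// op is a hypothetical API request that creates or deletes a named object.
type op struct {
	kind string // "create" or "delete"
	name string
}

// apply executes ops in the given order and returns the set of names that
// exist afterwards. Deleting a name that does not exist is a no-op.
func apply(ops []op) map[string]bool {
	exists := make(map[string]bool)
	for _, o := range ops {
		switch o.kind {
		case "create":
			exists[o.name] = true
		case "delete":
			delete(exists, o.name)
		}
	}
	return exists
}

// headOrder returns the ops in the order a head-popping consumer sees them.
func headOrder(pushed []op) []op {
	out := make([]op, len(pushed))
	copy(out, pushed)
	return out
}

// tailOrder returns the ops in the order a tail-popping consumer sees them.
func tailOrder(pushed []op) []op {
	out := make([]op, 0, len(pushed))
	for i := len(pushed) - 1; i >= 0; i-- {
		out = append(out, pushed[i])
	}
	return out
}
```

For the sequence create "ORDERS" then delete "ORDERS", applying head order leaves nothing behind, while applying tail order leaves "ORDERS" in place because the delete runs first against a missing object. Whether this matters for the JetStream API depends on whether routed requests can depend on each other, which is exactly what the reviewer asks to be verified.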