Skip to content

Commit

Permalink
Lock when bumping clfs. Make sure to unlock mset lock before bumpCLFS. (#5183)
Browse files Browse the repository at this point in the history

Locking order is clMu then mset lock.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored Mar 7, 2024
2 parents b9c2d75 + a0e69e3 commit f30c7e1
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
12 changes: 6 additions & 6 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7558,7 +7558,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed

Expand Down Expand Up @@ -7698,10 +7698,10 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We only use mset.clseq for clustering and in case we run ahead of actual commits.
// Check if we need to set initial value here
mset.clMu.Lock()
if mset.clseq == 0 || mset.clseq < lseq+clfs {
if mset.clseq == 0 || mset.clseq < lseq+mset.clfs {
// Re-capture
lseq, clfs = mset.lastSeq(), mset.clfs
mset.clseq = lseq + clfs
lseq = mset.lastSeq()
mset.clseq = lseq + mset.clfs
}

// Check if we have an interest policy and discard new with max msgs or bytes.
Expand Down Expand Up @@ -7764,7 +7764,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [

// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
if mset.clseq-(lseq+clfs) > streamLagWarnThreshold {
if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold {
lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name)
s.RateLimitWarnf(lerr.Error())
}
Expand Down Expand Up @@ -7955,11 +7955,11 @@ var (
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
// Update any deletes, etc.
mset.processSnapshotDeletes(snap)
mset.setCLFS(snap.Failed)

mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)

s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
Expand Down
35 changes: 17 additions & 18 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,9 +997,7 @@ func (mset *stream) rebuildDedupe() {
}

func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.lseq, mset.getCLFS()
return mset.lastSeq(), mset.getCLFS()
}

func (mset *stream) getCLFS() uint64 {
Expand All @@ -1016,9 +1014,8 @@ func (mset *stream) setCLFS(clfs uint64) {

func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
mset.mu.RUnlock()
return lseq
defer mset.mu.RUnlock()
return mset.lseq
}

func (mset *stream) setLastSeq(lseq uint64) {
Expand Down Expand Up @@ -4396,7 +4393,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if traceOnly {
return
}
mset.clMu.Lock()
mset.clfs++
mset.clMu.Unlock()
}

// Apply the input subject transform if any
Expand Down Expand Up @@ -4427,8 +4426,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Bail here if sealed.
if isSealed {
outq := mset.outq
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamSealedErr]
Expand Down Expand Up @@ -4495,8 +4494,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if !isClustered || traceOnly {
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamNotMatchError()
Expand All @@ -4510,8 +4509,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Dedupe detection.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
if dde := mset.checkMsgId(msgId); dde != nil {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
Expand All @@ -4534,8 +4533,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
fseq, err = 0, nil
}
if err != nil || fseq != seq {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
Expand All @@ -4549,8 +4548,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence.
if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq {
mlseq := mset.lseq
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(mlseq)
Expand All @@ -4566,8 +4565,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
if lmsgId != mset.lmsgId {
last := mset.lmsgId
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastMsgIDError(last)
Expand All @@ -4580,8 +4579,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if !mset.cfg.AllowRollup || mset.cfg.DenyPurge {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamRollupFailedError(errors.New("rollup not permitted"))
Expand All @@ -4596,8 +4595,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
case JSMsgRollupAll:
rollupAll = true
default:
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
err := fmt.Errorf("rollup value invalid: %q", rollup)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
Expand All @@ -4619,8 +4618,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,

// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamMessageExceedsMaximumError()
Expand All @@ -4631,8 +4630,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if len(hdr) > math.MaxUint16 {
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamHeaderExceedsMaximumError()
Expand All @@ -4645,8 +4644,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we have exceeded our limits.
if js.limitsExceeded(stype) {
s.resourcesExceededError()
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSInsufficientResourcesError()
Expand Down Expand Up @@ -4773,8 +4772,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
bumpCLFS()
mset.mu.Unlock()
bumpCLFS()

switch err {
case ErrMaxMsgs, ErrMaxBytes, ErrMaxMsgsPerSubject, ErrMsgTooLarge:
Expand Down

0 comments on commit f30c7e1

Please sign in to comment.