Skip to content

Commit

Permalink
[FIXED] Respect consumer's starting seq, even if in the future
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Dec 13, 2024
1 parent 2325e09 commit 6902961
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
12 changes: 9 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5360,12 +5360,18 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

if state.FirstSeq == 0 {
if state.FirstSeq == 0 && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If the stream is empty, deliver only new.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = 1
} else if o.sseq > state.LastSeq && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
// If selected sequence is in the future, clamp back down.
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
o.sseq = state.LastSeq + 1
} else if o.sseq < state.FirstSeq {
// If the first sequence is further ahead than the starting sequence,
// there are no messages there anymore, so move the sequence up.
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}

Expand Down
54 changes: 54 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"testing"
"time"

"github.com/nats-io/nats.go/jetstream"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nats.go"
)
Expand Down Expand Up @@ -6942,6 +6944,58 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc := clientConnectToServer(t, c.randomServer())
defer nc.Close()
js, err := jetstream.New(nc)
require_NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

// Create replicated stream.
_, err = js.CreateStream(ctx, jetstream.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// We could have published messages into the stream that have not yet been applied on the follower.
// If we create a consumer with a starting sequence in the future, we must respect it.
consumer, err := js.OrderedConsumer(ctx, "TEST", jetstream.OrderedConsumerConfig{
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, consumer.CachedInfo().Delivered.Stream, 19)

// Same thing if the first sequence is not 0.
stream, err := js.Stream(ctx, "TEST")
require_NoError(t, err)
err = stream.Purge(ctx, jetstream.WithPurgeSequence(10))
require_NoError(t, err)

consumer, err = js.OrderedConsumer(ctx, "TEST", jetstream.OrderedConsumerConfig{
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 20,
})
require_NoError(t, err)
require_Equal(t, consumer.CachedInfo().Delivered.Stream, 19)

// Only if we're requested to start at a sequence that's not available anymore
// can we safely move it up. That data is gone already, so can't do anything else.
consumer, err = js.OrderedConsumer(ctx, "TEST", jetstream.OrderedConsumerConfig{
DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
OptStartSeq: 5,
})
require_NoError(t, err)
require_Equal(t, consumer.CachedInfo().Delivered.Stream, 9)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 6902961

Please sign in to comment.