Skip to content

Commit

Permalink
Reject TTL messages when not enabled, strip TTL header from sourcing/…
Browse files Browse the repository at this point in the history
…mirroring

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jan 10, 2025
1 parent 1dcd873 commit 5a4bbea
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 5 deletions.
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1638,5 +1638,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMessageTTLDisabledErr",
"code": 400,
"error_code": 10166,
"description": "per-message TTL is disabled",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
14 changes: 13 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7997,7 +7997,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
isLeader, isSealed, compressOK := mset.isLeader(), mset.cfg.Sealed, mset.compressOK
isLeader, isSealed, compressOK, allowTTL := mset.isLeader(), mset.cfg.Sealed, mset.compressOK, mset.cfg.AllowMsgTTL
mset.mu.RUnlock()

// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
Expand Down Expand Up @@ -8131,6 +8131,18 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
mset.mu.Unlock()
}

// TTL'd messages are rejected entirely if TTLs are not enabled on the stream.
if ttl, _ := getMessageTTL(hdr); ttl != 0 && !allowTTL {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSMessageTTLDisabledError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return errMsgTTLDisabled
}
}

// Proceed with proposing this message.
Expand Down
142 changes: 142 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5331,3 +5331,145 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
ljs.mu.Lock()
t.Logf("Took %s to clear %d items", time.Since(start), count)
}

func TestJetStreamClusterMessageTTLStrippedSourcing(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
Replicas: 3,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "SOURCE"},
},
Replicas: 3,
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the sourcing stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamClusterMessageTTLStrippedMirroring(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
Replicas: 3,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Mirror: &StreamSource{Name: "SOURCE"},
Replicas: 3,
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the mirroring stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamClusterMessageTTLDisabled(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
Replicas: 3,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)

// In clustered mode we should have caught this before generating the
// proposal, therefore the CLFS should not have been bumped by this.
for _, s := range c.servers {
stream, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
require_Equal(t, stream.getCLFS(), 0)
}
}
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ const (
// JSMemoryResourcesExceededErr insufficient memory resources available
JSMemoryResourcesExceededErr ErrorIdentifier = 10028

// JSMessageTTLDisabledErr per-message TTL is disabled
JSMessageTTLDisabledErr ErrorIdentifier = 10166

// JSMessageTTLInvalidErr invalid per-message TTL
JSMessageTTLInvalidErr ErrorIdentifier = 10165

Expand Down Expand Up @@ -579,6 +582,7 @@ var (
JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"},
JSMaximumStreamsLimitErr: {Code: 400, ErrCode: 10027, Description: "maximum number of streams reached"},
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMessageTTLDisabledErr: {Code: 400, ErrCode: 10166, Description: "per-message TTL is disabled"},
JSMessageTTLInvalidErr: {Code: 400, ErrCode: 10165, Description: "invalid per-message TTL"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
Expand Down Expand Up @@ -1551,6 +1555,16 @@ func NewJSMemoryResourcesExceededError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMemoryResourcesExceededErr]
}

// NewJSMessageTTLDisabledError creates a new JSMessageTTLDisabledErr error: "per-message TTL is disabled"
func NewJSMessageTTLDisabledError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMessageTTLDisabledErr]
}

// NewJSMessageTTLInvalidError creates a new JSMessageTTLInvalidErr error: "invalid per-message TTL"
func NewJSMessageTTLInvalidError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,8 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co

// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
t.Helper()

j, err := json.Marshal(cfg)
require_NoError(t, err)

Expand All @@ -1280,6 +1282,8 @@ func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfi

// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamUpdate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
t.Helper()

j, err := json.Marshal(cfg)
require_NoError(t, err)

Expand Down
129 changes: 129 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25121,3 +25121,132 @@ func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
}

func TestJetStreamMessageTTLDisabled(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
}

func TestJetStreamMessageTTLStrippedSourcing(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "SOURCE"},
},
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the sourcing stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamMessageTTLStrippedMirroring(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Mirror: &StreamSource{Name: "SOURCE"},
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the mirroring stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}
Loading

0 comments on commit 5a4bbea

Please sign in to comment.