Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject TTL messages when not enabled, strip TTL header from sourcing/mirroring #6298

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1638,5 +1638,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSMessageTTLDisabledErr",
"code": 400,
"error_code": 10166,
"description": "per-message TTL is disabled",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
14 changes: 13 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7997,7 +7997,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
isLeader, isSealed, compressOK := mset.isLeader(), mset.cfg.Sealed, mset.compressOK
isLeader, isSealed, compressOK, allowTTL := mset.isLeader(), mset.cfg.Sealed, mset.compressOK, mset.cfg.AllowMsgTTL
mset.mu.RUnlock()

// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
Expand Down Expand Up @@ -8131,6 +8131,18 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.storeMsgIdLocked(&ddentry{msgId, 0, time.Now().UnixNano()})
mset.mu.Unlock()
}

// TTL'd messages are rejected entirely if TTLs are not enabled on the stream.
if ttl, _ := getMessageTTL(hdr); ttl != 0 && !allowTTL {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSMessageTTLDisabledError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return errMsgTTLDisabled
}
}

// Proceed with proposing this message.
Expand Down
142 changes: 142 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5331,3 +5331,145 @@ func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
ljs.mu.Lock()
t.Logf("Took %s to clear %d items", time.Since(start), count)
}

func TestJetStreamClusterMessageTTLStrippedSourcing(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
Replicas: 3,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "SOURCE"},
},
Replicas: 3,
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the sourcing stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamClusterMessageTTLStrippedMirroring(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
Replicas: 3,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Mirror: &StreamSource{Name: "SOURCE"},
Replicas: 3,
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the mirroring stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamClusterMessageTTLDisabled(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
Replicas: 3,
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)

// In clustered mode we should have caught this before generating the
// proposal, therefore the CLFS should not have been bumped by this.
for _, s := range c.servers {
stream, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
require_Equal(t, stream.getCLFS(), 0)
}
}
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ const (
// JSMemoryResourcesExceededErr insufficient memory resources available
JSMemoryResourcesExceededErr ErrorIdentifier = 10028

// JSMessageTTLDisabledErr per-message TTL is disabled
JSMessageTTLDisabledErr ErrorIdentifier = 10166

// JSMessageTTLInvalidErr invalid per-message TTL
JSMessageTTLInvalidErr ErrorIdentifier = 10165

Expand Down Expand Up @@ -579,6 +582,7 @@ var (
JSMaximumConsumersLimitErr: {Code: 400, ErrCode: 10026, Description: "maximum consumers limit reached"},
JSMaximumStreamsLimitErr: {Code: 400, ErrCode: 10027, Description: "maximum number of streams reached"},
JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"},
JSMessageTTLDisabledErr: {Code: 400, ErrCode: 10166, Description: "per-message TTL is disabled"},
JSMessageTTLInvalidErr: {Code: 400, ErrCode: 10165, Description: "invalid per-message TTL"},
JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"},
JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"},
Expand Down Expand Up @@ -1551,6 +1555,16 @@ func NewJSMemoryResourcesExceededError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSMemoryResourcesExceededErr]
}

// NewJSMessageTTLDisabledError creates a new JSMessageTTLDisabledErr error: "per-message TTL is disabled"
func NewJSMessageTTLDisabledError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSMessageTTLDisabledErr]
}

// NewJSMessageTTLInvalidError creates a new JSMessageTTLInvalidErr error: "invalid per-message TTL"
func NewJSMessageTTLInvalidError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,8 @@ func jsClientConnectURL(t testing.TB, url string, opts ...nats.Option) (*nats.Co

// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfig {
t.Helper()

j, err := json.Marshal(cfg)
require_NoError(t, err)

Expand All @@ -1280,6 +1282,8 @@ func jsStreamCreate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) *StreamConfi

// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet.
func jsStreamUpdate(t testing.TB, nc *nats.Conn, cfg *StreamConfig) (*StreamConfig, error) {
t.Helper()

j, err := json.Marshal(cfg)
require_NoError(t, err)

Expand Down
129 changes: 129 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25121,3 +25121,132 @@ func TestJetStreamMessageTTLNeverExpire(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 11)
}

func TestJetStreamMessageTTLDisabled(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Subjects: []string{"test"},
})

msg := &nats.Msg{
Subject: "test",
Header: nats.Header{},
}

msg.Header.Set("Nats-TTL", "1s")
_, err := js.PublishMsg(msg)
require_Error(t, err)
}

func TestJetStreamMessageTTLStrippedSourcing(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "SOURCE"},
},
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the sourcing stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}

func TestJetStreamMessageTTLStrippedMirroring(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

jsStreamCreate(t, nc, &StreamConfig{
Name: "SOURCE",
Storage: FileStorage,
Subjects: []string{"test"},
AllowMsgTTL: true,
})

jsStreamCreate(t, nc, &StreamConfig{
Name: "TEST",
Storage: FileStorage,
Mirror: &StreamSource{Name: "SOURCE"},
})

hdr := nats.Header{}
hdr.Add("Nats-TTL", "5s")

_, err := js.PublishMsg(&nats.Msg{
Subject: "test",
Header: hdr,
})
require_NoError(t, err)

// Make sure that the header is present in the source stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("SOURCE"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_NotEqual(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}

// Make sure that the header has been stripped in the mirroring stream.
{
sc, err := js.PullSubscribe("test", "consumer", nats.BindStream("TEST"))
require_NoError(t, err)

msgs, err := sc.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Header.Get(JSMessageTTL), _EMPTY_)
}
}
Loading
Loading