Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] option to keep or remove js headers #5409

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23503,3 +23503,99 @@ func TestInterestStreamWithFilterSubjectsConsumer(t *testing.T) {
t.Fatalf("expected 2 messages got %d", nfo.State.Msgs)
}
}

func TestJetStreamHeadersToKeep(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_ = js
acc := s.GlobalAccount()
if _, err := acc.addStream(&StreamConfig{
Name: "test",
Subjects: []string{"test.*"},
HeadersToKeep: []string{
"bbb",
"ddd",
},
// The that both options are mutually exclusive.
HeadersToRemove: []string{
"bbb",
"ddd",
},
}); err != nil {
t.Fatalf("Failed to add stream: %v", err)
}

// Now add a message with a header.
pubAck, err := js.PublishMsg(&nats.Msg{
Subject: "test.foo",
Header: nats.Header{
JSMsgId: []string{"1234"},
"aaa": []string{"111"},
"bbb": []string{"222"},
"ccc": []string{"333"},
"ddd": []string{"444"},
},
})
require_NoError(t, err)

// Now check that the header was removed.
rawMsg, err := js.GetMsg("test", pubAck.Sequence)
require_NoError(t, err)

require_Equal(t, rawMsg.Header.Get(JSMsgId), "1234")
require_Equal(t, rawMsg.Header.Get("aaa"), "")
require_Equal(t, rawMsg.Header.Get("bbb"), "222")
require_Equal(t, rawMsg.Header.Get("ccc"), "")
require_Equal(t, rawMsg.Header.Get("ddd"), "444")
}

func TestJetStreamHeadersToRemove(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_ = js
acc := s.GlobalAccount()
if _, err := acc.addStream(&StreamConfig{
Name: "test",
Subjects: []string{"test.*"},
HeadersToRemove: []string{
"aaa",
"ccc",
JSMsgId,
},
}); err != nil {
t.Fatalf("Failed to add stream: %v", err)
}

// Now add a message with a header.
pubAck, err := js.PublishMsg(&nats.Msg{
Subject: "test.foo",
Header: nats.Header{
JSMsgId: []string{"1234"},
"aaa": []string{"111"},
"bbb": []string{"222"},
"ccc": []string{"333"},
"ddd": []string{"444"},
},
})
require_NoError(t, err)

// Now check that the header was removed.
rawMsg, err := js.GetMsg("test", pubAck.Sequence)
require_NoError(t, err)

require_Equal(t, rawMsg.Header.Get(JSMsgId), "1234")
require_Equal(t, rawMsg.Header.Get("aaa"), "")
require_Equal(t, rawMsg.Header.Get("bbb"), "222")
require_Equal(t, rawMsg.Header.Get("ccc"), "")
require_Equal(t, rawMsg.Header.Get("ddd"), "444")
}
116 changes: 100 additions & 16 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ type StreamConfig struct {
// Allow KV like semantics to also discard new on a per subject basis
DiscardNewPer bool `json:"discard_new_per_subject,omitempty"`

// Specifies a list of headers that should be kept in each message.
// If empty, all headers will be kept.
// This property is mutually exclusive with HeadersToRemove.
HeadersToKeep []string `json:"headers_to_keep,omitempty"`

// Specifies a list of headers that should be removed from each message.
HeadersToRemove []string `json:"headers_to_remove,omitempty"`

// Optional qualifiers. These can not be modified after set to true.

// Sealed will seal a stream so no messages can get out or in.
Expand Down Expand Up @@ -279,6 +287,12 @@ type stream struct {
// to know if trace event should be sent after processing.
mt map[uint64]*msgTrace

// headers to keep in each message
headersToKeep map[string]struct{}

// headers to remove from each message
headersToRemove map[string]struct{}

// For non limits policy streams when they process an ack before the actual msg.
// Can happen in stretch clusters, multi-cloud, or during catchup for a restarted server.
preAcks map[uint64]map[*consumer]struct{}
Expand Down Expand Up @@ -558,28 +572,49 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
return nil, fmt.Errorf("no applicable tier found")
}

// Create a hashmap for headers to keep or removed.
var headersToKeep, headersToRemove map[string]struct{}
if len(cfg.HeadersToKeep) > 0 {
headersToKeep = make(map[string]struct{}, len(cfg.HeadersToKeep)+1)
headersToKeep[JSMsgId] = struct{}{} // Always keep the message ID.
for _, h := range cfg.HeadersToKeep {
headersToKeep[h] = struct{}{}
}
} else if len(cfg.HeadersToRemove) > 0 {
headersToRemove = make(map[string]struct{}, len(cfg.HeadersToRemove))
for _, h := range cfg.HeadersToRemove {
if h == JSMsgId {
continue
}

headersToRemove[h] = struct{}{}
}
}

// Setup the internal clients.
c := s.createInternalJetStreamClient()
ic := s.createInternalJetStreamClient()

qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name)
mset := &stream{
acc: a,
jsa: jsa,
cfg: cfg,
js: js,
srv: s,
client: c,
sysc: ic,
tier: tier,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
acc: a,
jsa: jsa,
cfg: cfg,
js: js,
srv: s,
client: c,
sysc: ic,
tier: tier,
stype: cfg.Storage,
consumers: make(map[string]*consumer),
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
headersToKeep: headersToKeep,
headersToRemove: headersToRemove,
}

// Start our signaling routine to process consumers.
Expand Down Expand Up @@ -3402,6 +3437,48 @@ func streamAndSeq(shdr string) (string, string, uint64) {

}

func keepHeaders(hdr []byte, headers map[string]struct{}) []byte {
return processHeaders(hdr, headers, true)
}

func removeHeaders(hdr []byte, headers map[string]struct{}) []byte {
return processHeaders(hdr, headers, false)
}

func processHeaders(hdr []byte, headers map[string]struct{}, keep bool) []byte {
var index int
for {
if index >= len(hdr) {
return hdr
}

// Find the end of the line
end := bytes.Index(hdr[index:], []byte(_CRLF_))
if end < 0 {
return hdr
}
end += index

// Find the end of the key
endKey := bytes.Index(hdr[index:end], []byte(":"))
if endKey < 0 {
index = end + len(_CRLF_)
continue
}
endKey += index

if _, ok := headers[string(hdr[index:endKey])]; ok != keep {
hdr = append(hdr[:index], hdr[end+len(_CRLF_):]...)

if len(hdr) <= len(emptyHdrLine) {
return nil
}
} else {
index = end + len(_CRLF_)
}
}
}

// Lock should be held.
func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) {
var state StreamState
Expand Down Expand Up @@ -4662,6 +4739,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// Remove any headers that are not needed for storage.
if mset.headersToKeep != nil {
hdr = keepHeaders(hdr, mset.headersToKeep)
} else if mset.headersToRemove != nil {
hdr = removeHeaders(hdr, mset.headersToRemove)
}

// Response Ack.
var (
response []byte
Expand Down