Skip to content

Commit 23f030e

Browse files
Cherry-picks for 2.10.19-RC.3 (#5745)
Includes the following: - #5690 - #5725 - #5729 - #5734 - #5735 - #5736 - #5743 - #5744 - #5751 - #5755 - #5754 - #5732 - #5750 - #5756 The following were reverted before this PR: - #5602 Signed-off-by: Neil Twigg <neil@nats.io>
2 parents c8788fe + 8c04528 commit 23f030e

24 files changed

+804
-105
lines changed

.travis.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ language: go
88
go:
99
# This should be quoted or use .x, but should not be unquoted.
1010
# Remember that a YAML bare float drops trailing zeroes.
11-
- "1.22.5"
12-
- "1.21.12"
11+
- "1.22.6"
12+
- "1.21.13"
1313

1414
go_import_path: github.com/nats-io/nats-server
1515

@@ -47,7 +47,7 @@ jobs:
4747
- name: "Run all tests from all other packages"
4848
env: TEST_SUITE=non_srv_pkg_tests
4949
- name: "Compile with older Go release"
50-
go: "1.20"
50+
go: "1.21.x"
5151
env: TEST_SUITE=build_only
5252

5353
script: ./scripts/runTestsOnTravis.sh $TEST_SUITE

go.mod

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/nats-io/nats-server/v2
22

3-
go 1.21
3+
go 1.21.0
44

55
require (
66
github.com/klauspost/compress v1.17.9
@@ -11,6 +11,6 @@ require (
1111
github.com/nats-io/nuid v1.0.1
1212
go.uber.org/automaxprocs v1.5.3
1313
golang.org/x/crypto v0.25.0
14-
golang.org/x/sys v0.22.0
15-
golang.org/x/time v0.5.0
14+
golang.org/x/sys v0.23.0
15+
golang.org/x/time v0.6.0
1616
)

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/
2525
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2626
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
2727
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
28+
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
29+
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2830
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
2931
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
32+
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
33+
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
3034
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
3135
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

server/accounts.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -1478,11 +1478,13 @@ func (a *Account) addServiceImportWithClaim(destination *Account, from, to strin
14781478
}
14791479

14801480
// Check if this introduces a cycle before proceeding.
1481-
if err := a.serviceImportFormsCycle(destination, from); err != nil {
1482-
return err
1481+
// From will be the mapped subject.
1482+
// If the 'to' has a wildcard make sure we pre-transform the 'from' before we check for cycles, e.g. '$1'
1483+
fromT := from
1484+
if subjectHasWildcard(to) {
1485+
fromT, _ = transformUntokenize(from)
14831486
}
1484-
1485-
if err := a.serviceImportFormsCycle(destination, to); err != nil {
1487+
if err := a.serviceImportFormsCycle(destination, fromT); err != nil {
14861488
return err
14871489
}
14881490

@@ -1807,7 +1809,7 @@ func (a *Account) _checkForReverseEntry(reply string, si *serviceImport, checkIn
18071809
// Note that if we are here reply has to be a literal subject.
18081810
if checkInterest {
18091811
// If interest still exists we can not clean these up yet.
1810-
if rr := a.sl.Match(reply); len(rr.psubs)+len(rr.qsubs) > 0 {
1812+
if a.sl.HasInterest(reply) {
18111813
a.mu.RUnlock()
18121814
return
18131815
}
@@ -1925,6 +1927,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
19251927
tr *subjectTransform
19261928
err error
19271929
)
1930+
19281931
if subjectHasWildcard(to) {
19291932
// If to and from match, then we use the published subject.
19301933
if to == from {
@@ -3774,7 +3777,7 @@ func fetchAccount(res AccountResolver, name string) (string, error) {
37743777
if !nkeys.IsValidPublicAccountKey(name) {
37753778
return _EMPTY_, fmt.Errorf("will only fetch valid account keys")
37763779
}
3777-
return res.Fetch(name)
3780+
return res.Fetch(copyString(name))
37783781
}
37793782

37803783
// AccountResolver interface. This is to fetch Account JWTs by public nkeys

server/auth.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -978,12 +978,12 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
978978
deniedSub := []string{}
979979
for _, sub := range denyAllJs {
980980
if c.perms.pub.deny != nil {
981-
if r := c.perms.pub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
981+
if c.perms.pub.deny.HasInterest(sub) {
982982
deniedPub = append(deniedPub, sub)
983983
}
984984
}
985985
if c.perms.sub.deny != nil {
986-
if r := c.perms.sub.deny.Match(sub); len(r.psubs)+len(r.qsubs) > 0 {
986+
if c.perms.sub.deny.HasInterest(sub) {
987987
deniedSub = append(deniedSub, sub)
988988
}
989989
}

server/client.go

+42-12
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ type outbound struct {
312312
cw *s2.Writer
313313
}
314314

315+
const nbMaxVectorSize = 1024 // == IOV_MAX on Linux/Darwin and most other Unices (except Solaris/AIX)
316+
315317
const nbPoolSizeSmall = 512 // Underlying array size of small buffer
316318
const nbPoolSizeMedium = 4096 // Underlying array size of medium buffer
317319
const nbPoolSizeLarge = 65536 // Underlying array size of large buffer
@@ -1611,7 +1613,7 @@ func (c *client) flushOutbound() bool {
16111613
// referenced in c.out.nb (which can be modified in queueOutboud() while
16121614
// the lock is released).
16131615
c.out.wnb = append(c.out.wnb, collapsed...)
1614-
var _orig [1024][]byte
1616+
var _orig [nbMaxVectorSize][]byte
16151617
orig := append(_orig[:0], c.out.wnb...)
16161618

16171619
// Since WriteTo is lopping things off the beginning, we need to remember
@@ -1622,13 +1624,31 @@ func (c *client) flushOutbound() bool {
16221624
// flush here
16231625
start := time.Now()
16241626

1625-
// FIXME(dlc) - writev will do multiple IOs past 1024 on
1626-
// most platforms, need to account for that with deadline?
1627-
nc.SetWriteDeadline(start.Add(wdl))
1628-
1629-
// Actual write to the socket.
1630-
n, err := c.out.wnb.WriteTo(nc)
1631-
nc.SetWriteDeadline(time.Time{})
1627+
var n int64 // Total bytes written
1628+
var wn int64 // Bytes written per loop
1629+
var err error // Error from last write, if any
1630+
for len(c.out.wnb) > 0 {
1631+
// Limit the number of vectors to no more than nbMaxVectorSize,
1632+
// which if 1024, will mean a maximum of 64MB in one go.
1633+
wnb := c.out.wnb
1634+
if len(wnb) > nbMaxVectorSize {
1635+
wnb = wnb[:nbMaxVectorSize]
1636+
}
1637+
consumed := len(wnb)
1638+
1639+
// Actual write to the socket.
1640+
nc.SetWriteDeadline(start.Add(wdl))
1641+
wn, err = wnb.WriteTo(nc)
1642+
nc.SetWriteDeadline(time.Time{})
1643+
1644+
// Update accounting, move wnb slice onwards if needed, or stop
1645+
// if a write error was reported that wasn't a short write.
1646+
n += wn
1647+
c.out.wnb = c.out.wnb[consumed-len(wnb):]
1648+
if err != nil && err != io.ErrShortWrite {
1649+
break
1650+
}
1651+
}
16321652

16331653
lft := time.Since(start)
16341654

@@ -1810,7 +1830,9 @@ func (c *client) markConnAsClosed(reason ClosedState) {
18101830
if nc := c.nc; nc != nil && c.srv != nil {
18111831
// TODO: May want to send events to single go routine instead
18121832
// of creating a new go routine for each save.
1813-
go c.srv.saveClosedClient(c, nc, reason)
1833+
// Pass the c.subs as a reference. It may be set to nil in
1834+
// closeConnection.
1835+
go c.srv.saveClosedClient(c, nc, c.subs, reason)
18141836
}
18151837
}
18161838
// If writeLoop exists, let it do the final flush, close and teardown.
@@ -3964,7 +3986,7 @@ func (c *client) subForReply(reply []byte) *subscription {
39643986
func (c *client) handleGWReplyMap(msg []byte) bool {
39653987
// Check for leaf nodes
39663988
if c.srv.gwLeafSubs.Count() > 0 {
3967-
if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 {
3989+
if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 {
39683990
c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag)
39693991
}
39703992
}
@@ -5284,6 +5306,14 @@ func (c *client) closeConnection(reason ClosedState) {
52845306
}
52855307
}
52865308

5309+
// Now that we are done with subscriptions, clear the field so that the
5310+
// connection can be released and gc'ed.
5311+
if kind == CLIENT || kind == LEAF {
5312+
c.mu.Lock()
5313+
c.subs = nil
5314+
c.mu.Unlock()
5315+
}
5316+
52875317
// Don't reconnect connections that have been marked with
52885318
// the no reconnect flag.
52895319
if noReconnect {
@@ -5441,14 +5471,14 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
54415471
}
54425472
} else {
54435473
// Match correct account and sublist.
5444-
if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
5474+
if acc, _ = c.srv.LookupAccount(bytesToString(c.pa.account)); acc == nil {
54455475
return nil, nil
54465476
}
54475477
}
54485478
sl := acc.sl
54495479

54505480
// Match against the account sublist.
5451-
r = sl.Match(string(c.pa.subject))
5481+
r = sl.MatchBytes(c.pa.subject)
54525482

54535483
// Check if we need to prune.
54545484
if len(c.in.pacache) >= maxPerAccountCacheSize {

server/client_test.go

+69-1
Original file line numberDiff line numberDiff line change
@@ -2137,7 +2137,7 @@ type testConnWritePartial struct {
21372137
func (c *testConnWritePartial) Write(p []byte) (int, error) {
21382138
n := len(p)
21392139
if c.partial {
2140-
n = 15
2140+
n = n/2 + 1
21412141
}
21422142
return c.buf.Write(p[:n])
21432143
}
@@ -2962,3 +2962,71 @@ func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
29622962
t.Fatalf("Expected headers to be stripped, got %q", hdr)
29632963
}
29642964
}
2965+
2966+
func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {
2967+
opts := DefaultOptions()
2968+
opts.MaxPending = 1024 * 1024 * 140 // 140MB
2969+
opts.MaxPayload = 1024 * 1024 * 16 // 16MB
2970+
opts.WriteDeadline = time.Second * 30
2971+
s := RunServer(opts)
2972+
defer s.Shutdown()
2973+
2974+
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
2975+
defer nc.Close()
2976+
2977+
proxy := newNetProxy(0, 1024*1024*8, 1024*1024*8, s.ClientURL()) // 8MB/s
2978+
defer proxy.stop()
2979+
2980+
wait := make(chan error)
2981+
2982+
nca, err := nats.Connect(proxy.clientURL())
2983+
require_NoError(t, err)
2984+
nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
2985+
wait <- err
2986+
close(wait)
2987+
})
2988+
2989+
ncb, err := nats.Connect(s.ClientURL())
2990+
require_NoError(t, err)
2991+
2992+
_, err = nca.Subscribe("test", func(msg *nats.Msg) {
2993+
wait <- nil
2994+
})
2995+
require_NoError(t, err)
2996+
2997+
// Publish 128MB of data onto the test subject. This will
2998+
// mean that the outbound queue for nca has more than 64MB,
2999+
// which is the max we will send into a single writev call.
3000+
payload := make([]byte, 1024*1024*16) // 16MB
3001+
for i := 0; i < 8; i++ {
3002+
require_NoError(t, ncb.Publish("test", payload))
3003+
}
3004+
3005+
// Get the client ID for nca.
3006+
cid, err := nca.GetClientID()
3007+
require_NoError(t, err)
3008+
3009+
// Check that the client queue has more than 64MB queued
3010+
// up in it.
3011+
s.mu.RLock()
3012+
ca := s.clients[cid]
3013+
s.mu.RUnlock()
3014+
ca.mu.Lock()
3015+
pba := ca.out.pb
3016+
ca.mu.Unlock()
3017+
require_True(t, pba > 1024*1024*64)
3018+
3019+
// Wait for our messages to be delivered. This will take
3020+
// a few seconds as the client is limited to 8MB/s, so it
3021+
// can't deliver messages to us as quickly as the other
3022+
// client can publish them.
3023+
var msgs int
3024+
for err := range wait {
3025+
require_NoError(t, err)
3026+
msgs++
3027+
if msgs == 8 {
3028+
break
3029+
}
3030+
}
3031+
require_Equal(t, msgs, 8)
3032+
}

server/consumer.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -1012,8 +1012,8 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
10121012
// Check if we are running only 1 replica and that the delivery subject has interest.
10131013
// Check in place here for interest. Will setup properly in setLeader.
10141014
if config.replicas(&mset.cfg) == 1 {
1015-
r := o.acc.sl.Match(o.cfg.DeliverSubject)
1016-
if !o.hasDeliveryInterest(len(r.psubs)+len(r.qsubs) > 0) {
1015+
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
1016+
if !o.hasDeliveryInterest(interest) {
10171017
// Let the interest come to us eventually, but setup delete timer.
10181018
o.updateDeliveryInterest(false)
10191019
}
@@ -1508,7 +1508,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
15081508
gw.RLock()
15091509
defer gw.RUnlock()
15101510
for _, gwc := range gw.outo {
1511-
psi, qr := gwc.gatewayInterest(account, subject)
1511+
psi, qr := gwc.gatewayInterest(account, stringToBytes(subject))
15121512
if psi || qr != nil {
15131513
return true
15141514
}
@@ -1802,8 +1802,7 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
18021802
if ncfg.DeliverSubject == _EMPTY_ {
18031803
return errors.New("can not update push consumer to pull based")
18041804
}
1805-
rr := acc.sl.Match(cfg.DeliverSubject)
1806-
if len(rr.psubs)+len(rr.qsubs) != 0 {
1805+
if acc.sl.HasInterest(cfg.DeliverSubject) {
18071806
return NewJSConsumerNameExistError()
18081807
}
18091808
}
@@ -3221,8 +3220,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
32213220
}
32223221

32233222
if wr.expires.IsZero() || time.Now().Before(wr.expires) {
3224-
rr := wr.acc.sl.Match(wr.interest)
3225-
if len(rr.psubs)+len(rr.qsubs) > 0 {
3223+
if wr.acc.sl.HasInterest(wr.interest) {
32263224
return o.waiting.pop()
32273225
} else if time.Since(wr.received) < defaultGatewayRecentSubExpiration && (o.srv.leafNodeEnabled || o.srv.gateway.enabled) {
32283226
return o.waiting.pop()
@@ -3649,8 +3647,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
36493647
continue
36503648
}
36513649
// Now check interest.
3652-
rr := wr.acc.sl.Match(wr.interest)
3653-
interest := len(rr.psubs)+len(rr.qsubs) > 0
3650+
interest := wr.acc.sl.HasInterest(wr.interest)
36543651
if !interest && (s.leafNodeEnabled || s.gateway.enabled) {
36553652
// If we are here check on gateways and leaf nodes (as they can mask gateways on the other end).
36563653
// If we have interest or the request is too young break and do not expire.
@@ -4954,9 +4951,9 @@ func (o *consumer) isActive() bool {
49544951
// hasNoLocalInterest return true if we have no local interest.
49554952
func (o *consumer) hasNoLocalInterest() bool {
49564953
o.mu.RLock()
4957-
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
4954+
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
49584955
o.mu.RUnlock()
4959-
return len(rr.psubs)+len(rr.qsubs) == 0
4956+
return !interest
49604957
}
49614958

49624959
// This is when the underlying stream has been purged.
@@ -5320,13 +5317,13 @@ func (o *consumer) switchToEphemeral() {
53205317
o.mu.Lock()
53215318
o.cfg.Durable = _EMPTY_
53225319
store, ok := o.store.(*consumerFileStore)
5323-
rr := o.acc.sl.Match(o.cfg.DeliverSubject)
5320+
interest := o.acc.sl.HasInterest(o.cfg.DeliverSubject)
53245321
// Setup dthresh.
53255322
o.updateInactiveThreshold(&o.cfg)
53265323
o.mu.Unlock()
53275324

53285325
// Update interest
5329-
o.updateDeliveryInterest(len(rr.psubs)+len(rr.qsubs) > 0)
5326+
o.updateDeliveryInterest(interest)
53305327
// Write out new config
53315328
if ok {
53325329
store.updateConfig(o.cfg)

server/errors.go

+3
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ var (
180180
// ErrClusterNameRemoteConflict signals that a remote server has a different cluster name.
181181
ErrClusterNameRemoteConflict = errors.New("cluster name from remote server conflicts")
182182

183+
// ErrClusterNameHasSpaces signals that the cluster name contains spaces, which is not allowed.
184+
ErrClusterNameHasSpaces = errors.New("cluster name cannot contain spaces or new lines")
185+
183186
// ErrMalformedSubject is returned when a subscription is made with a subject that does not conform to subject rules.
184187
ErrMalformedSubject = errors.New("malformed subject")
185188

0 commit comments

Comments
 (0)