@@ -4457,3 +4457,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
 	// it should succeed.
 	require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
 }
+
+func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, _ := jsClientConnect(t, c.randomNonLeader())
+	defer nc.Close()
+
+	// We only run 16 JetStream API workers.
+	mp := runtime.GOMAXPROCS(0)
+	if mp > 16 {
+		mp = 16
+	}
+
+	leader := c.leader()
+	ljs := leader.js.Load()
+
+	// Take the JS lock, which allows the JS API queue to build up.
+	ljs.mu.Lock()
+	defer ljs.mu.Unlock()
+
+	count := JSDefaultRequestQueueLimit - 1
+	ch := make(chan *nats.Msg, count)
+
+	inbox := nc.NewRespInbox()
+	_, err := nc.ChanSubscribe(inbox, ch)
+	require_NoError(t, err)
+
+	// To ensure a fair starting line, we need to submit as many tasks as
+	// there are JS workers whilst holding the JS lock. This will ensure that
+	// each JS API worker is properly wedged.
+	msg := &nats.Msg{
+		Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
+		Reply:   "no_one_here",
+	}
+	for i := 0; i < mp; i++ {
+		require_NoError(t, nc.PublishMsg(msg))
+	}
+
+	// Then we want to submit a fixed number of tasks, big enough to fill
+	// the queue, so that we can measure them.
+	msg = &nats.Msg{
+		Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
+		Reply:   inbox,
+	}
+	for i := 0; i < count; i++ {
+		require_NoError(t, nc.PublishMsg(msg))
+	}
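+	// Wait for all of the requests to show up in the leader's routed API queue
+	// before we start the clock.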
+	checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
+		if queued := leader.jsAPIRoutedReqs.len(); queued != count {
+			return fmt.Errorf("expected %d queued requests, got %d", count, queued)
+		}
+		return nil
+	})
+
+	// Now we're going to release the lock and start timing. The workers
+	// will now race to clear the queues and we'll wait to see how long
+	// it takes for them all to respond.
+	start := time.Now()
+	ljs.mu.Unlock()
+	for i := 0; i < count; i++ {
+		<-ch
+	}
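+	// Re-take the JS lock so the deferred unlock above doesn't unlock an
+	// already-unlocked mutex.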
+	ljs.mu.Lock()
+	t.Logf("Took %s to clear %d items", time.Since(start), count)
+}