7
7
"fmt"
8
8
"io/ioutil"
9
9
"log"
10
+ "math"
10
11
"math/rand"
11
12
"os"
12
13
"os/signal"
@@ -20,6 +21,7 @@ import (
20
21
21
22
hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
22
23
redis "github.com/redis/go-redis/v9"
24
+ "golang.org/x/time/rate"
23
25
)
24
26
25
27
const (
@@ -31,6 +33,8 @@ const (
31
33
redisTLSInsecureSkipVerify = "tls_insecure_skip_verify"
32
34
)
33
35
36
+ const Inf = rate .Limit (math .MaxFloat64 )
37
+
34
38
var totalMessages uint64
35
39
var totalSubscribedChannels int64
36
40
var totalPublishers int64
@@ -53,7 +57,7 @@ type testResult struct {
53
57
Addresses []string `json:"Addresses"`
54
58
}
55
59
56
- func publisherRoutine (clientName string , channels []string , mode string , measureRTT bool , verbose bool , dataSize int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client ) {
60
+ func publisherRoutine (clientName string , channels []string , mode string , measureRTT bool , verbose bool , dataSize int , ctx context.Context , wg * sync.WaitGroup , client * redis.Client , useLimiter bool , rateLimiter * rate. Limiter ) {
57
61
defer wg .Done ()
58
62
59
63
if verbose {
@@ -81,12 +85,16 @@ func publisherRoutine(clientName string, channels []string, mode string, measure
81
85
82
86
default :
83
87
msg := payload
84
- if measureRTT {
85
- now := time .Now ().UnixMicro ()
86
- msg = strconv .FormatInt (now , 10 )
87
- }
88
88
89
89
for _ , ch := range channels {
90
+ if useLimiter {
91
+ r := rateLimiter .ReserveN (time .Now (), int (1 ))
92
+ time .Sleep (r .Delay ())
93
+ }
94
+ if measureRTT {
95
+ now := time .Now ().UnixMicro ()
96
+ msg = strconv .FormatInt (now , 10 )
97
+ }
90
98
var err error
91
99
switch mode {
92
100
case "spublish" :
@@ -210,6 +218,8 @@ func main() {
210
218
host := flag .String ("host" , "127.0.0.1" , "redis host." )
211
219
port := flag .String ("port" , "6379" , "redis port." )
212
220
cpuprofile := flag .String ("cpuprofile" , "" , "write cpu profile to file" )
221
+ rps := flag .Int64 ("rps" , 0 , "Max rps. If 0 no limit is applied and the DB is stressed up to maximum." )
222
+ rpsburst := flag .Int64 ("rps-burst" , 0 , "Max rps burst. If 0 the allowed burst will be the ammount of clients." )
213
223
password := flag .String ("a" , "" , "Password for Redis Auth." )
214
224
dataSize := flag .Int ("data-size" , 128 , "Payload size in bytes for publisher messages when RTT mode is disabled" )
215
225
mode := flag .String ("mode" , "subscribe" , "Subscribe mode. Either 'subscribe' or 'ssubscribe'." )
@@ -381,6 +391,21 @@ func main() {
381
391
rttLatencyChannel := make (chan int64 , 100000 ) // Channel for RTT measurements. buffer of 100K messages to process
382
392
totalCreatedClients := 0
383
393
if strings .Contains (* mode , "publish" ) {
394
+ var requestRate = Inf
395
+ var requestBurst = int (* rps )
396
+ useRateLimiter := false
397
+ if * rps != 0 {
398
+ requestRate = rate .Limit (* rps )
399
+ log .Println (fmt .Sprintf ("running publisher mode with rate-limit enabled. Max published %d messages/sec.\n " , * rps ))
400
+ useRateLimiter = true
401
+ if * rpsburst != 0 {
402
+ requestBurst = int (* rpsburst )
403
+ }
404
+ } else {
405
+ log .Println (fmt .Sprintf ("running publisher mode with maximum rate enabled." ))
406
+ }
407
+
408
+ var rateLimiter = rate .NewLimiter (requestRate , requestBurst )
384
409
// Only run publishers
385
410
for client_id := 1 ; client_id <= * clients ; client_id ++ {
386
411
channels := []string {}
@@ -421,7 +446,7 @@ func main() {
421
446
}
422
447
423
448
wg .Add (1 )
424
- go publisherRoutine (publisherName , channels , * mode , * measureRTT , * verbose , * dataSize , ctx , & wg , client )
449
+ go publisherRoutine (publisherName , channels , * mode , * measureRTT , * verbose , * dataSize , ctx , & wg , client , useRateLimiter , rateLimiter )
425
450
atomic .AddInt64 (& totalPublishers , 1 )
426
451
atomic .AddUint64 (& totalConnects , 1 )
427
452
}
0 commit comments