From 5bde876efe98025c5bba06297fc08f16b9209de3 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Mon, 18 Nov 2024 11:07:45 -0500 Subject: [PATCH 1/8] feat: rate limiting Implement's a commit from @wbh1. Copied message: "In instances where Vault has a bunch of certificates, you can essentially DoS Vault with all your requests. Instead of spamming all of the requests as fast as you can, this allows you to throttle how many requests per second you send to Vault by using a simple token-bucket rate limiter. The limiter is off by default, so this does not change existing default behavior." https://github.com/linode-obs/vault-pki-exporter/pull/11/commits/1e45b57a6f04339047be1574354e19d17efeefc8 Co-authored-by: Will Hegedus --- README.md | 2 ++ cmd/main.go | 12 +++++++++++- compose.yaml | 4 ++++ pkg/vault-mon/pki.go | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f3f41a6..73c2b70 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,8 @@ Flags: --refresh-interval duration How many sec between metrics update (default 1m0s) --batch-size-percent How large of a batch of certificates to get data for at once, supports floats (e.g 0.0 - 100.0) (default 1) --log-level Set log level (options: info, warn, error, debug) + --request-limit float Token-bucket limiter for number of requests per second to Vault when fetching certs (0 = disabled) + --request-limit-burst int Token-bucket burst limit for number of requests per second to Vault when fetching certs (0 = match 'request-limit' value) -v, --verbose (deprecated) Enable verbose logging. Defaults to debug level logging Use " [command] --help" for more information about a command. diff --git a/cmd/main.go b/cmd/main.go index a8975d0..1018965 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -73,6 +73,16 @@ func init() { if err := viper.BindPFlag("batch_size_percent", flags.Lookup("batch-size-percent")); err != nil { log.Fatal("Could not bind batch-size-percent flag:", err) } + + flags.Float64("request-limit", 0.0, "Token-bucket limiter for number of requests per second to Vault when fetching certs (0 = disabled)") + if err := viper.BindPFlag("request_limit", flags.Lookup("request-limit")); err != nil { + log.Fatal(err) + } + + flags.Int("request-limit-burst", 0, "Token-bucket burst limit for number of requests per second to Vault when fetching certs (0 = match 'request-limit' value)") + if err := viper.BindPFlag("request_limit_burst", flags.Lookup("request-limit-burst")); err != nil { + log.Fatal(err) + } } func main() { @@ -87,7 +97,7 @@ func main() { } // note mix of underscores and dashes - slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent") ) + slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent"), "request-limit", viper.GetFloat64("request_limit"), "request-limit-burst", viper.GetInt("request_limit_burst") ) err := cli.Execute() if err != nil { diff --git a/compose.yaml b/compose.yaml index 5929f7b..12e1025 100644 --- a/compose.yaml +++ b/compose.yaml @@ -29,6 +29,10 @@ services: - --fetch-interval=5s - --refresh-interval=5s - --log-level=debug + # 5 requests per second + - --request-limit=5 + # burst of 75 tokens + - --request-limit-burst=75 networks: - vault-pki-exporter ports: diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index 042b590..dd690f6 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -1,6 +1,7 @@ package vault_mon import ( + "context" "crypto/x509" "encoding/pem" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/spf13/viper" + "golang.org/x/time/rate" ) type PKI struct { @@ -40,6 +42,12 @@ var loadCertsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Buckets: prometheus.ExponentialBuckets(1, 3, 10), }) +var loadCertsLimitDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "x509_load_certs_request_limit_gated_duration_seconds", + Help: "Duration of time spent throttled waiting to contact Vault during loadCerts execution", + Buckets: prometheus.ExponentialBuckets(1, 3, 10), +}) + func (mon *PKIMon) Init(vault *vaultapi.Client) error { mon.vault = vault mon.pkis = make(map[string]*PKI) @@ -237,6 +245,22 @@ func (pki *PKI) loadCerts() error { batchSize = 1 } + requestLimit := rate.Limit(viper.GetFloat64("request_limit")) + requestLimitBurst := viper.GetInt("request_limit_burst") + + // Special value for limiter that allows all events + if requestLimit == 0 { + requestLimit = rate.Inf + } + + // If non-default value for requestLimit, but default requestLimitBurst, + // set requestLimitBurst to requestLimit + if requestLimit != rate.Inf && requestLimitBurst == 0 { + requestLimitBurst = int(requestLimit) + } + + limiter := rate.NewLimiter(requestLimit, requestLimitBurst) + // loop in batches via waitgroups to make this much faster for large vault installations for i := 0; i < len(serialsList.Keys); i += batchSize { end := i + batchSize @@ -255,6 +279,14 @@ func (pki *PKI) loadCerts() error { go func(serial string) { defer wg.Done() + waitStart := time.Now() + err := limiter.Wait(context.Background()) + if err != nil { + slog.Error("Error waiting for request limiter", "error", err) + return + } + loadCertsLimitDuration.Observe(time.Since(waitStart).Seconds()) + secret, err := pki.vault.Logical().Read(fmt.Sprintf("%scert/%s", pki.path, serial)) if err != nil || secret == nil || secret.Data == nil { slog.Error("Failed to get certificate", "pki", pki.path, "serial", serial, "error", err) From 903465542efc12b785527cf28ef329bab81bcc55 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Mon, 18 Nov 2024 12:12:43 -0500 Subject: [PATCH 2/8] docs: add rate limiting docs and sleep info logs --- README.md | 6 +++++- compose.yaml | 4 ++-- pkg/vault-mon/influx.go | 2 ++ pkg/vault-mon/pki.go | 1 + pkg/vault-mon/prometheus.go | 2 +- pkg/vault/client.go | 2 ++ 6 files changed, 13 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 73c2b70..8237bc8 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ x509_cert_startdate{common_name="My PKI CA",country="CA",locality="Montreal",org ## Batch Size -Vault PKI Exporter supports a `--batch-size-percent` flag to batch many requests for individual certificate metrics at once. +Vault PKI Exporter supports a `--batch-size-percent` flag to batch many requests for individual certificate metrics at once. Each active batch will create a goroutine. If you are getting many log messages such as: @@ -93,6 +93,10 @@ level=error msg="failed to get certificate for pki/26:97:08:32:44:40:30:de:11:5z Your batch size is probably too high. +## Rate Limiting + +Rate limiting flags are also added for large Vault installations. These rate limits apply to all batches with a global, shared limit between batches. This is to prevent overloading Vault with many API calls. You may want to set your `--request-limit-burst` roughly equal to `--request-limit` so the token bucket will begin with as many tokens as your limit uses. This is measured in Vault API calls per second. + ## Certificate Selection Any certificate with a unique subject common name and organizational unit is considered for metrics. If a certificate is renewed in place with the same CN and OU, it will still retain the same time series to avoid false alarms. diff --git a/compose.yaml b/compose.yaml index 12e1025..12f3219 100644 --- a/compose.yaml +++ b/compose.yaml @@ -30,9 +30,9 @@ services: - --refresh-interval=5s - --log-level=debug # 5 requests per second - - --request-limit=5 + - --request-limit=20 # burst of 75 tokens - - --request-limit-burst=75 + - --request-limit-burst=20 networks: - vault-pki-exporter ports: diff --git a/pkg/vault-mon/influx.go b/pkg/vault-mon/influx.go index 17ba25b..0d4fc2d 100644 --- a/pkg/vault-mon/influx.go +++ b/pkg/vault-mon/influx.go @@ -3,6 +3,7 @@ package vault_mon import ( "crypto/x509" "fmt" + "log/slog" "os" "strings" "time" @@ -21,6 +22,7 @@ func InfluxWatchCerts(pkimon *PKIMon, interval time.Duration, loop bool) { go func() { for { influxProcessData(pkimon) + slog.Info("Sleeping after processing influx data", "time", interval) time.Sleep(interval) } }() diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index dd690f6..ecd07a6 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -104,6 +104,7 @@ func (mon *PKIMon) Watch(interval time.Duration) { } } mon.Loaded = true + slog.Info("Sleeping after refreshing PKI certs", "time", interval) time.Sleep(interval) } }() diff --git a/pkg/vault-mon/prometheus.go b/pkg/vault-mon/prometheus.go index ac47d05..4a2e86f 100644 --- a/pkg/vault-mon/prometheus.go +++ b/pkg/vault-mon/prometheus.go @@ -128,7 +128,7 @@ func PromWatchCerts(pkimon *PKIMon, interval time.Duration) { duration := time.Since(startTime).Seconds() promWatchCertsDuration.Observe(duration) - slog.Info("PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis)) + slog.Info("Sleeping after PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis), "time", interval) time.Sleep(interval) } } diff --git a/pkg/vault/client.go b/pkg/vault/client.go index ef30814..28a5382 100644 --- a/pkg/vault/client.go +++ b/pkg/vault/client.go @@ -115,6 +115,7 @@ func (vault *ClientWrapper) GetSecret(path string, fn secretCallback) error { if secret.LeaseDuration > 0 { go func() { for { + slog.Info("Sleeping before refreshing vault", "time", time.Duration(secret.LeaseDuration)) time.Sleep(time.Duration(secret.LeaseDuration) * time.Second) secret, err = vault.Client.Logical().Read(path) if err != nil { @@ -175,6 +176,7 @@ func watch_renewer_vault(renewer *vaultapi.Renewer) { go func() { for { // Prevent loop when secret wasn't renewed before expiration + slog.Info("Waiting before calling another renew", "time", time.Second) time.Sleep(time.Second) renewer.Renew() } From be584d910665e6ca3a6de5d22b5d38fcd38f2ee3 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Mon, 18 Nov 2024 12:15:16 -0500 Subject: [PATCH 3/8] chore: log total size of keys --- pkg/vault-mon/pki.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index ecd07a6..4e1013a 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -271,7 +271,7 @@ func (pki *PKI) loadCerts() error { batchKeys := serialsList.Keys[i:end] var wg sync.WaitGroup - slog.Info("Processing batch of certs", "pki", pki.path, "batchsize", len(batchKeys)) + slog.Info("Processing batch of certs", "pki", pki.path, "batchsize", len(batchKeys), "total_size", len(serialsList.Keys)) // add a mutex for protecting concurrent access to the certs map var certsMux sync.Mutex From 17cee2aa3e2f0b47c4a5a43bec571958a2981820 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Mon, 18 Nov 2024 12:39:45 -0500 Subject: [PATCH 4/8] fix: rename time log to interval time is a standard log collector field, interval is better and won't conflict --- pkg/vault-mon/influx.go | 2 +- pkg/vault-mon/pki.go | 2 +- pkg/vault-mon/prometheus.go | 2 +- pkg/vault/client.go | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/vault-mon/influx.go b/pkg/vault-mon/influx.go index 0d4fc2d..c9ead16 100644 --- a/pkg/vault-mon/influx.go +++ b/pkg/vault-mon/influx.go @@ -22,7 +22,7 @@ func InfluxWatchCerts(pkimon *PKIMon, interval time.Duration, loop bool) { go func() { for { influxProcessData(pkimon) - slog.Info("Sleeping after processing influx data", "time", interval) + slog.Info("Sleeping after processing influx data", "interval", interval) time.Sleep(interval) } }() diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index 4e1013a..b1071f5 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -104,7 +104,7 @@ func (mon *PKIMon) Watch(interval time.Duration) { } } mon.Loaded = true - slog.Info("Sleeping after refreshing PKI certs", "time", interval) + slog.Info("Sleeping after refreshing PKI certs", "interval", interval) time.Sleep(interval) } }() diff --git a/pkg/vault-mon/prometheus.go b/pkg/vault-mon/prometheus.go index 4a2e86f..1d1f1c7 100644 --- a/pkg/vault-mon/prometheus.go +++ b/pkg/vault-mon/prometheus.go @@ -128,7 +128,7 @@ func PromWatchCerts(pkimon *PKIMon, interval time.Duration) { duration := time.Since(startTime).Seconds() promWatchCertsDuration.Observe(duration) - slog.Info("Sleeping after PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis), "time", interval) + slog.Info("Sleeping after PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis), "interval", interval) time.Sleep(interval) } } diff --git a/pkg/vault/client.go b/pkg/vault/client.go index 28a5382..9ceed82 100644 --- a/pkg/vault/client.go +++ b/pkg/vault/client.go @@ -115,7 +115,7 @@ func (vault *ClientWrapper) GetSecret(path string, fn secretCallback) error { if secret.LeaseDuration > 0 { go func() { for { - slog.Info("Sleeping before refreshing vault", "time", time.Duration(secret.LeaseDuration)) + slog.Info("Sleeping before refreshing vault", "interval", time.Duration(secret.LeaseDuration)) time.Sleep(time.Duration(secret.LeaseDuration) * time.Second) secret, err = vault.Client.Logical().Read(path) if err != nil { @@ -176,7 +176,7 @@ func watch_renewer_vault(renewer *vaultapi.Renewer) { go func() { for { // Prevent loop when secret wasn't renewed before expiration - slog.Info("Waiting before calling another renew", "time", time.Second) + slog.Info("Waiting before calling another renew", "interval", time.Second) time.Sleep(time.Second) renewer.Renew() } From 50631cc7a3ffc46f2c46dc772c4cb4392f99a117 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Mon, 18 Nov 2024 13:05:00 -0500 Subject: [PATCH 5/8] feat: return certs incrementally rather than wait for all certs to be examinined, this changes GetCerts to just return whatever it currently has and re-locks itself when done. works well with prometheus refresh interval flag to return metrics more often. The fetch interval should still clear all certs from scratch --- pkg/vault-mon/pki.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index b1071f5..1e88170 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -361,7 +361,9 @@ func (pki *PKI) GetCRLs() map[string]*x509.RevocationList { } func (pki *PKI) GetCerts() map[string]map[string]*x509.Certificate { + // Unlock the mutex, return the certs, then re-lock it + pki.certsmux.Unlock() + certs := pki.certs pki.certsmux.Lock() - defer pki.certsmux.Unlock() - return pki.certs + return certs } From eadd5e97b24175830f073f4a6bb543e842f04179 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Tue, 19 Nov 2024 14:55:17 -0500 Subject: [PATCH 6/8] fix: remove unintended getcerts change Was testing, merge conflict brought it in --- pkg/vault-mon/pki.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/vault-mon/pki.go b/pkg/vault-mon/pki.go index dddd08f..54ac89b 100644 --- a/pkg/vault-mon/pki.go +++ b/pkg/vault-mon/pki.go @@ -377,9 +377,7 @@ func (pki *PKI) GetCRLs() map[string]*x509.RevocationList { } func (pki *PKI) GetCerts() map[string]map[string]*x509.Certificate { - // Unlock the mutex, return the certs, then re-lock it - pki.certsmux.Unlock() - certs := pki.certs pki.certsmux.Lock() - return certs + defer pki.certsmux.Unlock() + return pki.certs } From 2706c3bcafc336002c6201c70438fd66fa5fef0b Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Tue, 19 Nov 2024 15:01:47 -0500 Subject: [PATCH 7/8] fix: remove log.fatal --- cmd/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 14f91ed..37b9a54 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -76,12 +76,12 @@ func init() { flags.Float64("request-limit", 0.0, "Token-bucket limiter for number of requests per second to Vault when fetching certs (0 = disabled)") if err := viper.BindPFlag("request_limit", flags.Lookup("request-limit")); err != nil { - log.Fatal(err) + logger.SlogFatal("Could not bind request-limit flag", "error", err) } flags.Int("request-limit-burst", 0, "Token-bucket burst limit for number of requests per second to Vault when fetching certs (0 = match 'request-limit' value)") if err := viper.BindPFlag("request_limit_burst", flags.Lookup("request-limit-burst")); err != nil { - log.Fatal(err) + logger.SlogFatal("Could not bind request-limit-burst flag", "error", err) } } @@ -96,7 +96,7 @@ func main() { } // note mix of underscores and dashes - slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent"), "request-limit", viper.GetFloat64("request_limit"), "request-limit-burst", viper.GetInt("request_limit_burst") ) + slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent"), "request-limit", viper.GetFloat64("request_limit"), "request-limit-burst", viper.GetInt("request_limit_burst")) err := cli.Execute() if err != nil { From 39317ebdc4254a8426d85d9199b229dfb32efb86 Mon Sep 17 00:00:00 2001 From: Will Bollock Date: Tue, 19 Nov 2024 15:04:30 -0500 Subject: [PATCH 8/8] fix: compose comments --- compose.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compose.yaml b/compose.yaml index 12f3219..5c779bb 100644 --- a/compose.yaml +++ b/compose.yaml @@ -29,9 +29,9 @@ services: - --fetch-interval=5s - --refresh-interval=5s - --log-level=debug - # 5 requests per second + # 20 requests per second - --request-limit=20 - # burst of 75 tokens + # burst of 20 tokens - --request-limit-burst=20 networks: - vault-pki-exporter