Skip to content

Commit

Permalink
Merge pull request #30 from wbollock/feat/rate_limiting
Browse files Browse the repository at this point in the history
feat: rate limiting
  • Loading branch information
wbollock authored Nov 20, 2024
2 parents 0d5f910 + 39317eb commit 4ca3150
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 4 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Flags:
--refresh-interval duration How many sec between metrics update (default 1m0s)
--batch-size-percent How large of a batch of certificates to get data for at once, supports floats (e.g. 0.0 - 100.0) (default 1)
--log-level Set log level (options: info, warn, error, debug)
--request-limit float Token-bucket limiter for number of requests per second to Vault when fetching certs (0 = disabled)
--request-limit-burst int Token-bucket burst limit for number of requests per second to Vault when fetching certs (0 = match 'request-limit' value)
-v, --verbose (deprecated) Enable verbose logging. Defaults to debug level logging

Use "[command] --help" for more information about a command.
Expand Down Expand Up @@ -81,7 +83,7 @@ x509_cert_startdate{common_name="My PKI CA",country="CA",locality="Montreal",org

## Batch Size

Vault PKI Exporter supports a `--batch-size-percent` flag to batch many requests for individual certificate metrics at once.
Vault PKI Exporter supports a `--batch-size-percent` flag to batch many requests for individual certificate metrics at once. Each active batch will create a goroutine.

If you are getting many log messages such as:

Expand All @@ -91,6 +93,10 @@ level=error msg="failed to get certificate for pki/26:97:08:32:44:40:30:de:11:5z

Your batch size is probably too high.

## Rate Limiting

Rate-limiting flags are also provided for large Vault installations. The limit is global and shared across all batches, which prevents the exporter from overloading Vault with many concurrent API calls. `--request-limit` is measured in Vault API calls per second. You may want to set `--request-limit-burst` roughly equal to `--request-limit` so the token bucket starts out with as many tokens as your per-second limit allows.

## Certificate Selection

Any certificate with a unique subject common name and organizational unit is considered for metrics. If a certificate is renewed in place with the same CN and OU, it will still retain the same time series to avoid false alarms.
Expand Down
12 changes: 11 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ func init() {
if err := viper.BindPFlag("batch_size_percent", flags.Lookup("batch-size-percent")); err != nil {
logger.SlogFatal("Could not bind batch-size-percent flag", "error", err)
}

flags.Float64("request-limit", 0.0, "Token-bucket limiter for number of requests per second to Vault when fetching certs (0 = disabled)")
if err := viper.BindPFlag("request_limit", flags.Lookup("request-limit")); err != nil {
logger.SlogFatal("Could not bind request-limit flag", "error", err)
}

flags.Int("request-limit-burst", 0, "Token-bucket burst limit for number of requests per second to Vault when fetching certs (0 = match 'request-limit' value)")
if err := viper.BindPFlag("request_limit_burst", flags.Lookup("request-limit-burst")); err != nil {
logger.SlogFatal("Could not bind request-limit-burst flag", "error", err)
}
}

func main() {
Expand All @@ -86,7 +96,7 @@ func main() {
}

// note mix of underscores and dashes
slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent"))
slog.Info("CLI flag values", "fetch-interval", viper.GetDuration("fetch_interval"), "refresh-interval", viper.GetDuration("refresh_interval"), "batch-size-percent", viper.GetFloat64("batch_size_percent"), "request-limit", viper.GetFloat64("request_limit"), "request-limit-burst", viper.GetInt("request_limit_burst"))

err := cli.Execute()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ services:
- --fetch-interval=5s
- --refresh-interval=5s
- --log-level=debug
# 20 requests per second
- --request-limit=20
# burst of 20 tokens
- --request-limit-burst=20
networks:
- vault-pki-exporter
ports:
Expand Down
2 changes: 2 additions & 0 deletions pkg/vault-mon/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vault_mon
import (
"crypto/x509"
"fmt"
"log/slog"
"os"
"strings"
"time"
Expand All @@ -21,6 +22,7 @@ func InfluxWatchCerts(pkimon *PKIMon, interval time.Duration, loop bool) {
go func() {
for {
influxProcessData(pkimon)
slog.Info("Sleeping after processing influx data", "interval", interval)
time.Sleep(interval)
}
}()
Expand Down
35 changes: 34 additions & 1 deletion pkg/vault-mon/pki.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package vault_mon

import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/spf13/viper"
"golang.org/x/time/rate"
)

type PKI struct {
Expand All @@ -40,6 +42,12 @@ var loadCertsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Buckets: prometheus.ExponentialBuckets(1, 3, 10),
})

// loadCertsLimitDuration measures how long each certificate-fetch goroutine
// spends blocked in the shared rate limiter (limiter.Wait) before it is
// allowed to issue its Vault read during loadCerts.
// promauto registers the histogram with the default Prometheus registry at
// package initialization.
// NOTE(review): ExponentialBuckets(1, 3, 10) makes the smallest bucket 1s, so
// all sub-second throttle waits land in the first bucket — confirm that
// coarse resolution is intended for typical per-request waits.
var loadCertsLimitDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "x509_load_certs_request_limit_gated_duration_seconds",
Help: "Duration of time spent throttled waiting to contact Vault during loadCerts execution",
Buckets: prometheus.ExponentialBuckets(1, 3, 10),
})

func (mon *PKIMon) Init(vault *vaultapi.Client) error {
mon.vault = vault
mon.pkis = make(map[string]*PKI)
Expand Down Expand Up @@ -96,6 +104,7 @@ func (mon *PKIMon) Watch(interval time.Duration) {
}
}
mon.Loaded = true
slog.Info("Sleeping after refreshing PKI certs", "interval", interval)
time.Sleep(interval)
}
}()
Expand Down Expand Up @@ -237,6 +246,22 @@ func (pki *PKI) loadCerts() error {
batchSize = 1
}

requestLimit := rate.Limit(viper.GetFloat64("request_limit"))
requestLimitBurst := viper.GetInt("request_limit_burst")

// Special value for limiter that allows all events
if requestLimit == 0 {
requestLimit = rate.Inf
}

// If non-default value for requestLimit, but default requestLimitBurst,
// set requestLimitBurst to requestLimit
if requestLimit != rate.Inf && requestLimitBurst == 0 {
requestLimitBurst = int(requestLimit)
}

limiter := rate.NewLimiter(requestLimit, requestLimitBurst)

// gather CRLs to determine revoked certs
revokedCerts := make(map[string]struct{})

Expand All @@ -257,7 +282,7 @@ func (pki *PKI) loadCerts() error {
batchKeys := serialsList.Keys[i:end]

var wg sync.WaitGroup
slog.Info("Processing batch of certs", "pki", pki.path, "batchsize", len(batchKeys))
slog.Info("Processing batch of certs", "pki", pki.path, "batchsize", len(batchKeys), "total_size", len(serialsList.Keys))

// add a mutex for protecting concurrent access to the certs map
var certsMux sync.Mutex
Expand All @@ -266,6 +291,14 @@ func (pki *PKI) loadCerts() error {
go func(serial string) {
defer wg.Done()

waitStart := time.Now()
err := limiter.Wait(context.Background())
if err != nil {
slog.Error("Error waiting for request limiter", "error", err)
return
}
loadCertsLimitDuration.Observe(time.Since(waitStart).Seconds())

secret, err := pki.vault.Logical().Read(fmt.Sprintf("%scert/%s", pki.path, serial))
if err != nil || secret == nil || secret.Data == nil {
slog.Error("Failed to get certificate", "pki", pki.path, "serial", serial, "error", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/vault-mon/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func PromWatchCerts(pkimon *PKIMon, interval time.Duration) {

duration := time.Since(startTime).Seconds()
promWatchCertsDuration.Observe(duration)
slog.Info("PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis))
slog.Info("Sleeping after PromWatchCerts loop completed", "duration_seconds", duration, "pkis_processed", len(pkis), "interval", interval)
time.Sleep(interval)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/vault/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (vault *ClientWrapper) GetSecret(path string, fn secretCallback) error {
if secret.LeaseDuration > 0 {
go func() {
for {
slog.Info("Sleeping before refreshing vault", "interval", time.Duration(secret.LeaseDuration))
time.Sleep(time.Duration(secret.LeaseDuration) * time.Second)
secret, err = vault.Client.Logical().Read(path)
if err != nil {
Expand Down Expand Up @@ -175,6 +176,7 @@ func watch_renewer_vault(renewer *vaultapi.Renewer) {
go func() {
for {
// Prevent loop when secret wasn't renewed before expiration
slog.Info("Waiting before calling another renew", "interval", time.Second)
time.Sleep(time.Second)
renewer.Renew()
}
Expand Down

0 comments on commit 4ca3150

Please sign in to comment.