diff --git a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto index 5cbcaf64a90e..79325aec29b5 100644 --- a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto +++ b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto @@ -171,9 +171,9 @@ message LocalRateLimit { // Specifies the max dynamic descriptors kept in the cache for a particular wildcarded descriptor // configured in the global :ref:`descriptors`. // Wildcarded descriptor means descriptor has one or more entries `value` omitted. For example if user has configured two descriptors - // with blank value entries, then max dynamic descriptors stored in the LRU cache will be 2 * dynamic_descripters_lru_cache_limit. + // with blank value entries, then max dynamic descriptors stored in the LRU cache will be 2 * max_dynamic_descriptors. // Actual number of dynamic descriptors will depend on the cardinality of unique values received from the http request for the omitted // values. // Default is 20. - uint32 dynamic_descripters_lru_cache_limit = 18; + google.protobuf.UInt32Value max_dynamic_descriptors = 18; } diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index a06e49b182ea..37b07681d190 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -169,6 +169,10 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_logging_with_fast_json_formatter); // before downstream. FALSE_RUNTIME_GUARD(envoy_reloadable_features_allow_multiplexed_upstream_half_close); +// A flag to enable the usage of dynamic buckets for local rate limiting. For example, dynamically +// created token buckets for each unique value of a request header. +FALSE_RUNTIME_GUARD(envoy_reloadable_features_local_rate_limiting_with_dynamic_buckets); + // Block of non-boolean flags. Use of int flags is deprecated. Do not add more. ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT ABSL_FLAG(uint64_t, re2_max_program_size_warn_level, // NOLINT diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc index 037bb8e4885e..290dc529c127 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc @@ -207,6 +207,11 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( new_descriptor.entries_.reserve(descriptor.entries_size()); for (const auto& entry : descriptor.entries()) { if (entry.value().empty()) { + if (!(Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.local_rate_limiting_with_dynamic_buckets"))) { + throw EnvoyException("local_rate_limiting_with_dynamic_buckets is disabled. Local rate " + "descriptor value cannot be empty"); + } if (per_connection) { throw EnvoyException( "local rate descriptor value cannot be empty in per connection rate limit mode"); @@ -241,8 +246,7 @@ LocalRateLimiterImpl::LocalRateLimiterImpl( } if (wildcard_found) { DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared( - per_descriptor_token_bucket, (lru_size == 0 ? 20 : lru_size), dispatcher.timeSource(), - *this); + per_descriptor_token_bucket, lru_size, dispatcher.timeSource(), *this); dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor)); continue; } @@ -285,17 +289,17 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( // In most cases the request descriptors has only few elements. We use a inlined vector to // avoid heap allocation. - absl::InlinedVector matched_descriptors; + absl::InlinedVector matched_descriptors; // Find all matched descriptors. for (const auto& request_descriptor : request_descriptors) { auto iter = descriptors_.find(request_descriptor); if (iter != descriptors_.end()) { - matched_descriptors.push_back(iter->second.get()); + matched_descriptors.push_back(iter->second); } else { auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor); if (token_bucket != nullptr) { - matched_descriptors.push_back(token_bucket.get()); + matched_descriptors.push_back(token_bucket); } } } @@ -304,7 +308,7 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( // Sort the matched descriptors by token bucket fill rate to ensure the descriptor with the // smallest fill rate is consumed first. std::sort(matched_descriptors.begin(), matched_descriptors.end(), - [](const RateLimitTokenBucket* lhs, const RateLimitTokenBucket* rhs) { + [](const RateLimitTokenBucketSharedPtr lhs, const RateLimitTokenBucketSharedPtr rhs) { return lhs->fillRate() < rhs->fillRate(); }); } @@ -313,11 +317,11 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0; // See if the request is forbidden by any of the matched descriptors. - for (auto descriptor : matched_descriptors) { + for (const auto& descriptor : matched_descriptors) { if (!descriptor->consume(share_factor)) { // If the request is forbidden by a descriptor, return the result and the descriptor // token bucket. - return {false, makeOptRefFromPtr(descriptor)}; + return {false, std::shared_ptr(descriptor)}; } else { ENVOY_LOG(trace, "request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}", @@ -330,26 +334,25 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed( if (const bool result = default_token_bucket_->consume(share_factor); !result) { // If the request is forbidden by the default token bucket, return the result and the // default token bucket. - return {false, makeOptRefFromPtr(default_token_bucket_.get())}; + return {false, std::shared_ptr(default_token_bucket_)}; } // If the request is allowed then return the result the token bucket. The descriptor // token bucket will be selected as priority if it exists. - return {true, makeOptRefFromPtr(matched_descriptors.empty() - ? default_token_bucket_.get() - : matched_descriptors[0])}; + return {true, matched_descriptors.empty() ? default_token_bucket_ : matched_descriptors[0]}; }; ASSERT(!matched_descriptors.empty()); - return {true, makeOptRefFromPtr(matched_descriptors[0])}; + std::shared_ptr bucket_context = + std::shared_ptr(matched_descriptors[0]); + return {true, bucket_context}; } // Compare the request descriptor entries with the user descriptor entries. If all non-empty user -// descriptor values match the request descriptor values, return true and fill the new descriptor +// descriptor values match the request descriptor values, return true bool DynamicDescriptorMap::compareDescriptorEntries( const std::vector& request_entries, - const std::vector& user_entries, - std::vector& new_descriptor_entries) { + const std::vector& user_entries) { // Check for equality of sizes if (request_entries.size() != user_entries.size()) { return false; @@ -371,7 +374,6 @@ bool DynamicDescriptorMap::compareDescriptorEntries( if (user_entries[i].value_.empty()) { has_empty_value = true; } - new_descriptor_entries.push_back({request_entries[i].key_, request_entries[i].value_}); } return has_empty_value; } @@ -392,11 +394,9 @@ DynamicDescriptorMap::getBucket(RateLimit::LocalDescriptor request_descriptor) { if (user_descriptor.entries_.size() != request_descriptor.entries_.size()) { continue; } - RateLimit::LocalDescriptor new_descriptor; bool wildcard_found = false; - new_descriptor.entries_.reserve(user_descriptor.entries_.size()); - wildcard_found = compareDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_, - new_descriptor.entries_); + wildcard_found = + compareDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_); if (!wildcard_found) { continue; @@ -426,10 +426,12 @@ DynamicDescriptor::addOrGetDescriptor(const RateLimit::LocalDescriptor& request_ absl::WriterMutexLock lock(&dyn_desc_lock_); auto iter = dynamic_descriptors_.find(request_descriptor); if (iter != dynamic_descriptors_.end()) { - lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second); + if (iter->second.second != lru_list_.begin()) { + lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second); + } return iter->second.first; } - // add a new descriptor to the set along with its toekn bucket + // add a new descriptor to the set along with its token bucket RateLimitTokenBucketSharedPtr per_descriptor_token_bucket; if (no_timer_based_rate_limit_token_bucket_) { ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor"); @@ -451,15 +453,18 @@ DynamicDescriptor::addOrGetDescriptor(const RateLimit::LocalDescriptor& request_ ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}", request_descriptor.toString()); - // add this bucket to cache. - // After updating cache, make a copy of the cache and update the tls with the new cache. + lru_list_.emplace_front(request_descriptor); auto result = dynamic_descriptors_.emplace( request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin())); - lru_list_.emplace_front(request_descriptor); if (lru_list_.size() >= lru_size_) { + ENVOY_LOG(trace, + "DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic " + "descriptor: {}", + lru_size_, lru_list_.back().toString()); dynamic_descriptors_.erase(lru_list_.back()); lru_list_.pop_back(); } + ASSERT(lru_list_.size() == dynamic_descriptors_.size()); return result.first->second.first; } diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index 8ea5b6f7fe4e..6c2d50ee40ed 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -65,8 +65,7 @@ class DynamicDescriptorMap : public Logger::Loggable& request_entries, - const std::vector& user_entries, - std::vector& new_descriptor_entries); + const std::vector& user_entries); RateLimit::LocalDescriptor::Map user_descriptors_; }; @@ -198,7 +197,7 @@ class LocalRateLimiterImpl : public Logger::Loggable token_bucket_context{}; + std::shared_ptr token_bucket_context{}; }; LocalRateLimiterImpl( diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index c79d76ddf4b1..941b2c33c2de 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -107,11 +107,14 @@ FilterConfig::FilterConfig( share_provider = share_provider_manager_->getShareProvider(config.local_cluster_rate_limit()); } + uint32_t max_dynamic_descriptors = 20; + if (config.has_max_dynamic_descriptors()) { + max_dynamic_descriptors = config.max_dynamic_descriptors().value(); + } rate_limiter_ = std::make_unique( fill_interval_, max_tokens_, tokens_per_fill_, dispatcher_, descriptors_, - always_consume_default_token_bucket_, std::move(share_provider), - config.dynamic_descripters_lru_cache_limit(), + always_consume_default_token_bucket_, std::move(share_provider), max_dynamic_descriptors, config.local_rate_limit_per_downstream_connection()); } @@ -199,7 +202,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) { // We can never assume the decodeHeaders() was called before encodeHeaders(). - if (used_config_->enableXRateLimitHeaders() && token_bucket_context_.has_value()) { + if (used_config_->enableXRateLimitHeaders() && token_bucket_context_) { headers.addReferenceKey( HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit, token_bucket_context_->maxTokens()); diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h index 038a7001add1..17639f4762fe 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h @@ -203,7 +203,7 @@ class Filter : public Http::PassThroughFilter, Logger::Loggable token_bucket_context_; + std::shared_ptr token_bucket_context_; VhRateLimitOptions vh_rate_limits_; }; diff --git a/test/extensions/filters/http/local_ratelimit/BUILD b/test/extensions/filters/http/local_ratelimit/BUILD index e71f31db0857..123c974affd3 100644 --- a/test/extensions/filters/http/local_ratelimit/BUILD +++ b/test/extensions/filters/http/local_ratelimit/BUILD @@ -51,6 +51,7 @@ envoy_extension_cc_test( ], deps = [ "//source/extensions/filters/http/local_ratelimit:config", + "//test/test_common:test_runtime_lib", "//test/integration:http_protocol_integration_lib", ], ) diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 7ce0030dfc82..2ce070658365 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -1,6 +1,7 @@ #include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" #include "test/integration/http_protocol_integration.h" +#include "test/test_common/test_runtime.h" #include "gtest/gtest.h" @@ -376,6 +377,9 @@ INSTANTIATE_TEST_SUITE_P( HttpProtocolIntegrationTest::protocolTestParamsToString); TEST_P(LocalRateLimitFilterIntegrationTest, DynamicDesciptorsBasicTest) { + TestScopedRuntime runtime; + runtime.mergeValues( + {{"envoy.reloadable_features.local_rate_limiting_with_dynamic_buckets", "true"}}); initializeFilter(fmt::format(filter_config_with_blank_value_descriptor_, "false")); // filter is adding dynamic descriptors based on the request header // 'x-envoy-downstream-service-cluster' and the token bucket is set to 1 token per fill interval