Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Vikas Choudhary (vikasc) <choudharyvikas16@gmail.com>
  • Loading branch information
vikaschoudhary16 committed Jan 14, 2025
1 parent bb34179 commit b8fa581
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ message LocalRateLimit {
// Specifies the max dynamic descriptors kept in the cache for a particular wildcarded descriptor
// configured in the global :ref:`descriptors<envoy_v3_api_field_extensions.filters.http.local_ratelimit.v3.LocalRateLimit.descriptors>`.
// A wildcarded descriptor is a descriptor with one or more entries whose `value` is omitted. For example, if the user has configured two descriptors
// with blank value entries, then max dynamic descriptors stored in the LRU cache will be 2 * dynamic_descripters_lru_cache_limit.
// with blank value entries, then max dynamic descriptors stored in the LRU cache will be 2 * max_dynamic_descriptors.
// The actual number of dynamic descriptors will depend on the cardinality of the unique values received in the HTTP request for the omitted
// values.
// Default is 20.
uint32 dynamic_descripters_lru_cache_limit = 18;
google.protobuf.UInt32Value max_dynamic_descriptors = 18;
}
4 changes: 4 additions & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ FALSE_RUNTIME_GUARD(envoy_reloadable_features_logging_with_fast_json_formatter);
// before downstream.
FALSE_RUNTIME_GUARD(envoy_reloadable_features_allow_multiplexed_upstream_half_close);

// A flag to enable the usage of dynamic buckets for local rate limiting. For example, dynamically
// created token buckets for each unique value of a request header.
FALSE_RUNTIME_GUARD(envoy_reloadable_features_local_rate_limiting_with_dynamic_buckets);

// Block of non-boolean flags. Use of int flags is deprecated. Do not add more.
ABSL_FLAG(uint64_t, re2_max_program_size_error_level, 100, ""); // NOLINT
ABSL_FLAG(uint64_t, re2_max_program_size_warn_level, // NOLINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
new_descriptor.entries_.reserve(descriptor.entries_size());
for (const auto& entry : descriptor.entries()) {
if (entry.value().empty()) {
if (!(Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.local_rate_limiting_with_dynamic_buckets"))) {
throw EnvoyException("local_rate_limiting_with_dynamic_buckets is disabled. Local rate "
"descriptor value cannot be empty");
}
if (per_connection) {
throw EnvoyException(
"local rate descriptor value cannot be empty in per connection rate limit mode");
Expand Down Expand Up @@ -241,8 +246,7 @@ LocalRateLimiterImpl::LocalRateLimiterImpl(
}
if (wildcard_found) {
DynamicDescriptorSharedPtr dynamic_descriptor = std::make_shared<DynamicDescriptor>(
per_descriptor_token_bucket, (lru_size == 0 ? 20 : lru_size), dispatcher.timeSource(),
*this);
per_descriptor_token_bucket, lru_size, dispatcher.timeSource(), *this);
dynamic_descriptors_.addDescriptor(std::move(new_descriptor), std::move(dynamic_descriptor));
continue;
}
Expand Down Expand Up @@ -285,17 +289,17 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(

// In most cases the request descriptors have only a few elements. We use an inlined vector to
// avoid heap allocation.
absl::InlinedVector<RateLimitTokenBucket*, 8> matched_descriptors;
absl::InlinedVector<RateLimitTokenBucketSharedPtr, 8> matched_descriptors;

// Find all matched descriptors.
for (const auto& request_descriptor : request_descriptors) {
auto iter = descriptors_.find(request_descriptor);
if (iter != descriptors_.end()) {
matched_descriptors.push_back(iter->second.get());
matched_descriptors.push_back(iter->second);
} else {
auto token_bucket = dynamic_descriptors_.getBucket(request_descriptor);
if (token_bucket != nullptr) {
matched_descriptors.push_back(token_bucket.get());
matched_descriptors.push_back(token_bucket);
}
}
}
Expand All @@ -304,7 +308,7 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
// Sort the matched descriptors by token bucket fill rate to ensure the descriptor with the
// smallest fill rate is consumed first.
std::sort(matched_descriptors.begin(), matched_descriptors.end(),
[](const RateLimitTokenBucket* lhs, const RateLimitTokenBucket* rhs) {
[](const RateLimitTokenBucketSharedPtr lhs, const RateLimitTokenBucketSharedPtr rhs) {
return lhs->fillRate() < rhs->fillRate();
});
}
Expand All @@ -313,11 +317,11 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
share_provider_ != nullptr ? share_provider_->getTokensShareFactor() : 1.0;

// See if the request is forbidden by any of the matched descriptors.
for (auto descriptor : matched_descriptors) {
for (const auto& descriptor : matched_descriptors) {
if (!descriptor->consume(share_factor)) {
// If the request is forbidden by a descriptor, return the result and the descriptor
// token bucket.
return {false, makeOptRefFromPtr<TokenBucketContext>(descriptor)};
return {false, std::shared_ptr<TokenBucketContext>(descriptor)};
} else {
ENVOY_LOG(trace,
"request allowed by descriptor with fill rate: {}, maxToken: {}, remainingToken {}",
Expand All @@ -330,26 +334,25 @@ LocalRateLimiterImpl::Result LocalRateLimiterImpl::requestAllowed(
if (const bool result = default_token_bucket_->consume(share_factor); !result) {
// If the request is forbidden by the default token bucket, return the result and the
// default token bucket.
return {false, makeOptRefFromPtr<TokenBucketContext>(default_token_bucket_.get())};
return {false, std::shared_ptr<TokenBucketContext>(default_token_bucket_)};
}

// If the request is allowed then return the result the token bucket. The descriptor
// token bucket will be selected as priority if it exists.
return {true, makeOptRefFromPtr<TokenBucketContext>(matched_descriptors.empty()
? default_token_bucket_.get()
: matched_descriptors[0])};
return {true, matched_descriptors.empty() ? default_token_bucket_ : matched_descriptors[0]};
};

ASSERT(!matched_descriptors.empty());
return {true, makeOptRefFromPtr<TokenBucketContext>(matched_descriptors[0])};
std::shared_ptr<TokenBucketContext> bucket_context =
std::shared_ptr<TokenBucketContext>(matched_descriptors[0]);
return {true, bucket_context};
}

// Compare the request descriptor entries with the user descriptor entries. If all non-empty user
// descriptor values match the request descriptor values, return true and fill the new descriptor
// descriptor values match the request descriptor values, return true.
bool DynamicDescriptorMap::compareDescriptorEntries(
const std::vector<RateLimit::DescriptorEntry>& request_entries,
const std::vector<RateLimit::DescriptorEntry>& user_entries,
std::vector<RateLimit::DescriptorEntry>& new_descriptor_entries) {
const std::vector<RateLimit::DescriptorEntry>& user_entries) {
// Check for equality of sizes
if (request_entries.size() != user_entries.size()) {
return false;
Expand All @@ -371,7 +374,6 @@ bool DynamicDescriptorMap::compareDescriptorEntries(
if (user_entries[i].value_.empty()) {
has_empty_value = true;
}
new_descriptor_entries.push_back({request_entries[i].key_, request_entries[i].value_});
}
return has_empty_value;
}
Expand All @@ -392,11 +394,9 @@ DynamicDescriptorMap::getBucket(RateLimit::LocalDescriptor request_descriptor) {
if (user_descriptor.entries_.size() != request_descriptor.entries_.size()) {
continue;
}
RateLimit::LocalDescriptor new_descriptor;
bool wildcard_found = false;
new_descriptor.entries_.reserve(user_descriptor.entries_.size());
wildcard_found = compareDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_,
new_descriptor.entries_);
wildcard_found =
compareDescriptorEntries(request_descriptor.entries_, user_descriptor.entries_);

if (!wildcard_found) {
continue;
Expand Down Expand Up @@ -426,10 +426,12 @@ DynamicDescriptor::addOrGetDescriptor(const RateLimit::LocalDescriptor& request_
absl::WriterMutexLock lock(&dyn_desc_lock_);
auto iter = dynamic_descriptors_.find(request_descriptor);
if (iter != dynamic_descriptors_.end()) {
lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second);
if (iter->second.second != lru_list_.begin()) {
lru_list_.splice(lru_list_.begin(), lru_list_, iter->second.second);
}
return iter->second.first;
}
// add a new descriptor to the set along with its toekn bucket
// add a new descriptor to the set along with its token bucket
RateLimitTokenBucketSharedPtr per_descriptor_token_bucket;
if (no_timer_based_rate_limit_token_bucket_) {
ENVOY_LOG(trace, "creating atomic token bucket for dynamic descriptor");
Expand All @@ -451,15 +453,18 @@ DynamicDescriptor::addOrGetDescriptor(const RateLimit::LocalDescriptor& request_

ENVOY_LOG(trace, "DynamicDescriptor::addorGetDescriptor: adding dynamic descriptor: {}",
request_descriptor.toString());
// add this bucket to cache.
// After updating cache, make a copy of the cache and update the tls with the new cache.
lru_list_.emplace_front(request_descriptor);
auto result = dynamic_descriptors_.emplace(
request_descriptor, std::pair(per_descriptor_token_bucket, lru_list_.begin()));
lru_list_.emplace_front(request_descriptor);
if (lru_list_.size() >= lru_size_) {
ENVOY_LOG(trace,
"DynamicDescriptor::addorGetDescriptor: lru_size({}) overflow. Removing dynamic "
"descriptor: {}",
lru_size_, lru_list_.back().toString());
dynamic_descriptors_.erase(lru_list_.back());
lru_list_.pop_back();
}
ASSERT(lru_list_.size() == dynamic_descriptors_.size());
return result.first->second.first;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ class DynamicDescriptorMap : public Logger::Loggable<Logger::Id::rate_limit_quot

private:
bool compareDescriptorEntries(const std::vector<RateLimit::DescriptorEntry>& request_entries,
const std::vector<RateLimit::DescriptorEntry>& user_entries,
std::vector<RateLimit::DescriptorEntry>& new_descriptor_entries);
const std::vector<RateLimit::DescriptorEntry>& user_entries);
RateLimit::LocalDescriptor::Map<DynamicDescriptorSharedPtr> user_descriptors_;
};

Expand Down Expand Up @@ -198,7 +197,7 @@ class LocalRateLimiterImpl : public Logger::Loggable<Logger::Id::rate_limit_quot
public:
struct Result {
bool allowed{};
OptRef<const TokenBucketContext> token_bucket_context{};
std::shared_ptr<const TokenBucketContext> token_bucket_context{};
};

LocalRateLimiterImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,14 @@ FilterConfig::FilterConfig(

share_provider = share_provider_manager_->getShareProvider(config.local_cluster_rate_limit());
}
uint32_t max_dynamic_descriptors = 20;
if (config.has_max_dynamic_descriptors()) {
max_dynamic_descriptors = config.max_dynamic_descriptors().value();
}

rate_limiter_ = std::make_unique<Filters::Common::LocalRateLimit::LocalRateLimiterImpl>(
fill_interval_, max_tokens_, tokens_per_fill_, dispatcher_, descriptors_,
always_consume_default_token_bucket_, std::move(share_provider),
config.dynamic_descripters_lru_cache_limit(),
always_consume_default_token_bucket_, std::move(share_provider), max_dynamic_descriptors,
config.local_rate_limit_per_downstream_connection());
}

Expand Down Expand Up @@ -199,7 +202,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

Http::FilterHeadersStatus Filter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) {
// We can never assume the decodeHeaders() was called before encodeHeaders().
if (used_config_->enableXRateLimitHeaders() && token_bucket_context_.has_value()) {
if (used_config_->enableXRateLimitHeaders() && token_bucket_context_) {
headers.addReferenceKey(
HttpFilters::Common::RateLimit::XRateLimitHeaders::get().XRateLimitLimit,
token_bucket_context_->maxTokens());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class Filter : public Http::PassThroughFilter, Logger::Loggable<Logger::Id::filt
// Actual config used for the current request. Is config_ by default, but can be overridden by
// per-route config.
const FilterConfig* used_config_{};
OptRef<const Filters::Common::LocalRateLimit::TokenBucketContext> token_bucket_context_;
std::shared_ptr<const Filters::Common::LocalRateLimit::TokenBucketContext> token_bucket_context_;

VhRateLimitOptions vh_rate_limits_;
};
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/local_ratelimit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ envoy_extension_cc_test(
],
deps = [
"//source/extensions/filters/http/local_ratelimit:config",
"//test/test_common:test_runtime_lib",
"//test/integration:http_protocol_integration_lib",
],
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h"

#include "test/integration/http_protocol_integration.h"
#include "test/test_common/test_runtime.h"

#include "gtest/gtest.h"

Expand Down Expand Up @@ -376,6 +377,9 @@ INSTANTIATE_TEST_SUITE_P(
HttpProtocolIntegrationTest::protocolTestParamsToString);

TEST_P(LocalRateLimitFilterIntegrationTest, DynamicDesciptorsBasicTest) {
TestScopedRuntime runtime;
runtime.mergeValues(
{{"envoy.reloadable_features.local_rate_limiting_with_dynamic_buckets", "true"}});
initializeFilter(fmt::format(filter_config_with_blank_value_descriptor_, "false"));
// filter is adding dynamic descriptors based on the request header
// 'x-envoy-downstream-service-cluster' and the token bucket is set to 1 token per fill interval
Expand Down

0 comments on commit b8fa581

Please sign in to comment.