Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add aggregation from kube service endpoints feature in metrics API scaler #6565

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Fix the check whether Fallback is enabled when using ScalingModifiers ([#6521](https://github.com/kedacore/keda/pull/6521))
- **General**: Fix waiting to reach `failureThreshold` before fallback ([#6520](https://github.com/kedacore/keda/pull/6520))
- **General**: Introduce new Temporal scaler ([#4724](https://github.com/kedacore/keda/issues/4724))
- **Metrics API scaler**: Introduce new `aggregateFromKubeServiceEndpoints` and `aggregationType` metadata fields to `metrics-api` so it is able to fetch metrics from all endpoints behind a kubernetes service and aggregate them ([#6565](https://github.com/kedacore/keda/pull/6565))

### New

Expand Down
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rules:
resources:
- configmaps
- configmaps/status
- endpoints
- external
- pods
- secrets
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (

// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;update;patch;create;delete
// +kubebuilder:rbac:groups="",resources=configmaps;configmaps/status,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=endpoints;configmaps;configmaps/status,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups="",resources=pods;services;services;secrets;external,verbs=get;list;watch
// +kubebuilder:rbac:groups="*",resources="*/scale",verbs=get;list;watch;update;patch
Expand Down
210 changes: 196 additions & 14 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
neturl "net/url"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/tidwall/gjson"
"gopkg.in/yaml.v3"
v2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand All @@ -30,15 +33,18 @@ type metricsAPIScaler struct {
metadata *metricsAPIScalerMetadata
httpClient *http.Client
logger logr.Logger
kubeClient client.Client
}

type metricsAPIScalerMetadata struct {
targetValue float64
activationTargetValue float64
url string
format APIFormat
valueLocation string
unsafeSsl bool
targetValue float64
activationTargetValue float64
url string
format APIFormat
aggregationType AggregationType
valueLocation string
unsafeSsl bool
aggregateFromKubeServiceEndpoints bool

// apiKeyAuth
enableAPIKeyAuth bool
Expand Down Expand Up @@ -71,6 +77,8 @@ const (
valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
)

const secureHTTPScheme = "https"

type APIFormat string

// Options for APIFormat:
Expand All @@ -90,8 +98,27 @@ var (
}
)

// AggregationType selects how metric values fetched from multiple
// kubernetes service endpoints are combined into a single value.
type AggregationType string

// Options for AggregationType:
const (
	AverageAggregationType AggregationType = "average"
	SumAggregationType     AggregationType = "sum"
	MaxAggregationType     AggregationType = "max"
	MinAggregationType     AggregationType = "min"
)

var (
	// supportedAggregationTypes lists every value accepted for the
	// `aggregationType` trigger metadata field.
	supportedAggregationTypes = []AggregationType{
		AverageAggregationType,
		SumAggregationType,
		MaxAggregationType,
		MinAggregationType,
	}
)

// NewMetricsAPIScaler creates a new HTTP scaler
func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig, kubeClient client.Client) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
Expand All @@ -116,6 +143,7 @@ func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
httpClient: httpClient,
kubeClient: kubeClient,
logger: InitializeLogger(config, "metrics_api_scaler"),
}, nil
}
Expand All @@ -133,6 +161,15 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
meta.unsafeSsl = unsafeSsl
}

meta.aggregateFromKubeServiceEndpoints = false
if val, ok := config.TriggerMetadata["aggregateFromKubeServiceEndpoints"]; ok {
aggregateFromKubeServiceEndpoints, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing aggregateFromKubeServiceEndpoints: %w", err)
}
meta.aggregateFromKubeServiceEndpoints = aggregateFromKubeServiceEndpoints
}

if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
Expand Down Expand Up @@ -172,6 +209,16 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
meta.format = JSONFormat
}

if val, ok := config.TriggerMetadata["aggregationType"]; ok {
meta.aggregationType = AggregationType(strings.TrimSpace(val))
if !kedautil.Contains(supportedAggregationTypes, meta.aggregationType) {
return nil, fmt.Errorf("aggregation type %s not supported", meta.aggregationType)
}
} else {
// default aggregation type is average
meta.aggregationType = AverageAggregationType
}

if val, ok := config.TriggerMetadata["valueLocation"]; ok {
meta.valueLocation = val
} else {
Expand Down Expand Up @@ -417,8 +464,140 @@ func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error
}
}

// getEndpointsUrlsFromServiceURL resolves the individual endpoint URLs behind a
// kubernetes service URL of the form <scheme>://<service>.<namespace>[./svc...]:<port>/<path>.
// It fetches the service's Endpoints object via the kube client and builds one
// URL per address, preserving the original scheme and path.
func (s *metricsAPIScaler) getEndpointsUrlsFromServiceURL(ctx context.Context, serviceURL string) ([]string, error) {
	// Parse service name and namespace from the configured URL.
	url, err := neturl.Parse(serviceURL)
	if err != nil {
		s.logger.Error(err, "Failed parsing url for metrics API")
		return nil, err
	}
	splittedHost := strings.Split(url.Host, ".")
	if len(splittedHost) < 2 {
		return nil, fmt.Errorf("invalid hostname %s : expected at least 2 elements, first being service name and second being the namespace", url.Host)
	}
	serviceName := splittedHost[0]
	namespace := splittedHost[1]

	podPort := url.Port()
	// Infer port from service scheme when not set explicitly.
	if podPort == "" {
		if url.Scheme == secureHTTPScheme {
			podPort = "443"
		} else {
			podPort = "80"
		}
	}

	// Fetch the Endpoints object that backs the service.
	serviceEndpoints := &corev1.Endpoints{}
	if err := s.kubeClient.Get(ctx, client.ObjectKey{
		Namespace: namespace,
		Name:      serviceName,
	}, serviceEndpoints); err != nil {
		return nil, err
	}

	var endpointUrls []string
	for _, subset := range serviceEndpoints.Subsets {
		// Keep the configured port only when the subset actually exposes it;
		// otherwise the port is omitted from the URL and inferred from the scheme.
		foundPort := ""
		for _, port := range subset.Ports {
			if strconv.Itoa(int(port.Port)) == podPort {
				foundPort = fmt.Sprintf(":%d", port.Port)
				break
			}
		}
		if foundPort == "" {
			s.logger.Info(fmt.Sprintf("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme", podPort, serviceName, namespace, url.Scheme))
		}
		for _, address := range subset.Addresses {
			// NOTE(review): addresses without a NodeName are skipped — confirm
			// this is intended, as it excludes endpoints not bound to a node.
			if address.NodeName != nil {
				endpointUrls = append(endpointUrls, fmt.Sprintf("%s://%s%s%s", url.Scheme, address.IP, foundPort, url.Path))
			}
		}
	}
	return endpointUrls, nil
}

// getMetricValue returns the scaler's metric value, either fetched from the
// single configured URL or, when aggregateFromKubeServiceEndpoints is enabled,
// aggregated across every endpoint backing the configured kubernetes service.
func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
	// If we wish to aggregate the metric from a kubernetes service, we need to
	// query each endpoint behind the service and combine the results.
	if s.metadata.aggregateFromKubeServiceEndpoints {
		endpointsUrls, err := s.getEndpointsUrlsFromServiceURL(ctx, s.metadata.url)
		if err != nil {
			// Wrap the underlying error so the root cause is not lost.
			return 0, fmt.Errorf("failed to get kubernetes endpoints urls from configured service URL: %w", err)
		}
		if len(endpointsUrls) == 0 {
			return 0, fmt.Errorf("no endpoints URLs were given for the service name")
		}
		return s.aggregateMetricsFromMultipleEndpoints(ctx, endpointsUrls)
	}
	// Single/unaggregated metric from the configured URL.
	metric, err := s.getMetricValueFromURL(ctx, nil)
	if err == nil {
		s.logger.V(1).Info(fmt.Sprintf("fetched single metric from metrics API url : %s. Value is %v\n", s.metadata.url, metric))
	}
	return metric, err
}

// aggregateMetricsFromMultipleEndpoints fetches the metric from every endpoint
// URL in parallel (at most maxGoroutines concurrent requests) and combines the
// values according to the configured aggregation type (sum, average, min, max).
// Endpoints that fail are excluded from the aggregation; an error is returned
// only when every endpoint fails.
func (s *metricsAPIScaler) aggregateMetricsFromMultipleEndpoints(ctx context.Context, endpointsUrls []string) (float64, error) {
	const maxGoroutines = 5
	var (
		mu                     sync.Mutex
		wg                     sync.WaitGroup
		aggregation            float64
		firstMetricEncountered bool
	)
	sem := make(chan struct{}, maxGoroutines)
	expectedNbMetrics := len(endpointsUrls)
	nbErrors := 0

	for _, endpointURL := range endpointsUrls {
		wg.Add(1)
		sem <- struct{}{} // Acquire semaphore slot
		go func(url string) {
			defer wg.Done()
			defer func() { <-sem }() // Release semaphore slot even on early return

			// BUGFIX: query the goroutine-local parameter, not the shared loop
			// variable (&endpointURL), which on Go < 1.22 made every goroutine
			// race on — and potentially fetch — the same (last) endpoint.
			metric, err := s.getMetricValueFromURL(ctx, &url)

			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				s.logger.Info(fmt.Sprintf("Error fetching metric for %s: %v\n", url, err))
				// Ignore this endpoint when computing the aggregation.
				expectedNbMetrics--
				nbErrors++
				return
			}
			switch s.metadata.aggregationType {
			case MinAggregationType:
				if !firstMetricEncountered || metric < aggregation {
					firstMetricEncountered = true
					aggregation = metric
				}
			case MaxAggregationType:
				if !firstMetricEncountered || metric > aggregation {
					firstMetricEncountered = true
					aggregation = metric
				}
			default:
				// Sum metrics when not looking for min or max; the average
				// case divides the sum once all fetches have completed.
				aggregation += metric
			}
		}(endpointURL)
	}
	wg.Wait()

	var err error
	if nbErrors > 0 && nbErrors == len(endpointsUrls) {
		err = fmt.Errorf("could not get any metric successfully from the %d provided endpoints", len(endpointsUrls))
	}
	// Guard the division so an all-failed average returns 0 instead of NaN.
	if s.metadata.aggregationType == AverageAggregationType && expectedNbMetrics > 0 {
		aggregation /= float64(expectedNbMetrics)
	}
	s.logger.V(1).Info(fmt.Sprintf("fetched %d metrics out of %d endpoints from kubernetes service : %s is %v\n", expectedNbMetrics, len(endpointsUrls), s.metadata.aggregationType, aggregation))
	return aggregation, err
}

func (s *metricsAPIScaler) getMetricValueFromURL(ctx context.Context, url *string) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata, url)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -479,14 +658,17 @@ func (s *metricsAPIScaler) GetMetricsAndActivity(ctx context.Context, metricName
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationTargetValue, nil
}

func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata) (*http.Request, error) {
func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata, url *string) (*http.Request, error) {
var req *http.Request
var err error

if url == nil {
url = &meta.url
}
switch {
case meta.enableAPIKeyAuth:
if meta.method == methodValueQuery {
url, _ := neturl.Parse(meta.url)
url, _ := neturl.Parse(*url)
queryString := url.Query()
if len(meta.keyParamName) == 0 {
queryString.Set("api_key", meta.apiKey)
Expand All @@ -501,7 +683,7 @@ func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetada
}
} else {
// default behaviour is to use header method
req, err = http.NewRequestWithContext(ctx, "GET", meta.url, nil)
req, err = http.NewRequestWithContext(ctx, "GET", *url, nil)
if err != nil {
return nil, err
}
Expand All @@ -513,20 +695,20 @@ func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetada
}
}
case meta.enableBaseAuth:
req, err = http.NewRequestWithContext(ctx, "GET", meta.url, nil)
req, err = http.NewRequestWithContext(ctx, "GET", *url, nil)
if err != nil {
return nil, err
}

req.SetBasicAuth(meta.username, meta.password)
case meta.enableBearerAuth:
req, err = http.NewRequestWithContext(ctx, "GET", meta.url, nil)
req, err = http.NewRequestWithContext(ctx, "GET", *url, nil)
if err != nil {
return nil, err
}
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", meta.bearerToken))
default:
req, err = http.NewRequestWithContext(ctx, "GET", meta.url, nil)
req, err = http.NewRequestWithContext(ctx, "GET", *url, nil)
if err != nil {
return nil, err
}
Expand Down
30 changes: 13 additions & 17 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,13 @@ var metricsAPIMetricIdentifiers = []metricsAPIMetricIdentifier{

func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
for _, testData := range metricsAPIMetricIdentifiers {
s, err := NewMetricsAPIScaler(
&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 3000 * time.Millisecond,
TriggerIndex: testData.triggerIndex,
},
)
s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 3000 * time.Millisecond,
TriggerIndex: testData.triggerIndex,
}, nil)
if err != nil {
t.Errorf("Error creating the Scaler")
}
Expand Down Expand Up @@ -222,14 +220,12 @@ func TestBearerAuth(t *testing.T) {
"authMode": "bearer",
}

s, err := NewMetricsAPIScaler(
&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: metadata,
AuthParams: authentication,
GlobalHTTPTimeout: 3000 * time.Millisecond,
},
)
s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: metadata,
AuthParams: authentication,
GlobalHTTPTimeout: 3000 * time.Millisecond,
}, nil)
if err != nil {
t.Errorf("Error creating the Scaler")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scaling/scalers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
case "memory":
return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config)
case "metrics-api":
return scalers.NewMetricsAPIScaler(config)
return scalers.NewMetricsAPIScaler(config, client)
case "mongodb":
return scalers.NewMongoDBScaler(ctx, config)
case "mssql":
Expand Down
Loading
Loading