-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Splunk observability scaler #6534
Open
sschimper-splunk
wants to merge
4
commits into
kedacore:main
Choose a base branch
from
sschimper-splunk:splunk-observability-scaler
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
e09a85a
adding splunk observability scaler
sschimper-splunk 68e3f18
added updated changelog for splunk observability scaler
sschimper-splunk cbb84c3
working on PR feedback from JorTurFer, aligning lines, changing test …
sschimper-splunk 8114ced
working in feedback from failed 'Static Checks' test
sschimper-splunk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"time" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/signalfx/signalflow-client-go/v2/signalflow" | ||
v2 "k8s.io/api/autoscaling/v2" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
|
||
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" | ||
kedautil "github.com/kedacore/keda/v2/pkg/util" | ||
) | ||
|
||
// splunkObservabilityMetadata holds the trigger configuration for the
// Splunk Observability Cloud scaler, populated from trigger metadata and
// auth params via scalersconfig.TypedConfig.
type splunkObservabilityMetadata struct {
	TriggerIndex int // index of this trigger within the ScaledObject; used for metric naming

	AccessToken           string  `keda:"name=accessToken, order=authParams"`                // Splunk Observability Cloud API access token
	Realm                 string  `keda:"name=realm, order=authParams"`                      // Splunk Observability Cloud realm (e.g. "us1")
	Query                 string  `keda:"name=query, order=triggerMetadata"`                 // SignalFlow program to execute
	Duration              int     `keda:"name=duration, order=triggerMetadata"`              // seconds to keep the SignalFlow stream open
	TargetValue           float64 `keda:"name=targetValue, order=triggerMetadata"`           // HPA target value
	QueryAggregator       string  `keda:"name=queryAggregator, order=triggerMetadata"`       // how to reduce multiple series: "max", "min", or "avg"
	ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"` // threshold above which the scaler is considered active
}
|
||
// splunkObservabilityScaler scales on values returned by a SignalFlow
// query against Splunk Observability Cloud.
type splunkObservabilityScaler struct {
	metadata  *splunkObservabilityMetadata // parsed trigger configuration
	apiClient *signalflow.Client           // streaming SignalFlow API client
	logger    logr.Logger
}
|
||
func parseSplunkObservabilityMetadata(config *scalersconfig.ScalerConfig) (*splunkObservabilityMetadata, error) { | ||
meta := &splunkObservabilityMetadata{} | ||
meta.TriggerIndex = config.TriggerIndex | ||
|
||
if err := config.TypedConfig(meta); err != nil { | ||
return nil, fmt.Errorf("error parsing splunk observability metadata: %w", err) | ||
} | ||
|
||
return meta, nil | ||
} | ||
|
||
func newSplunkO11yConnection(meta *splunkObservabilityMetadata, logger logr.Logger) (*signalflow.Client, error) { | ||
if meta.Realm == "" || meta.AccessToken == "" { | ||
return nil, fmt.Errorf("error: Could not find splunk access token or realm") | ||
} | ||
|
||
apiClient, err := signalflow.NewClient( | ||
signalflow.StreamURLForRealm(meta.Realm), | ||
signalflow.AccessToken(meta.AccessToken), | ||
signalflow.OnError(func(err error) { | ||
logger.Error(err, "error in SignalFlow client") | ||
})) | ||
if err != nil { | ||
return nil, fmt.Errorf("error creating SignalFlow client: %w", err) | ||
} | ||
|
||
return apiClient, nil | ||
} | ||
|
||
func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { | ||
logger := InitializeLogger(config, "splunk_observability_scaler") | ||
|
||
meta, err := parseSplunkObservabilityMetadata(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing Splunk metadata: %w", err) | ||
} | ||
|
||
apiClient, err := newSplunkO11yConnection(meta, logger) | ||
if err != nil { | ||
return nil, fmt.Errorf("error establishing Splunk Observability Cloud connection: %w", err) | ||
} | ||
|
||
return &splunkObservabilityScaler{ | ||
metadata: meta, | ||
apiClient: apiClient, | ||
logger: logger, | ||
}, nil | ||
} | ||
|
||
func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) { | ||
comp, err := s.apiClient.Execute(ctx, &signalflow.ExecuteRequest{ | ||
Program: s.metadata.Query, | ||
}) | ||
if err != nil { | ||
return -1, fmt.Errorf("could not execute signalflow query: %w", err) | ||
} | ||
|
||
s.logger.V(1).Info("Started MTS stream.") | ||
|
||
time.Sleep(time.Duration(s.metadata.Duration * int(time.Second))) | ||
if err := comp.Stop(ctx); err != nil { | ||
return -1, fmt.Errorf("error creating SignalFlow client: %w", err) | ||
} | ||
|
||
s.logger.V(1).Info("Closed MTS stream.") | ||
|
||
maxValue := math.Inf(-1) | ||
minValue := math.Inf(1) | ||
valueSum := 0.0 | ||
valueCount := 0 | ||
s.logger.V(1).Info("Now iterating over results.") | ||
for msg := range comp.Data() { | ||
if len(msg.Payloads) == 0 { | ||
s.logger.V(1).Info("No data retrieved.") | ||
continue | ||
} | ||
for _, pl := range msg.Payloads { | ||
value, ok := pl.Value().(float64) | ||
if !ok { | ||
return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") | ||
} | ||
s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) | ||
maxValue = math.Max(maxValue, value) | ||
minValue = math.Min(minValue, value) | ||
valueSum += value | ||
valueCount++ | ||
} | ||
} | ||
|
||
if valueCount > 1 && s.metadata.QueryAggregator == "" { | ||
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator") | ||
} | ||
|
||
switch s.metadata.QueryAggregator { | ||
case "max": | ||
s.logger.V(1).Info(fmt.Sprintf("Returning max value: %.4f\n", maxValue)) | ||
return maxValue, nil | ||
case "min": | ||
s.logger.V(1).Info(fmt.Sprintf("Returning min value: %.4f\n", minValue)) | ||
return minValue, nil | ||
case "avg": | ||
avg := valueSum / float64(valueCount) | ||
s.logger.V(1).Info(fmt.Sprintf("Returning avg value: %.4f\n", avg)) | ||
return avg, nil | ||
default: | ||
return maxValue, nil | ||
} | ||
} | ||
|
||
func (s *splunkObservabilityScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||
num, err := s.getQueryResult(ctx) | ||
|
||
if err != nil { | ||
s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.") | ||
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error getting metrics from Splunk Observability Cloud: %w", err) | ||
} | ||
metric := GenerateMetricInMili(metricName, num) | ||
|
||
return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil | ||
} | ||
|
||
func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||
metricName := kedautil.NormalizeString("signalfx") | ||
externalMetric := &v2.ExternalMetricSource{ | ||
Metric: v2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName), | ||
}, | ||
Target: GetMetricTargetMili(v2.ValueMetricType, s.metadata.TargetValue), | ||
} | ||
metricSpec := v2.MetricSpec{ | ||
External: externalMetric, Type: externalMetricType, | ||
} | ||
return []v2.MetricSpec{metricSpec} | ||
} | ||
|
||
func (s *splunkObservabilityScaler) Close(context.Context) error { | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" | ||
) | ||
|
||
// parseSplunkObservabilityMetadataTestData is one row of the parsing test
// table: a trigger metadata map, an auth params map, and whether parsing
// is expected to fail.
type parseSplunkObservabilityMetadataTestData struct {
	metadata   map[string]string // trigger metadata passed to the parser
	authParams map[string]string // authentication params (accessToken, realm)
	isError    bool              // true if parsing is expected to return an error
}
|
||
// SplunkObservabilityMetricIdentifier pairs a metadata test row with a
// trigger index and the external metric name expected for that index.
type SplunkObservabilityMetricIdentifier struct {
	metadataTestData *parseSplunkObservabilityMetadataTestData // metadata used to build the scaler
	triggerIndex     int                                       // trigger index fed into metric-name generation
	metricName       string                                    // expected generated metric name
}
|
||
// validSplunkObservabilityAuthParams supplies dummy credentials accepted
// by the metadata parser.
var validSplunkObservabilityAuthParams = map[string]string{
	"accessToken": "my-suyper-secret-access-token",
	"realm":       "my-realm",
}
|
||
// validSplunkObservabilityMetadata is a complete, valid trigger metadata
// map. Key names match the `keda:"name=..."` tags declared on
// splunkObservabilityMetadata (fixed: "ActivationTargetValue" was
// capitalized, inconsistent with the declared name "activationTargetValue"
// used by every other test case in this file).
var validSplunkObservabilityMetadata = map[string]string{
	"query":                 "data('demo.trans.latency').max().publish()",
	"duration":              "10",
	"targetValue":           "200.0",
	"queryAggregator":       "avg",
	"activationTargetValue": "1.1",
}
|
||
// testSplunkObservabilityMetadata is the table of metadata/authParams
// combinations exercised by TestSplunkObservabilityParseMetadata.
var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData{
	// Valid metadata and valid auth params, pass.
	{validSplunkObservabilityMetadata, validSplunkObservabilityAuthParams, false},
	// No params at all, fail.
	{map[string]string{}, map[string]string{}, true},
	// No metadata but valid auth, fail.
	{map[string]string{}, validSplunkObservabilityAuthParams, true},
	// Valid metadata but no auth params, fail.
	{validSplunkObservabilityMetadata, map[string]string{}, true},
	// Missing 'query' field, fail.
	{map[string]string{"duration": "10", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'duration' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'targetValue' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'queryAggregator' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
	// Missing 'activationTargetValue' field, fail.
	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "queryAggregator": "avg"}, validSplunkObservabilityAuthParams, true},
}
|
||
// SplunkObservabilityMetricIdentifiers holds the expected external metric
// names ("s<index>-signalfx") for different trigger indexes.
var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{
	{&testSplunkObservabilityMetadata[0], 0, "s0-signalfx"},
	{&testSplunkObservabilityMetadata[0], 1, "s1-signalfx"},
}
|
||
func TestSplunkObservabilityParseMetadata(t *testing.T) { | ||
for _, testData := range testSplunkObservabilityMetadata { | ||
_, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
} else if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
} | ||
} | ||
} | ||
|
||
func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range SplunkObservabilityMetricIdentifiers { | ||
ctx := context.Background() | ||
meta, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validSplunkObservabilityAuthParams, TriggerIndex: testData.triggerIndex}) | ||
if err != nil { | ||
t.Fatal("Could not parse Splunk Observability metadata:", err) | ||
} | ||
mockSplunkObservabilityScaler := splunkObservabilityScaler{ | ||
metadata: meta, | ||
} | ||
|
||
metricSpec := mockSplunkObservabilityScaler.GetMetricSpecForScaling(ctx) | ||
metricName := metricSpec[0].External.Metric.Name | ||
if metricName != testData.metricName { | ||
t.Error("Wrong External metric source name:", metricName) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this sleep required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SignalFlow Go client library that I am using to integrate with Splunk O11y does not let you query a single precise value; instead, it provides a stream of values over a specified timeframe. I use this sleep to keep the stream open for <s.metadata.Duration> seconds and then close it.
There might be some more elegant ways, but I am still fairly new to Golang and would very much appreciate feedback. An example that uses Goroutines can be found here: https://github.com/signalfx/signalflow-client-go/blob/main/signalflow/example_test.go
But I found this a bit overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, please, we should avoid using Sleep