Splunk observability scaler #6534

Open · wants to merge 4 commits into main
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -64,6 +64,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Enable OpenSSF Scorecard to enhance security practices across the project ([#5913](https://github.com/kedacore/keda/issues/5913))
- **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281))
- **General**: Operator flag to control patching of webhook resources certificates ([#6184](https://github.com/kedacore/keda/issues/6184))
- **General**: Add Splunk Observability Scaler ([#6192](https://github.com/kedacore/keda/pull/6192))

#### Experimental

2 changes: 2 additions & 0 deletions go.mod
@@ -81,6 +81,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/segmentio/kafka-go v0.4.47
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
github.com/signalfx/signalflow-client-go/v2 v2.3.0
github.com/spf13/cast v1.7.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
@@ -130,6 +131,7 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/google/go-github/v66 v66.0.0 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/signalfx/signalfx-go v1.34.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect
go.temporal.io/api v1.43.0 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -2322,6 +2322,10 @@ github.com/shurcooL/go v0.0.0-20200502201357-93f07166e636/go.mod h1:TDJrrUr11Vxr
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/signalfx/signalflow-client-go/v2 v2.3.0 h1:CMhvEfDDWbdPCfMNiQTAymRIRzVbgveGbTq5wr8OHuM=
github.com/signalfx/signalflow-client-go/v2 v2.3.0/go.mod h1:ir6CHksVkhh1vlslldjf6k5qD88QQxWW8WMG5PxSQco=
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
173 changes: 173 additions & 0 deletions pkg/scalers/splunk_observability_scaler.go
@@ -0,0 +1,173 @@
package scalers

import (
"context"
"fmt"
"math"
"time"

"github.com/go-logr/logr"
"github.com/signalfx/signalflow-client-go/v2/signalflow"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

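// splunkObservabilityMetadata holds the trigger metadata and authentication
// parameters for the scaler, parsed via the `keda` struct tags below.
// Duration is interpreted as seconds (see the time.Sleep in getQueryResult).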
type splunkObservabilityMetadata struct {
TriggerIndex int

AccessToken string `keda:"name=accessToken, order=authParams"`
Realm string `keda:"name=realm, order=authParams"`
Query string `keda:"name=query, order=triggerMetadata"`
Duration int `keda:"name=duration, order=triggerMetadata"`
TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"`
QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata"`
ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"`
}

type splunkObservabilityScaler struct {
metadata *splunkObservabilityMetadata
apiClient *signalflow.Client
logger logr.Logger
}

func parseSplunkObservabilityMetadata(config *scalersconfig.ScalerConfig) (*splunkObservabilityMetadata, error) {
meta := &splunkObservabilityMetadata{}
meta.TriggerIndex = config.TriggerIndex

if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing splunk observability metadata: %w", err)
}

return meta, nil
}

func newSplunkO11yConnection(meta *splunkObservabilityMetadata, logger logr.Logger) (*signalflow.Client, error) {
if meta.Realm == "" || meta.AccessToken == "" {
return nil, fmt.Errorf("error: Could not find splunk access token or realm")
}

apiClient, err := signalflow.NewClient(
signalflow.StreamURLForRealm(meta.Realm),
signalflow.AccessToken(meta.AccessToken),
signalflow.OnError(func(err error) {
logger.Error(err, "error in SignalFlow client")
}))
if err != nil {
return nil, fmt.Errorf("error creating SignalFlow client: %w", err)
}

return apiClient, nil
}

func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "splunk_observability_scaler")

meta, err := parseSplunkObservabilityMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Splunk metadata: %w", err)
}

apiClient, err := newSplunkO11yConnection(meta, logger)
if err != nil {
return nil, fmt.Errorf("error establishing Splunk Observability Cloud connection: %w", err)
}

return &splunkObservabilityScaler{
metadata: meta,
apiClient: apiClient,
logger: logger,
}, nil
}

func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) {
comp, err := s.apiClient.Execute(ctx, &signalflow.ExecuteRequest{
Program: s.metadata.Query,
})
if err != nil {
return -1, fmt.Errorf("could not execute signalflow query: %w", err)
}

s.logger.V(1).Info("Started MTS stream.")

// Leave the stream open for the configured number of seconds so the
// computation can deliver data before it is stopped below.
time.Sleep(time.Duration(s.metadata.Duration) * time.Second)
Member: Is this sleep required?

Author: The SignalFlow Go client library I am using to integrate with Splunk O11y does not let you query a single precise value; instead, it provides a stream of values over a specified timeframe. I use this sleep to say "leave the stream open for <s.metadata.Duration> seconds, then close it." There might be more elegant ways, but I am still fairly new to Go and would very much appreciate feedback. An example that uses goroutines can be found at https://github.com/signalfx/signalflow-client-go/blob/main/signalflow/example_test.go, but I found it a bit overkill.

Member: Yeah, please, we should avoid using Sleep.
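
A sleep-free shape for this loop is sketched below, as a review suggestion only: select on the computation's data channel against a timer, assuming Execute returns a *signalflow.Computation whose Data() channel behaves as the code above already relies on (same messages, closed after Stop). The helper name collectValues and the drop-remaining-messages behavior are illustrative, not part of this PR.

```go
package scalers

import (
	"context"
	"fmt"
	"time"

	"github.com/signalfx/signalflow-client-go/v2/signalflow"
)

// collectValues reads values from comp.Data() until the collection window
// elapses, then stops the computation explicitly instead of sleeping.
func collectValues(ctx context.Context, comp *signalflow.Computation, window time.Duration) ([]float64, error) {
	timer := time.NewTimer(window)
	defer timer.Stop()

	var values []float64
	for {
		select {
		case msg, ok := <-comp.Data():
			if !ok {
				// Channel closed by the server; return what was collected.
				return values, nil
			}
			for _, pl := range msg.Payloads {
				if v, ok := pl.Value().(float64); ok {
					values = append(values, v)
				}
			}
		case <-timer.C:
			// Window elapsed: stop the computation; any messages still
			// buffered are dropped in this sketch.
			if err := comp.Stop(ctx); err != nil {
				return nil, fmt.Errorf("error stopping SignalFlow computation: %w", err)
			}
			return values, nil
		case <-ctx.Done():
			_ = comp.Stop(context.Background())
			return nil, ctx.Err()
		}
	}
}
```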

if err := comp.Stop(ctx); err != nil {
return -1, fmt.Errorf("error creating SignalFlow client: %w", err)
}

s.logger.V(1).Info("Closed MTS stream.")

maxValue := math.Inf(-1)
minValue := math.Inf(1)
valueSum := 0.0
valueCount := 0
s.logger.V(1).Info("Now iterating over results.")
for msg := range comp.Data() {
if len(msg.Payloads) == 0 {
s.logger.V(1).Info("No data retrieved.")
continue
}
for _, pl := range msg.Payloads {
value, ok := pl.Value().(float64)
if !ok {
return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64")
}
s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value))
maxValue = math.Max(maxValue, value)
minValue = math.Min(minValue, value)
valueSum += value
valueCount++
}
}

if valueCount > 1 && s.metadata.QueryAggregator == "" {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator")
}

switch s.metadata.QueryAggregator {
case "max":
s.logger.V(1).Info(fmt.Sprintf("Returning max value: %.4f\n", maxValue))
return maxValue, nil
case "min":
s.logger.V(1).Info(fmt.Sprintf("Returning min value: %.4f\n", minValue))
return minValue, nil
case "avg":
avg := valueSum / float64(valueCount)
s.logger.V(1).Info(fmt.Sprintf("Returning avg value: %.4f\n", avg))
return avg, nil
default:
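// Empty or unrecognized queryAggregator: fall back to the maximum observed value.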
return maxValue, nil
}
}

func (s *splunkObservabilityScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
num, err := s.getQueryResult(ctx)

if err != nil {
s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.")
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error getting metrics from Splunk Observability Cloud: %w", err)
}
metric := GenerateMetricInMili(metricName, num)

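// Report the metric value and whether the scaler is active; KEDA uses the
// activity flag (value > activationTargetValue) when scaling between 0 and 1 replicas.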
return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil
}

func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString("signalfx")
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTargetMili(v2.ValueMetricType, s.metadata.TargetValue),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *splunkObservabilityScaler) Close(context.Context) error {
return nil
}
89 changes: 89 additions & 0 deletions pkg/scalers/splunk_observability_scaler_test.go
@@ -0,0 +1,89 @@
package scalers

import (
"context"
"testing"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

type parseSplunkObservabilityMetadataTestData struct {
metadata map[string]string
authParams map[string]string
isError bool
}

type SplunkObservabilityMetricIdentifier struct {
metadataTestData *parseSplunkObservabilityMetadataTestData
triggerIndex int
metricName string
}

var validSplunkObservabilityAuthParams = map[string]string{
"accessToken": "my-suyper-secret-access-token",
"realm": "my-realm",
}

var validSplunkObservabilityMetadata = map[string]string{
"query": "data('demo.trans.latency').max().publish()",
"duration": "10",
"targetValue": "200.0",
"queryAggregator": "avg",
"ActivationTargetValue": "1.1",
}

var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData{
// Valid metadata and valid auth params, pass.
{validSplunkObservabilityMetadata, validSplunkObservabilityAuthParams, false},
// No params at all, fail.
{map[string]string{}, map[string]string{}, true},
// No metadata but valid auth params, fail.
{map[string]string{}, validSplunkObservabilityAuthParams, true},
// Valid metadata but no auth params, fail.
{validSplunkObservabilityMetadata, map[string]string{}, true},
// Missing 'query' field, fail
{map[string]string{"duration": "10", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'duration' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'targetValue' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'queryAggregator' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
// Missing 'activationTargetValue' field, fail
{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "queryAggregator": "avg"}, validSplunkObservabilityAuthParams, true},
}

var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{
{&testSplunkObservabilityMetadata[0], 0, "s0-signalfx"},
{&testSplunkObservabilityMetadata[0], 1, "s1-signalfx"},
}

func TestSplunkObservabilityParseMetadata(t *testing.T) {
for _, testData := range testSplunkObservabilityMetadata {
_, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
} else if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) {
for _, testData := range SplunkObservabilityMetricIdentifiers {
ctx := context.Background()
meta, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validSplunkObservabilityAuthParams, TriggerIndex: testData.triggerIndex})
if err != nil {
t.Fatal("Could not parse Splunk Observability metadata:", err)
}
mockSplunkObservabilityScaler := splunkObservabilityScaler{
metadata: meta,
}

metricSpec := mockSplunkObservabilityScaler.GetMetricSpecForScaling(ctx)
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.metricName {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scalers_builder.go
@@ -256,6 +256,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewSolrScaler(config)
case "splunk":
return scalers.NewSplunkScaler(config)
case "splunk-observability":
return scalers.NewSplunkObservabilityScaler(config)
case "stan":
return scalers.NewStanScaler(config)
case "temporal":
2 changes: 2 additions & 0 deletions tests/.env
@@ -55,3 +55,5 @@ GH_AUTOMATION_PAT=
GH_APP_ID=
GH_INST_ID=
GH_APP_KEY=
SPLUNK_OBSERVABILITY_ACCESS_TOKEN=
SPLUNK_OBSERVABILITY_REALM=