From e09a85a7cc099426bf3a608e212eaa5b0c80c18f Mon Sep 17 00:00:00 2001 From: Sebastian Schimper Date: Fri, 7 Feb 2025 14:15:33 +0100 Subject: [PATCH 1/4] adding splunk observability scaler --- pkg/scalers/splunk_observability_scaler.go | 174 +++++++++++++++ .../splunk_observability_scaler_test.go | 89 ++++++++ pkg/scaling/scalers_builder.go | 2 + tests/.env | 2 + .../splunk_observability_test.go | 207 ++++++++++++++++++ 5 files changed, 474 insertions(+) create mode 100644 pkg/scalers/splunk_observability_scaler.go create mode 100644 pkg/scalers/splunk_observability_scaler_test.go create mode 100644 tests/scalers/splunk_observability/splunk_observability_test.go diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go new file mode 100644 index 00000000000..e575b06ee7e --- /dev/null +++ b/pkg/scalers/splunk_observability_scaler.go @@ -0,0 +1,174 @@ +package scalers + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/go-logr/logr" + "github.com/signalfx/signalflow-client-go/v2/signalflow" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type splunkObservabilityMetadata struct { + TriggerIndex int + + AccessToken string `keda:"name=accessToken, order=authParams"` + Realm string `keda:"name=realm, order=authParams"` + Query string `keda:"name=query, order=triggerMetadata"` + Duration int `keda:"name=duration, order=triggerMetadata"` + TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"` + QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata"` + ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"` +} + +type splunkObservabilityScaler struct { + metadata *splunkObservabilityMetadata + apiClient *signalflow.Client + logger logr.Logger +} + +func parseSplunkObservabilityMetadata(config *scalersconfig.ScalerConfig) (*splunkObservabilityMetadata, error) { + meta := &splunkObservabilityMetadata{} + meta.TriggerIndex = config.TriggerIndex + + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing splunk observability metadata: %w", err) + } + + return meta, nil +} + +func newSplunkO11yConnection(meta *splunkObservabilityMetadata, logger logr.Logger) (*signalflow.Client, error) { + if meta.Realm == "" || meta.AccessToken == "" { + return nil, fmt.Errorf("error: Could not find splunk access token or realm") + } + + apiClient, err := signalflow.NewClient( + signalflow.StreamURLForRealm(meta.Realm), + signalflow.AccessToken(meta.AccessToken), + signalflow.OnError(func(err error) { + logger.Error(err, "error in SignalFlow client") + })) + if err != nil { + return nil, fmt.Errorf("error creating SignalFlow client: %w", err) + } + + return apiClient, nil +} + +func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + logger := InitializeLogger(config, "splunk_observability_scaler") + + meta, err := parseSplunkObservabilityMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing Splunk metadata: %w", err) + } + + apiClient, err := newSplunkO11yConnection(meta, logger) + if err != nil { + return nil, fmt.Errorf("error establishing Splunk Observability Cloud connection: %w", err) + } + + return &splunkObservabilityScaler{ + metadata: meta, + apiClient: apiClient, + logger: logger, + }, nil +} + +func (s *splunkObservabilityScaler) getQueryResult() (float64, 
error) { + comp, err := s.apiClient.Execute(context.Background(), &signalflow.ExecuteRequest{ + Program: s.metadata.Query, + }) + if err != nil { + return -1, fmt.Errorf("could not execute signalflow query: %w", err) + } + + s.logger.V(1).Info("Started MTS stream.") + + time.Sleep(time.Duration(s.metadata.Duration * int(time.Second))) + if err := comp.Stop(context.Background()); err != nil { + return -1, fmt.Errorf("error creating SignalFlow client: %w", err) + } + + s.logger.V(1).Info("Closed MTS stream.") + + max := math.Inf(-1) + min := math.Inf(1) + valueSum := 0.0 + valueCount := 0 + s.logger.V(1).Info("Now iterating over results.") + for msg := range comp.Data() { + if len(msg.Payloads) == 0 { + s.logger.V(1).Info("No data retrieved.") + continue + } + for _, pl := range msg.Payloads { + value, ok := pl.Value().(float64) + if !ok { + return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") + } + s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) + max = math.Max(max, value) + min = math.Min(min, value) + valueSum += value + valueCount++ + } + } + + if valueCount > 1 && s.metadata.QueryAggregator == "" { + return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series or add a queryAggregator") + } + + switch s.metadata.QueryAggregator { + case "max": + s.logger.V(1).Info(fmt.Sprintf("Returning max value: %.4f\n", max)) + return max, nil + case "min": + s.logger.V(1).Info(fmt.Sprintf("Returning min value: %.4f\n", min)) + return min, nil + case "avg": + avg := valueSum / float64(valueCount) + s.logger.V(1).Info(fmt.Sprintf("Returning avg value: %.4f\n", avg)) + return avg, nil + default: + return max, nil + } +} + +func (s *splunkObservabilityScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + num, err := s.getQueryResult() + + if err != nil { + s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.") + return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error getting metrics from Splunk Observability Cloud: %w", err) + } + metric := GenerateMetricInMili(metricName, num) + + return []external_metrics.ExternalMetricValue{metric}, num > s.metadata.ActivationTargetValue, nil +} + +func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := kedautil.NormalizeString("signalfx") + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName), + }, + Target: GetMetricTargetMili(v2.ValueMetricType, s.metadata.TargetValue), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2.MetricSpec{metricSpec} +} + +func (s *splunkObservabilityScaler) Close(context.Context) error { + return nil +} + diff --git a/pkg/scalers/splunk_observability_scaler_test.go b/pkg/scalers/splunk_observability_scaler_test.go new file mode 100644 index 00000000000..2e60cb1d3b5 --- /dev/null +++ b/pkg/scalers/splunk_observability_scaler_test.go @@ -0,0 +1,89 @@ +package scalers + +import ( + "context" + "testing" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +type parseSplunkObservabilityMetadataTestData struct { + metadata map[string]string + authParams map[string]string + isError bool +} + +type SplunkObservabilityMetricIdentifier struct { + metadataTestData *parseSplunkObservabilityMetadataTestData + triggerIndex int + 
metricName       string
+}
+
+var validSplunkObservabilityAuthParams = map[string]string{
+	"accessToken": "my-super-secret-access-token",
+	"realm":       "my-realm",
+}
+
+var validSplunkObservabilityMetadata = map[string]string{
+	"query":                 "data('demo.trans.latency').max().publish()",
+	"duration":              "10",
+	"targetValue":           "200.0",
+	"queryAggregator":       "avg",
+	"activationTargetValue": "1.1",
+}
+
+var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData{
+	// Valid metadata and valid auth params, pass.
+	{validSplunkObservabilityMetadata, validSplunkObservabilityAuthParams, false},
+	// No params at all, fail.
+	{map[string]string{}, map[string]string{}, true},
+	// No metadata but valid auth params, fail.
+	{map[string]string{}, validSplunkObservabilityAuthParams, true},
+	// Valid metadata but no auth params, fail.
+	{validSplunkObservabilityMetadata, map[string]string{}, true},
+	// Missing 'query' field, fail.
+	{map[string]string{"duration": "10", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
+	// Missing 'duration' field, fail.
+	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "targetValue": "200.0", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
+	// Missing 'targetValue' field, fail.
+	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "queryAggregator": "avg", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
+	// Missing 'queryAggregator' field, fail.
+	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "activationTargetValue": "1.1"}, validSplunkObservabilityAuthParams, true},
+	// Missing 'activationTargetValue' field, fail.
+	{map[string]string{"query": "data('demo.trans.latency').max().publish()", "duration": "10", "targetValue": "200.0", "queryAggregator": "avg"}, validSplunkObservabilityAuthParams, true},
+}
+
+var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{
+	{&testSplunkObservabilityMetadata[0], 0, "demo-trans-latency"},
+	{&testSplunkObservabilityMetadata[0], 1, "demo-trans-latency"},
+}
+
+func TestSplunkObservabilityParseMetadata(t *testing.T) {
+	for _, testData := range testSplunkObservabilityMetadata {
+		_, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		} else if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+}
+
+func TestSplunkObservabilityGetMetricSpecForScaling(t *testing.T) {
+	for _, testData := range SplunkObservabilityMetricIdentifiers {
+		ctx := context.Background()
+		meta, err := parseSplunkObservabilityMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validSplunkObservabilityAuthParams, TriggerIndex: testData.triggerIndex})
+		if err != nil {
+			t.Fatal("Could not parse Splunk Observability metadata:", err)
+		}
+		mockSplunkObservabilityScaler := splunkObservabilityScaler{
+			metadata: meta,
+		}
+
+		metricSpec := mockSplunkObservabilityScaler.GetMetricSpecForScaling(ctx)
+		metricName := metricSpec[0].External.Metric.Name
+		if metricName != testData.metricName {
+			t.Error("Wrong External metric source name:", metricName)
+		}
+	}
+}
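
For reviewers who want to exercise the parser outside the table tests above, here is a minimal sketch: a hypothetical example test that is not part of this patch. It would live in package scalers (to reach the unexported parser) and additionally needs the fmt import; the trigger and auth values are illustrative.

// Hypothetical example test: shows how trigger metadata and
// TriggerAuthentication params feed parseSplunkObservabilityMetadata.
func Example_parseSplunkObservabilityMetadata() {
	cfg := &scalersconfig.ScalerConfig{
		TriggerMetadata: map[string]string{
			"query":                 "data('demo.trans.latency').max().publish()",
			"duration":              "10", // seconds the SignalFlow stream stays open
			"targetValue":           "200.0",
			"queryAggregator":       "avg", // min, max, or avg across returned series
			"activationTargetValue": "1.1",
		},
		// In a live ScaledObject these come from a TriggerAuthentication secret.
		AuthParams: map[string]string{
			"accessToken": "my-super-secret-access-token",
			"realm":       "us0",
		},
	}
	meta, err := parseSplunkObservabilityMetadata(cfg)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(meta.Duration, meta.QueryAggregator)
	// Output: 10 avg
}

diff --git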
a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 77fb21de301..a8dd7b876bc 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -256,6 +256,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewSolrScaler(config) case "splunk": return scalers.NewSplunkScaler(config) + case "splunk-observability": + return scalers.NewSplunkObservabilityScaler(config) case "stan": return scalers.NewStanScaler(config) case "temporal": diff --git a/tests/.env b/tests/.env index f87c28a3445..9b54bfd3511 100644 --- a/tests/.env +++ b/tests/.env @@ -55,3 +55,5 @@ GH_AUTOMATION_PAT= GH_APP_ID= GH_INST_ID= GH_APP_KEY= +SPLUNK_OBSERVABILITY_ACCESS_TOKEN= +SPLUNK_OBSERVABILITY_REALM= \ No newline at end of file diff --git a/tests/scalers/splunk_observability/splunk_observability_test.go b/tests/scalers/splunk_observability/splunk_observability_test.go new file mode 100644 index 00000000000..e4025f869a2 --- /dev/null +++ b/tests/scalers/splunk_observability/splunk_observability_test.go @@ -0,0 +1,207 @@ +//go:build e2e +// +build e2e + +package splunk_observability_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "splunk-observability-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + authName = fmt.Sprintf("%s-auth", testName) + accessToken = os.Getenv("SPLUNK_OBSERVABILITY_ACCESS_TOKEN") + realm = os.Getenv("SPLUNK_OBSERVABILITY_REALM") + signalflowQuery = "data('keda-test-metric').publish()" + duration = "10" + maxReplicaCount = 10 + minReplicaCount = 1 + scaleInTargetValue = "400" + scaleInActivationValue = "1.1" +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + AuthName string + AccessToken string + Realm string + SignalflowQuery string + Duration string + MinReplicaCount string + MaxReplicaCount string + TargetValue string + ActivationTargetValue string +} + +const ( + authTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: splunk-secrets + namespace: {{.TestNamespace}} +data: + accessToken: {{.AccessToken}} + realm: {{.Realm}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-splunk-secret + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: accessToken + name: splunk-secrets + key: accessToken + - parameter: realm + name: splunk-secrets + key: realm +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx + namespace: {{.TestNamespace}} +spec: + selector: + matchLabels: + app: nginx + replicas: 1 + template: + metadata: + labels: + app: nginx + spec: + containers: + - name: nginx + image: nginx:1.14.2 + ports: + - containerPort: 80 +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: keda + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: nginx + pollingInterval: 3 + cooldownPeriod: 1 + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + 
triggers:
+    - type: splunk-observability
+      metricType: Value
+      metadata:
+        query: "{{.SignalflowQuery}}"
+        duration: "{{.Duration}}"
+        targetValue: "{{.TargetValue}}"
+        activationTargetValue: "{{.ActivationTargetValue}}"
+        queryAggregator: "max" # 'min', 'max', or 'avg'
+      authenticationRef:
+        name: keda-trigger-auth-splunk-secret
+`
+)
+
+func TestSplunkObservabilityScaler(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
+	defer cancel()
+
+	kc := GetKubernetesClient(t)
+	data, templates := getTemplateData()
+
+	t.Cleanup(func() {
+		DeleteKubernetesResources(t, testNamespace, data, templates)
+	})
+
+	// Create kubernetes resources
+	CreateKubernetesResources(t, kc, testNamespace, data, templates)
+
+	// Ensure nginx deployment is ready
+	assert.True(t, WaitForAllPodRunningInNamespace(t, kc, testNamespace, minReplicaCount, 120),
+		"replica count should be %d after 2 minutes", minReplicaCount)
+
+	// test scaling
+	testScaleOut(ctx, t, kc, testNamespace)
+	testScaleIn(ctx, t, kc)
+}
+
+func getPodCount(ctx context.Context, kc *kubernetes.Clientset, namespace string) int {
+	pods, err := kc.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		panic(err.Error())
+	}
+	return len(pods.Items)
+}
+
+func testScaleOut(ctx context.Context, t *testing.T, kc *kubernetes.Clientset, namespace string) {
+	t.Log("--- testing scale out ---")
+	t.Log("waiting for 3 minutes")
+	time.Sleep(3 * time.Minute)
+
+	assert.True(t, getPodCount(ctx, kc, namespace) > minReplicaCount, "number of pods in deployment should be more than %d after 3 minutes", minReplicaCount)
+}
+
+func testScaleIn(ctx context.Context, t *testing.T, kc *kubernetes.Clientset) {
+	t.Log("--- testing scale in ---")
+
+	t.Log("waiting for 10 minutes")
+	time.Sleep(10 * time.Minute)
+
+	assert.True(t, getPodCount(ctx, kc, testNamespace) < maxReplicaCount, "number of pods in deployment should be less than %d after 10 minutes", maxReplicaCount)
+}
+
+func getTemplateData() (templateData, []Template) {
+	return templateData{
+		TestNamespace:         testNamespace,
+		DeploymentName:        deploymentName,
+		ScaledObjectName:      scaledObjectName,
+		AuthName:              authName,
+		AccessToken:           base64.StdEncoding.EncodeToString([]byte(accessToken)),
+		Realm:                 base64.StdEncoding.EncodeToString([]byte(realm)),
+		SignalflowQuery:       signalflowQuery,
+		Duration:              duration,
+		MinReplicaCount:       fmt.Sprintf("%v", minReplicaCount),
+		MaxReplicaCount:       fmt.Sprintf("%v", maxReplicaCount),
+		TargetValue:           scaleInTargetValue,
+		ActivationTargetValue: scaleInActivationValue,
+	}, []Template{
+		{Name: "authTemplate", Config: authTemplate},
+		{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
+		{Name: "deploymentTemplate", Config: deploymentTemplate},
+	}
+}
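
A note on running this e2e test: the SignalFlow program above reads data('keda-test-metric').publish(), but nothing in the test emits that metric, so it must already be flowing into the target org for the scale-out phase to trigger. The sketch below is a hypothetical feeder, not part of this patch and untested here; it assumes the public ingest endpoint https://ingest.<realm>.signalfx.com/v2/datapoint and its X-SF-TOKEN header, and uses only the standard library.

// Hypothetical feeder: posts a "keda-test-metric" gauge every 10s so the
// e2e query has data. The value 500 is above the trigger's targetValue,
// which should drive the deployment toward maxReplicaCount.
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
	"time"
)

func main() {
	realm := os.Getenv("SPLUNK_OBSERVABILITY_REALM")
	token := os.Getenv("SPLUNK_OBSERVABILITY_ACCESS_TOKEN")
	ingestURL := fmt.Sprintf("https://ingest.%s.signalfx.com/v2/datapoint", realm)

	for {
		body := []byte(`{"gauge": [{"metric": "keda-test-metric", "value": 500}]}`)
		req, err := http.NewRequest(http.MethodPost, ingestURL, bytes.NewReader(body))
		if err != nil {
			panic(err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("X-SF-TOKEN", token)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			fmt.Println("send failed:", err)
		} else {
			resp.Body.Close()
		}
		time.Sleep(10 * time.Second)
	}
}

From 68e3f18f0411ee31df5c00af97526a082a893f4d Mon Sep 17 00:00:00 2001
From: Sebastian Schimper
Date: Fri, 7 Feb 2025 14:26:32 +0100
Subject: [PATCH 2/4] added updated changelog for splunk observability scaler

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b84a84bb2e0..72704648e88 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -64,6 +64,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
 - **General**: Enable OpenSSF Scorecard to enhance security practices across the project ([#5913](https://github.com/kedacore/keda/issues/5913))
 - **General**: Introduce new NSQ scaler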
([#3281](https://github.com/kedacore/keda/issues/3281))
 - **General**: Operator flag to control patching of webhook resources certificates ([#6184](https://github.com/kedacore/keda/issues/6184))
+- **General**: Add Splunk Observability Scaler ([#6192](https://github.com/kedacore/keda/pull/6192))

 #### Experimental

From cbb84c388a363b81ae2fbd3f48eb151f9651b5ba Mon Sep 17 00:00:00 2001
From: Sebastian Schimper
Date: Mon, 10 Feb 2025 08:13:16 +0100
Subject: [PATCH 3/4] incorporating PR feedback from JorTurFer, aligning lines,
 changing test names and propagating context to getQueryResult() func

---
 go.mod                                        |   2 +
 go.sum                                        |   4 +
 pkg/scalers/splunk_observability_scaler.go    |  23 +-
 .../splunk_observability_scaler_test.go       |   4 +-
 .../signalfx/signalflow-client-go/v2/LICENSE  | 201 +++++++++
 .../v2/signalflow/client.go                   | 374 +++++++++++++++++
 .../v2/signalflow/computation.go              | 385 +++++++++++++++++
 .../v2/signalflow/conn.go                     | 196 +++++++++
 .../signalflow-client-go/v2/signalflow/doc.go |  17 +
 .../v2/signalflow/fake_backend.go             | 396 ++++++++++++++++++
 .../v2/signalflow/messages/binary.go          | 188 +++++++++
 .../v2/signalflow/messages/control.go         |  33 ++
 .../v2/signalflow/messages/error.go           |  22 +
 .../v2/signalflow/messages/event.go           |   8 +
 .../v2/signalflow/messages/info.go            | 125 ++++++
 .../v2/signalflow/messages/json.go            |  47 +++
 .../v2/signalflow/messages/metadata.go        |  80 ++++
 .../v2/signalflow/messages/types.go           | 129 ++++++
 .../v2/signalflow/requests.go                 |  94 +++++
 .../github.com/signalfx/signalfx-go/LICENSE   | 201 +++++++++
 .../signalfx/signalfx-go/idtool/idtool.go     |  44 ++
 vendor/modules.txt                            |   7 +
 22 files changed, 2566 insertions(+), 14 deletions(-)
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/LICENSE
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/client.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/computation.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/conn.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/doc.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/fake_backend.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/binary.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/control.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/error.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/event.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/info.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/json.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/metadata.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/types.go
 create mode 100644 vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/requests.go
 create mode 100644 vendor/github.com/signalfx/signalfx-go/LICENSE
 create mode 100644 vendor/github.com/signalfx/signalfx-go/idtool/idtool.go

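The main functional change in this patch is threading the caller's context into getQueryResult instead of using context.Background(). A hypothetical sketch of the benefit follows (illustrative only; imports and the parentCtx/query parameters are assumed):

// Hypothetical illustration (not part of this patch): with the context
// propagated, a caller-side timeout bounds the whole SignalFlow round trip
// instead of leaking a background stream when KEDA gives up on the scaler.
func queryWithTimeout(parentCtx context.Context, apiClient *signalflow.Client, query string) error {
	ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
	defer cancel()

	comp, err := apiClient.Execute(ctx, &signalflow.ExecuteRequest{Program: query})
	if err != nil {
		return err
	}
	// Stop now honours the same deadline as the Execute call.
	return comp.Stop(ctx)
}

diff --git a/go.mod b/go.mod
index eff0def7b9e..4260d4d2c04 100644
--- a/go.mod
+++ b/go.mod
@@ -81,6 +81,7 @@ require (
 	github.com/robfig/cron/v3 v3.0.1
 	github.com/segmentio/kafka-go v0.4.47
 	github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
+	github.com/signalfx/signalflow-client-go/v2 v2.3.0
 	github.com/spf13/cast v1.7.1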
github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 @@ -130,6 +131,7 @@ require ( github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect github.com/google/go-github/v66 v66.0.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/signalfx/signalfx-go v1.34.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect go.temporal.io/api v1.43.0 // indirect diff --git a/go.sum b/go.sum index a20bc35931f..c4a05f3b693 100644 --- a/go.sum +++ b/go.sum @@ -2322,6 +2322,10 @@ github.com/shurcooL/go v0.0.0-20200502201357-93f07166e636/go.mod h1:TDJrrUr11Vxr github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= +github.com/signalfx/signalflow-client-go/v2 v2.3.0 h1:CMhvEfDDWbdPCfMNiQTAymRIRzVbgveGbTq5wr8OHuM= +github.com/signalfx/signalflow-client-go/v2 v2.3.0/go.mod h1:ir6CHksVkhh1vlslldjf6k5qD88QQxWW8WMG5PxSQco= +github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE= +github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index e575b06ee7e..49da892cc67 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -18,12 +18,12 @@ import ( type splunkObservabilityMetadata struct { TriggerIndex int - AccessToken string `keda:"name=accessToken, order=authParams"` - Realm string `keda:"name=realm, order=authParams"` - Query string `keda:"name=query, order=triggerMetadata"` - Duration int `keda:"name=duration, order=triggerMetadata"` - TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"` - QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata"` + AccessToken string `keda:"name=accessToken, order=authParams"` + Realm string `keda:"name=realm, order=authParams"` + Query string `keda:"name=query, order=triggerMetadata"` + Duration int `keda:"name=duration, order=triggerMetadata"` + TargetValue float64 `keda:"name=targetValue, order=triggerMetadata"` + QueryAggregator string `keda:"name=queryAggregator, order=triggerMetadata"` ActivationTargetValue float64 `keda:"name=activationTargetValue, order=triggerMetadata"` } @@ -82,8 +82,8 @@ func NewSplunkObservabilityScaler(config *scalersconfig.ScalerConfig) (Scaler, e }, nil } -func (s *splunkObservabilityScaler) getQueryResult() (float64, error) { - comp, err := s.apiClient.Execute(context.Background(), &signalflow.ExecuteRequest{ +func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64, error) { + comp, err := s.apiClient.Execute(ctx, &signalflow.ExecuteRequest{ Program: s.metadata.Query, }) if err != nil { @@ -93,7 +93,7 @@ func (s *splunkObservabilityScaler) getQueryResult() (float64, error) { s.logger.V(1).Info("Started MTS stream.") time.Sleep(time.Duration(s.metadata.Duration * int(time.Second))) - if err := 
comp.Stop(context.Background()); err != nil { + if err := comp.Stop(ctx); err != nil { return -1, fmt.Errorf("error creating SignalFlow client: %w", err) } @@ -142,8 +142,8 @@ func (s *splunkObservabilityScaler) getQueryResult() (float64, error) { } } -func (s *splunkObservabilityScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - num, err := s.getQueryResult() +func (s *splunkObservabilityScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + num, err := s.getQueryResult(ctx) if err != nil { s.logger.Error(err, "error getting metrics from Splunk Observability Cloud.") @@ -171,4 +171,3 @@ func (s *splunkObservabilityScaler) GetMetricSpecForScaling(context.Context) []v func (s *splunkObservabilityScaler) Close(context.Context) error { return nil } - diff --git a/pkg/scalers/splunk_observability_scaler_test.go b/pkg/scalers/splunk_observability_scaler_test.go index 2e60cb1d3b5..c3a0f560163 100644 --- a/pkg/scalers/splunk_observability_scaler_test.go +++ b/pkg/scalers/splunk_observability_scaler_test.go @@ -54,8 +54,8 @@ var testSplunkObservabilityMetadata = []parseSplunkObservabilityMetadataTestData } var SplunkObservabilityMetricIdentifiers = []SplunkObservabilityMetricIdentifier{ - {&testSplunkObservabilityMetadata[0], 0, "demo-trans-latency"}, - {&testSplunkObservabilityMetadata[0], 1, "demo-trans-latency"}, + {&testSplunkObservabilityMetadata[0], 0, "s0-signalfx"}, + {&testSplunkObservabilityMetadata[0], 1, "s1-signalfx"}, } func TestSplunkObservabilityParseMetadata(t *testing.T) { diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/LICENSE b/vendor/github.com/signalfx/signalflow-client-go/v2/LICENSE new file mode 100644 index 00000000000..261eeb9e9f8 --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/client.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/client.go new file mode 100644 index 00000000000..608513a55bb --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/client.go @@ -0,0 +1,374 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package signalflow + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/url" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/signalfx/signalflow-client-go/v2/signalflow/messages" +) + +// Client for SignalFlow via websockets (SSE is not currently supported). 
+type Client struct { + // Access token for the org + token string + userAgent string + defaultMetadataTimeout time.Duration + nextChannelNum int64 + conn *wsConn + readTimeout time.Duration + // How long to wait for writes to the websocket to finish + writeTimeout time.Duration + streamURL *url.URL + onError OnErrorFunc + channelsByName map[string]chan messages.Message + + // These are the lower-level WebSocket level channels for byte messages + outgoingTextMsgs chan *outgoingMessage + incomingTextMsgs chan []byte + incomingBinaryMsgs chan []byte + connectedCh chan struct{} + + isClosed atomic.Bool + sync.Mutex + cancel context.CancelFunc +} + +type clientMessageRequest struct { + msg interface{} + resultCh chan error +} + +// ClientParam is the common type of configuration functions for the SignalFlow client +type ClientParam func(*Client) error + +// StreamURL lets you set the full URL to the stream endpoint, including the +// path. +func StreamURL(streamEndpoint string) ClientParam { + return func(c *Client) error { + var err error + c.streamURL, err = url.Parse(streamEndpoint) + return err + } +} + +// StreamURLForRealm can be used to configure the websocket url for a specific +// SignalFx realm. +func StreamURLForRealm(realm string) ClientParam { + return func(c *Client) error { + var err error + c.streamURL, err = url.Parse(fmt.Sprintf("wss://stream.%s.signalfx.com/v2/signalflow", realm)) + return err + } +} + +// AccessToken can be used to provide a SignalFx organization access token or +// user access token to the SignalFlow client. +func AccessToken(token string) ClientParam { + return func(c *Client) error { + c.token = token + return nil + } +} + +// UserAgent allows setting the `userAgent` field when authenticating to +// SignalFlow. This can be useful for accounting how many jobs are started +// from each client. +func UserAgent(userAgent string) ClientParam { + return func(c *Client) error { + c.userAgent = userAgent + return nil + } +} + +// ReadTimeout sets the duration to wait between messages that come on the +// websocket. If the resolution of the job is very low, this should be +// increased. +func ReadTimeout(timeout time.Duration) ClientParam { + return func(c *Client) error { + if timeout <= 0 { + return errors.New("ReadTimeout cannot be <= 0") + } + c.readTimeout = timeout + return nil + } +} + +// WriteTimeout sets the maximum duration to wait to send a single message when +// writing messages to the SignalFlow server over the WebSocket connection. +func WriteTimeout(timeout time.Duration) ClientParam { + return func(c *Client) error { + if timeout <= 0 { + return errors.New("WriteTimeout cannot be <= 0") + } + c.writeTimeout = timeout + return nil + } +} + +type OnErrorFunc func(err error) + +func OnError(f OnErrorFunc) ClientParam { + return func(c *Client) error { + c.onError = f + return nil + } +} + +// NewClient makes a new SignalFlow client that will immediately try and +// connect to the SignalFlow backend. 
+func NewClient(options ...ClientParam) (*Client, error) { + c := &Client{ + streamURL: &url.URL{ + Scheme: "wss", + Host: "stream.us0.signalfx.com", + Path: "/v2/signalflow", + }, + readTimeout: 1 * time.Minute, + writeTimeout: 5 * time.Second, + channelsByName: make(map[string]chan messages.Message), + + outgoingTextMsgs: make(chan *outgoingMessage), + incomingTextMsgs: make(chan []byte), + incomingBinaryMsgs: make(chan []byte), + connectedCh: make(chan struct{}), + } + + for i := range options { + if err := options[i](c); err != nil { + return nil, err + } + } + + c.conn = &wsConn{ + StreamURL: c.streamURL, + OutgoingTextMsgs: c.outgoingTextMsgs, + IncomingTextMsgs: c.incomingTextMsgs, + IncomingBinaryMsgs: c.incomingBinaryMsgs, + ConnectedCh: c.connectedCh, + ConnectTimeout: 10 * time.Second, + ReadTimeout: c.readTimeout, + WriteTimeout: c.writeTimeout, + OnError: c.onError, + PostDisconnectCallback: func() { + c.closeRegisteredChannels() + }, + PostConnectMessage: func() []byte { + bytes, err := c.makeAuthRequest() + if err != nil { + c.sendErrIfWanted(fmt.Errorf("failed to send auth: %w", err)) + return nil + } + return bytes + }, + } + + var ctx context.Context + ctx, c.cancel = context.WithCancel(context.Background()) + + go c.conn.Run(ctx) + go c.run(ctx) + + return c, nil +} + +func (c *Client) newUniqueChannelName() string { + name := fmt.Sprintf("ch-%d", atomic.AddInt64(&c.nextChannelNum, 1)) + return name +} + +func (c *Client) sendErrIfWanted(err error) { + if c.onError != nil { + c.onError(err) + } +} + +// Writes all messages from a single goroutine since that is required by +// websocket library. +func (c *Client) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case msg := <-c.incomingTextMsgs: + err := c.handleMessage(msg, websocket.TextMessage) + if err != nil { + c.sendErrIfWanted(fmt.Errorf("error handling SignalFlow text message: %w", err)) + } + case msg := <-c.incomingBinaryMsgs: + err := c.handleMessage(msg, websocket.BinaryMessage) + if err != nil { + c.sendErrIfWanted(fmt.Errorf("error handling SignalFlow binary message: %w", err)) + } + } + } +} + +func (c *Client) sendMessage(ctx context.Context, message interface{}) error { + msgBytes, err := c.serializeMessage(message) + if err != nil { + return err + } + + resultCh := make(chan error, 1) + select { + case c.outgoingTextMsgs <- &outgoingMessage{ + bytes: msgBytes, + resultCh: resultCh, + }: + return <-resultCh + case <-ctx.Done(): + close(resultCh) + return ctx.Err() + } +} + +func (c *Client) serializeMessage(message interface{}) ([]byte, error) { + msgBytes, err := json.Marshal(message) + if err != nil { + return nil, fmt.Errorf("could not marshal SignalFlow request: %w", err) + } + return msgBytes, nil +} + +func (c *Client) handleMessage(msgBytes []byte, msgTyp int) error { + message, err := messages.ParseMessage(msgBytes, msgTyp == websocket.TextMessage) + if err != nil { + return fmt.Errorf("could not parse SignalFlow message: %w", err) + } + + if cm, ok := message.(messages.ChannelMessage); ok { + channelName := cm.Channel() + c.Lock() + channel, ok := c.channelsByName[channelName] + if !ok { + // The channel should have existed before, but now doesn't, + // probably because it was closed. + return nil + } else if channelName == "" { + c.acceptMessage(message) + return nil + } + channel <- message + c.Unlock() + } else { + return c.acceptMessage(message) + } + return nil +} + +// acceptMessages accepts non-channel specific messages. 
The only one that I +// know of is the authenticated response. +func (c *Client) acceptMessage(message messages.Message) error { + if _, ok := message.(*messages.AuthenticatedMessage); ok { + return nil + } else if msg, ok := message.(*messages.BaseJSONMessage); ok { + data := msg.RawData() + if data != nil && data["event"] == "KEEP_ALIVE" { + // Ignore keep alive messages + return nil + } + } + + return fmt.Errorf("unknown SignalFlow message received: %v", message) +} + +// Sends the authenticate message but does not wait for a response. +func (c *Client) makeAuthRequest() ([]byte, error) { + return c.serializeMessage(&AuthRequest{ + Token: c.token, + UserAgent: c.userAgent, + }) +} + +// Execute a SignalFlow job and return a channel upon which informational +// messages and data will flow. +// See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-a-computation +func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*Computation, error) { + if req.Channel == "" { + req.Channel = c.newUniqueChannelName() + } + + err := c.sendMessage(ctx, req) + if err != nil { + return nil, err + } + + return newComputation(c.registerChannel(req.Channel), req.Channel, c), nil +} + +// Detach from a computation but keep it running. See +// https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Detach-from-a-computation. +func (c *Client) Detach(ctx context.Context, req *DetachRequest) error { + // We are assuming that the detach request will always come from the same + // client that started it with the Execute method above, and thus the + // connection is still active (i.e. we don't need to call ensureInitialized + // here). If the websocket connection does drop, all jobs started by that + // connection get detached/stopped automatically. + return c.sendMessage(ctx, req) +} + +// Stop sends a job stop request message to the backend. It does not wait for +// jobs to actually be stopped. +// See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Stop-a-computation +func (c *Client) Stop(ctx context.Context, req *StopRequest) error { + // We are assuming that the stop request will always come from the same + // client that started it with the Execute method above, and thus the + // connection is still active (i.e. we don't need to call ensureInitialized + // here). If the websocket connection does drop, all jobs started by that + // connection get stopped automatically. + return c.sendMessage(ctx, req) +} + +func (c *Client) registerChannel(name string) chan messages.Message { + ch := make(chan messages.Message) + + c.Lock() + c.channelsByName[name] = ch + c.Unlock() + + return ch +} + +func (c *Client) closeRegisteredChannels() { + c.Lock() + for _, ch := range c.channelsByName { + close(ch) + } + c.channelsByName = map[string]chan messages.Message{} + c.Unlock() +} + +// Close the client and shutdown any ongoing connections and goroutines. The client cannot be +// reused after Close. Calling any of the client methods after Close() is undefined and will likely +// result in a panic. 
+func (c *Client) Close() { + if c.isClosed.Load() { + panic("cannot close client more than once") + } + c.isClosed.Store(true) + + c.cancel() + c.closeRegisteredChannels() + +DRAIN: + for { + select { + case outMsg := <-c.outgoingTextMsgs: + outMsg.resultCh <- io.EOF + default: + break DRAIN + } + } + close(c.outgoingTextMsgs) +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/computation.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/computation.go new file mode 100644 index 00000000000..3ba70be0ef0 --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/computation.go @@ -0,0 +1,385 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package signalflow + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/signalfx/signalflow-client-go/v2/signalflow/messages" + "github.com/signalfx/signalfx-go/idtool" +) + +// Computation is a single running SignalFlow job +type Computation struct { + sync.Mutex + channel <-chan messages.Message + name string + client *Client + dataCh chan *messages.DataMessage + // An intermediate channel for data messages where they can be buffered if + // nothing is currently pulling data messages. + dataChBuffer chan *messages.DataMessage + eventCh chan *messages.EventMessage + infoCh chan *messages.InfoMessage + eventChBuffer chan *messages.EventMessage + expirationCh chan *messages.ExpiredTSIDMessage + expirationChBuffer chan *messages.ExpiredTSIDMessage + infoChBuffer chan *messages.InfoMessage + + errMutex sync.RWMutex + lastError error + + handle asyncMetadata[string] + resolutionMS asyncMetadata[int] + lagMS asyncMetadata[int] + maxDelayMS asyncMetadata[int] + matchedSize asyncMetadata[int] + limitSize asyncMetadata[int] + matchedNoTimeseriesQuery asyncMetadata[string] + groupByMissingProperties asyncMetadata[[]string] + + tsidMetadata map[idtool.ID]*asyncMetadata[*messages.MetadataProperties] +} + +// ComputationError exposes the underlying metadata of a computation error +type ComputationError struct { + Code int + Message string + ErrorType string +} + +func (e *ComputationError) Error() string { + err := fmt.Sprintf("%v", e.Code) + if e.ErrorType != "" { + err = fmt.Sprintf("%v (%v)", e.Code, e.ErrorType) + } + if e.Message != "" { + err = fmt.Sprintf("%v: %v", err, e.Message) + } + return err +} + +func newComputation(channel <-chan messages.Message, name string, client *Client) *Computation { + comp := &Computation{ + channel: channel, + name: name, + client: client, + dataCh: make(chan *messages.DataMessage), + dataChBuffer: make(chan *messages.DataMessage), + eventCh: make(chan *messages.EventMessage), + infoCh: make(chan *messages.InfoMessage), + eventChBuffer: make(chan *messages.EventMessage), + expirationCh: make(chan *messages.ExpiredTSIDMessage), + expirationChBuffer: make(chan *messages.ExpiredTSIDMessage), + infoChBuffer: make(chan *messages.InfoMessage), + tsidMetadata: make(map[idtool.ID]*asyncMetadata[*messages.MetadataProperties]), + } + + go bufferMessages(comp.dataChBuffer, comp.dataCh) + go bufferMessages(comp.expirationChBuffer, comp.expirationCh) + go bufferMessages(comp.eventChBuffer, comp.eventCh) + go bufferMessages(comp.infoChBuffer, comp.infoCh) + + go func() { + err := comp.watchMessages() + + if !errors.Is(err, errChannelClosed) { + comp.errMutex.Lock() + comp.lastError = err + comp.errMutex.Unlock() + } + + comp.shutdown() + }() + + return comp +} + +// Handle of the computation. 
Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) Handle(ctx context.Context) (string, error) { + return c.handle.Get(ctx) +} + +// Resolution of the job. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) Resolution(ctx context.Context) (time.Duration, error) { + resMS, err := c.resolutionMS.Get(ctx) + return time.Duration(resMS) * time.Millisecond, err +} + +// Lag detected for the job. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) Lag(ctx context.Context) (time.Duration, error) { + lagMS, err := c.lagMS.Get(ctx) + return time.Duration(lagMS) * time.Millisecond, err +} + +// MaxDelay detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) MaxDelay(ctx context.Context) (time.Duration, error) { + maxDelayMS, err := c.maxDelayMS.Get(ctx) + return time.Duration(maxDelayMS) * time.Millisecond, err +} + +// MatchedSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) MatchedSize(ctx context.Context) (int, error) { + return c.matchedSize.Get(ctx) +} + +// LimitSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) LimitSize(ctx context.Context) (int, error) { + return c.limitSize.Get(ctx) +} + +// MatchedNoTimeseriesQuery if it matched no active timeseries. Will wait as long as the given ctx +// is not closed. If ctx is closed an error will be returned. +func (c *Computation) MatchedNoTimeseriesQuery(ctx context.Context) (string, error) { + return c.matchedNoTimeseriesQuery.Get(ctx) +} + +// GroupByMissingProperties are timeseries that don't contain the required dimensions. Will wait as +// long as the given ctx is not closed. If ctx is closed an error will be returned. +func (c *Computation) GroupByMissingProperties(ctx context.Context) ([]string, error) { + return c.groupByMissingProperties.Get(ctx) +} + +// TSIDMetadata for a particular tsid. Will wait as long as the given ctx is not closed. If ctx is closed an +// error will be returned. +func (c *Computation) TSIDMetadata(ctx context.Context, tsid idtool.ID) (*messages.MetadataProperties, error) { + c.Lock() + if _, ok := c.tsidMetadata[tsid]; !ok { + c.tsidMetadata[tsid] = &asyncMetadata[*messages.MetadataProperties]{} + } + md := c.tsidMetadata[tsid] + c.Unlock() + return md.Get(ctx) +} + +// Err returns the last fatal error that caused the computation to stop, if +// any. Will be nil if the computation stopped in an expected manner. 
+func (c *Computation) Err() error { + c.errMutex.RLock() + defer c.errMutex.RUnlock() + + return c.lastError +} + +func (c *Computation) watchMessages() error { + for { + m, ok := <-c.channel + if !ok { + return nil + } + if err := c.processMessage(m); err != nil { + return err + } + } +} + +var errChannelClosed = errors.New("computation channel is closed") + +func (c *Computation) processMessage(m messages.Message) error { + switch v := m.(type) { + case *messages.JobStartControlMessage: + c.handle.Set(v.Handle) + case *messages.EndOfChannelControlMessage, *messages.ChannelAbortControlMessage: + return errChannelClosed + case *messages.DataMessage: + c.dataChBuffer <- v + case *messages.ExpiredTSIDMessage: + c.Lock() + delete(c.tsidMetadata, idtool.IDFromString(v.TSID)) + c.Unlock() + c.expirationChBuffer <- v + case *messages.InfoMessage: + switch v.MessageBlock.Code { + case messages.JobRunningResolution: + c.resolutionMS.Set(v.MessageBlock.Contents.(messages.JobRunningResolutionContents).ResolutionMS()) + case messages.JobDetectedLag: + c.lagMS.Set(v.MessageBlock.Contents.(messages.JobDetectedLagContents).LagMS()) + case messages.JobInitialMaxDelay: + c.maxDelayMS.Set(v.MessageBlock.Contents.(messages.JobInitialMaxDelayContents).MaxDelayMS()) + case messages.FindLimitedResultSet: + c.matchedSize.Set(v.MessageBlock.Contents.(messages.FindLimitedResultSetContents).MatchedSize()) + c.limitSize.Set(v.MessageBlock.Contents.(messages.FindLimitedResultSetContents).LimitSize()) + case messages.FindMatchedNoTimeseries: + c.matchedNoTimeseriesQuery.Set(v.MessageBlock.Contents.(messages.FindMatchedNoTimeseriesContents).MatchedNoTimeseriesQuery()) + case messages.GroupByMissingProperty: + c.groupByMissingProperties.Set(v.MessageBlock.Contents.(messages.GroupByMissingPropertyContents).GroupByMissingProperties()) + } + c.infoChBuffer <- v + case *messages.ErrorMessage: + rawData := v.RawData() + computationError := ComputationError{} + if code, ok := rawData["error"]; ok { + computationError.Code = int(code.(float64)) + } + if msg, ok := rawData["message"]; ok && msg != nil { + computationError.Message = msg.(string) + } + if errType, ok := rawData["errorType"]; ok { + computationError.ErrorType = errType.(string) + } + return &computationError + case *messages.MetadataMessage: + c.Lock() + if _, ok := c.tsidMetadata[v.TSID]; !ok { + c.tsidMetadata[v.TSID] = &asyncMetadata[*messages.MetadataProperties]{} + } + c.tsidMetadata[v.TSID].Set(&v.Properties) + c.Unlock() + case *messages.EventMessage: + c.eventChBuffer <- v + } + return nil +} + +func bufferMessages[T any](in chan *T, out chan *T) { + buffer := make([]*T, 0) + var nextMessage *T + + defer func() { + if nextMessage != nil { + out <- nextMessage + } + for i := range buffer { + out <- buffer[i] + } + + close(out) + }() + for { + if len(buffer) > 0 { + if nextMessage == nil { + nextMessage, buffer = buffer[0], buffer[1:] + } + + select { + case out <- nextMessage: + nextMessage = nil + case msg, ok := <-in: + if !ok { + return + } + buffer = append(buffer, msg) + } + } else { + msg, ok := <-in + if !ok { + return + } + buffer = append(buffer, msg) + } + } +} + +// Data returns the channel on which data messages come. This channel will be closed when the +// computation is finished. To prevent goroutine leaks, you should read all messages from this +// channel until it is closed. 
+func (c *Computation) Data() <-chan *messages.DataMessage { + return c.dataCh +} + +// Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series +// that are no longer valid for this computation. This channel will be closed when the computation +// is finished. To prevent goroutine leaks, you should read all messages from this channel until it +// is closed. +func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage { + return c.expirationCh +} + +// Events returns a channel that receives event/alert messages from the signalflow computation. +func (c *Computation) Events() <-chan *messages.EventMessage { + return c.eventCh +} + +// Info returns a channel that receives info messages from the signalflow computation. +func (c *Computation) Info() <-chan *messages.InfoMessage { + return c.infoCh +} + +// Detach the computation on the backend +func (c *Computation) Detach(ctx context.Context) error { + return c.DetachWithReason(ctx, "") +} + +// DetachWithReason detaches the computation with a given reason. This reason will +// be reflected in the control message that signals the end of the job/channel +func (c *Computation) DetachWithReason(ctx context.Context, reason string) error { + return c.client.Detach(ctx, &DetachRequest{ + Reason: reason, + Channel: c.name, + }) +} + +// Stop the computation on the backend. +func (c *Computation) Stop(ctx context.Context) error { + return c.StopWithReason(ctx, "") +} + +// StopWithReason stops the computation with a given reason. This reason will +// be reflected in the control message that signals the end of the job/channel. +func (c *Computation) StopWithReason(ctx context.Context, reason string) error { + handle, err := c.handle.Get(ctx) + if err != nil { + return err + } + return c.client.Stop(ctx, &StopRequest{ + Reason: reason, + Handle: handle, + }) +} + +func (c *Computation) shutdown() { + close(c.dataChBuffer) + close(c.expirationChBuffer) + close(c.infoChBuffer) + close(c.eventChBuffer) +} + +var ErrMetadataTimeout = errors.New("metadata value did not come in time") + +type asyncMetadata[T any] struct { + sync.Mutex + sig chan struct{} + isSet bool + val T +} + +func (a *asyncMetadata[T]) ensureInit() { + a.Lock() + if a.sig == nil { + a.sig = make(chan struct{}) + } + a.Unlock() +} + +func (a *asyncMetadata[T]) Set(val T) { + a.ensureInit() + a.Lock() + a.val = val + if !a.isSet { + close(a.sig) + a.isSet = true + } + a.Unlock() +} + +func (a *asyncMetadata[T]) Get(ctx context.Context) (T, error) { + a.ensureInit() + select { + case <-ctx.Done(): + var t T + return t, ErrMetadataTimeout + case <-a.sig: + return a.val, nil + } +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/conn.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/conn.go new file mode 100644 index 00000000000..8260128a1b4 --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/conn.go @@ -0,0 +1,196 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package signalflow + +import ( + "context" + "fmt" + "net/url" + "path" + "time" + + "github.com/gorilla/websocket" +) + +// How long to wait between connections in case of a bad connection. 
+var reconnectDelay = 5 * time.Second
+
+type wsConn struct {
+	StreamURL *url.URL
+
+	OutgoingTextMsgs   chan *outgoingMessage
+	IncomingTextMsgs   chan []byte
+	IncomingBinaryMsgs chan []byte
+	ConnectedCh        chan struct{}
+
+	ConnectTimeout         time.Duration
+	ReadTimeout            time.Duration
+	WriteTimeout           time.Duration
+	OnError                OnErrorFunc
+	PostDisconnectCallback func()
+	PostConnectMessage     func() []byte
+}
+
+type outgoingMessage struct {
+	bytes    []byte
+	resultCh chan error
+}
+
+// Run keeps the connection alive and puts all incoming messages into a channel
+// as needed.
+func (c *wsConn) Run(ctx context.Context) {
+	var conn *websocket.Conn
+	defer func() {
+		if conn != nil {
+			conn.Close()
+		}
+	}()
+
+	for {
+		if conn != nil {
+			conn.Close()
+			time.Sleep(reconnectDelay)
+		}
+		// This will get run before the first connection as well.
+		if c.PostDisconnectCallback != nil {
+			c.PostDisconnectCallback()
+		}
+
+		if ctx.Err() != nil {
+			return
+		}
+
+		dialCtx, cancel := context.WithTimeout(ctx, c.ConnectTimeout)
+		var err error
+		conn, err = c.connect(dialCtx)
+		cancel()
+		if err != nil {
+			c.sendErrIfWanted(fmt.Errorf("Error connecting to SignalFlow websocket: %w", err))
+			continue
+		}
+
+		err = c.postConnect(conn)
+		if err != nil {
+			c.sendErrIfWanted(fmt.Errorf("Error setting up SignalFlow websocket: %w", err))
+			continue
+		}
+
+		err = c.readAndWriteMessages(conn)
+		if err == nil {
+			return
+		}
+		c.sendErrIfWanted(fmt.Errorf("Error in SignalFlow websocket: %w", err))
+	}
+}
+
+type messageWithType struct {
+	bytes   []byte
+	msgType int
+}
+
+func (c *wsConn) readAndWriteMessages(conn *websocket.Conn) error {
+	readMessageCh := make(chan messageWithType)
+	readErrCh := make(chan error)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		for {
+			bytes, typ, err := readNextMessage(conn, c.ReadTimeout)
+			if err != nil {
+				select {
+				case readErrCh <- err:
+				case <-ctx.Done():
+				}
+				return
+			}
+			readMessageCh <- messageWithType{
+				bytes:   bytes,
+				msgType: typ,
+			}
+		}
+	}()
+
+	for {
+		select {
+		case msg, ok := <-readMessageCh:
+			if !ok {
+				return nil
+			}
+			if msg.msgType == websocket.TextMessage {
+				c.IncomingTextMsgs <- msg.bytes
+			} else {
+				c.IncomingBinaryMsgs <- msg.bytes
+			}
+		case err := <-readErrCh:
+			return err
+		case msg, ok := <-c.OutgoingTextMsgs:
+			if !ok {
+				return nil
+			}
+			err := c.writeMessage(conn, msg.bytes)
+			msg.resultCh <- err
+			if err != nil {
+				return err
+			}
+		}
+	}
+}
+
+func (c *wsConn) sendErrIfWanted(err error) {
+	if c.OnError != nil {
+		c.OnError(err)
+	}
+}
+
+func (c *wsConn) Close() {
+	close(c.IncomingTextMsgs)
+	close(c.IncomingBinaryMsgs)
+}
+
+func (c *wsConn) connect(ctx context.Context) (*websocket.Conn, error) {
+	connectURL := *c.StreamURL
+	connectURL.Path = path.Join(c.StreamURL.Path, "connect")
+	conn, _, err := websocket.DefaultDialer.DialContext(ctx, connectURL.String(), nil)
+	if err != nil {
+		return nil, fmt.Errorf("could not connect SignalFlow websocket: %w", err)
+	}
+	return conn, nil
+}
+
+func (c *wsConn) postConnect(conn *websocket.Conn) error {
+	if c.PostConnectMessage != nil {
+		msg := c.PostConnectMessage()
+		if msg != nil {
+			return c.writeMessage(conn, msg)
+		}
+	}
+	return nil
+}
+
+func readNextMessage(conn *websocket.Conn, timeout time.Duration) (data []byte, msgType int, err error) {
+	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
+		return nil, 0, fmt.Errorf("could not set read timeout in SignalFlow client: %w", err)
+	}
+
+	typ, bytes, err := conn.ReadMessage()
+	if err != nil {
+		return nil, 0, err
+	}
+	return bytes, typ, nil
+}
+
+func (c *wsConn) writeMessage(conn *websocket.Conn, msgBytes []byte) error {
+	err := conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
+	if err != nil {
+		return fmt.Errorf("could not set write timeout for SignalFlow request: %w", err)
+	}
+
+	err = conn.WriteMessage(websocket.TextMessage, msgBytes)
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/doc.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/doc.go
new file mode 100644
index 00000000000..69a3f0d97ef
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/doc.go
@@ -0,0 +1,17 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+/*
+Package signalflow contains a SignalFx SignalFlow client,
+which can be used to execute analytics jobs against the SignalFx backend.
+
+Not all SignalFlow messages are handled at this time,
+and some will be silently dropped.
+All of the most important and useful ones are supported though.
+
+If the connection is broken, the client will automatically
+attempt to reconnect to the backend after a short delay.
+
+SignalFlow is documented at https://dev.splunk.com/observability/docs/signalflow/messages.
+*/
+package signalflow
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/fake_backend.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/fake_backend.go
new file mode 100644
index 00000000000..e52fd3237da
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/fake_backend.go
@@ -0,0 +1,396 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package signalflow
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/gorilla/websocket"
+	"github.com/signalfx/signalflow-client-go/v2/signalflow/messages"
+	"github.com/signalfx/signalfx-go/idtool"
+)
+
+var upgrader = websocket.Upgrader{} // use default options
+
+type tsidVal struct {
+	TSID idtool.ID
+	Val  float64
+}
+
+// FakeBackend is useful for testing, both internal to this package and
+// externally. It supports basic messages and allows for the specification of
+// metadata and data messages that map to a particular program.
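+//
+// A sketch of typical test usage (the program text and values are made up):
+//
+//	fb := NewRunningFakeBackend()
+//	defer fb.Stop()
+//	fb.AddProgramTSIDs("data('cpu.utilization').publish()", []idtool.ID{idtool.ID(1)})
+//	fb.SetTSIDFloatData(idtool.ID(1), 42.0)
+//	client, err := fb.Client()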
+type FakeBackend struct { + sync.Mutex + + AccessToken string + authenticated bool + + conns map[*websocket.Conn]bool + + received []map[string]interface{} + metadataByTSID map[idtool.ID]*messages.MetadataProperties + dataByTSID map[idtool.ID]*float64 + tsidsByProgram map[string][]idtool.ID + programErrors map[string]string + runningJobsByProgram map[string]int + cancelFuncsByHandle map[string]context.CancelFunc + cancelFuncsByChannel map[string]context.CancelFunc + server *httptest.Server + handleIdx int + + logger *log.Logger +} + +func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(context.Background()) + + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + panic(err) + } + f.registerConn(c) + defer c.Close() + defer cancel() + + textMsgs := make(chan string) + binMsgs := make(chan []byte) + go func() { + for { + var err error + select { + case m := <-textMsgs: + err = c.WriteMessage(websocket.TextMessage, []byte(m)) + case m := <-binMsgs: + err = c.WriteMessage(websocket.BinaryMessage, m) + case <-ctx.Done(): + f.unregisterConn(c) + return + } + if err != nil { + f.logger.Printf("Could not write message: %v", err) + } + } + }() + + for { + _, message, err := c.ReadMessage() + if err != nil { + if !errors.Is(err, net.ErrClosed) { + f.logger.Println("read err:", err) + } + return + } + + var in map[string]interface{} + if err := json.Unmarshal(message, &in); err != nil { + f.logger.Println("error unmarshalling: ", err) + } + f.received = append(f.received, in) + + err = f.handleMessage(ctx, in, textMsgs, binMsgs) + if err != nil { + f.logger.Printf("Error handling fake backend message, closing connection: %v", err) + return + } + } +} + +func (f *FakeBackend) registerConn(conn *websocket.Conn) { + f.Lock() + f.conns[conn] = true + f.Unlock() +} + +func (f *FakeBackend) unregisterConn(conn *websocket.Conn) { + f.Lock() + delete(f.conns, conn) + f.Unlock() +} + +func (f *FakeBackend) handleMessage(ctx context.Context, message map[string]interface{}, textMsgs chan<- string, binMsgs chan<- []byte) error { + typ, ok := message["type"].(string) + if !ok { + textMsgs <- `{"type": "error"}` + return nil + } + + switch typ { + case "authenticate": + token, _ := message["token"].(string) + if f.AccessToken == "" || token == f.AccessToken { + textMsgs <- `{"type": "authenticated"}` + f.authenticated = true + } else { + textMsgs <- `{"type": "error", "message": "Invalid auth token"}` + return errors.New("bad auth token") + } + case "stop": + if cancel := f.cancelFuncsByHandle[message["handle"].(string)]; cancel != nil { + cancel() + } + case "detach": + if cancel := f.cancelFuncsByChannel[message["channel"].(string)]; cancel != nil { + cancel() + } + case "execute": + if !f.authenticated { + return errors.New("not authenticated") + } + program, _ := message["program"].(string) + ch, _ := message["channel"].(string) + + if errMsg := f.programErrors[program]; errMsg != "" { + textMsgs <- fmt.Sprintf(`{"type": "error", "message": "%s", "channel": "%s"}`, errMsg, ch) + } + + programTSIDs := f.tsidsByProgram[program] + + handle := fmt.Sprintf("handle-%d", f.handleIdx) + f.handleIdx++ + + execCtx, cancel := context.WithCancel(ctx) + f.cancelFuncsByHandle[handle] = cancel + f.cancelFuncsByChannel[ch] = cancel + + f.logger.Printf("Executing SignalFlow program %s with tsids %v and handle %s", program, programTSIDs, handle) + f.runningJobsByProgram[program]++ + + var resolutionMs int + for _, tsid := range programTSIDs { + if md := 
f.metadataByTSID[tsid]; md != nil { + if md.ResolutionMS > resolutionMs { + resolutionMs = md.ResolutionMS + } + } + } + + messageResMs, _ := message["resolution"].(float64) + if messageResMs != 0.0 { + resolutionMs = int(messageResMs) + } + + if resolutionMs == 0 { + resolutionMs = 1000 + } + + // use start and stop to control ending the fakebackend + var stopMs uint64 + var startMs uint64 + messageStopMs, _ := message["stop"].(float64) + if messageStopMs != 0.0 { + stopMs = uint64(messageStopMs) + } + + messageStartMs, _ := message["start"].(float64) + if messageStartMs != 0.0 { + startMs = uint64(messageStartMs) + } + + if startMs == 0 { + startMs = uint64(time.Now().UnixNano() / (1000 * 1000)) + } + + textMsgs <- fmt.Sprintf(`{"type": "control-message", "channel": "%s", "event": "STREAM_START"}`, ch) + textMsgs <- fmt.Sprintf(`{"type": "control-message", "channel": "%s", "event": "JOB_START", "handle": "%s"}`, ch, handle) + textMsgs <- fmt.Sprintf(`{"type": "message", "channel": "%s", "logicalTimestampMs": 1464736034000, "message": {"contents": {"resolutionMs" : %d}, "messageCode": "JOB_RUNNING_RESOLUTION", "timestampMs": 1464736033000}}`, ch, int64(resolutionMs)) + + for _, tsid := range programTSIDs { + if md := f.metadataByTSID[tsid]; md != nil { + propJSON, err := json.Marshal(md) + if err != nil { + f.logger.Printf("Error serializing metadata to json: %v", err) + continue + } + textMsgs <- fmt.Sprintf(`{"type": "metadata", "tsId": "%s", "channel": "%s", "properties": %s}`, tsid, ch, propJSON) + } + } + + f.logger.Print("done sending metadata messages") + + // Send data periodically until the connection is closed. + iterations := 0 + go func() { + t := time.NewTicker(time.Duration(resolutionMs) * time.Millisecond) + for { + select { + case <-execCtx.Done(): + f.logger.Printf("sending done") + f.Lock() + f.runningJobsByProgram[program]-- + f.Unlock() + return + case <-t.C: + f.Lock() + valsWithTSID := []tsidVal{} + for _, tsid := range programTSIDs { + if data := f.dataByTSID[tsid]; data != nil { + valsWithTSID = append(valsWithTSID, tsidVal{TSID: tsid, Val: *data}) + } + } + f.Unlock() + metricTime := startMs + uint64(iterations*resolutionMs) + if stopMs != 0 && metricTime > stopMs { + f.logger.Printf("sending channel end") + // tell the client the computation is complete + textMsgs <- fmt.Sprintf(`{"type": "control-message", "channel": "%s", "event": "END_OF_CHANNEL", "handle": "%s"}`, ch, handle) + return + } + f.logger.Printf("sending data message") + binMsgs <- makeDataMessage(ch, valsWithTSID, metricTime) + f.logger.Printf("done sending data message") + iterations++ + } + } + }() + } + return nil +} + +func makeDataMessage(channel string, valsWithTSID []tsidVal, now uint64) []byte { + var ch [16]byte + copy(ch[:], channel) + header := messages.BinaryMessageHeader{ + Version: 1, + MessageType: 5, + Flags: 0, + Reserved: 0, + Channel: ch, + } + w := new(bytes.Buffer) + binary.Write(w, binary.BigEndian, &header) + + dataHeader := messages.DataMessageHeader{ + TimestampMillis: now, + ElementCount: uint32(len(valsWithTSID)), + } + binary.Write(w, binary.BigEndian, &dataHeader) + + for i := range valsWithTSID { + var valBytes [8]byte + buf := new(bytes.Buffer) + binary.Write(buf, binary.BigEndian, valsWithTSID[i].Val) + copy(valBytes[:], buf.Bytes()) + + payload := messages.DataPayload{ + Type: messages.ValTypeDouble, + TSID: valsWithTSID[i].TSID, + Val: valBytes, + } + + binary.Write(w, binary.BigEndian, &payload) + } + + return w.Bytes() +} + +func (f *FakeBackend) Start() { + 
f.metadataByTSID = map[idtool.ID]*messages.MetadataProperties{} + f.dataByTSID = map[idtool.ID]*float64{} + f.tsidsByProgram = map[string][]idtool.ID{} + f.programErrors = map[string]string{} + f.runningJobsByProgram = map[string]int{} + f.cancelFuncsByHandle = map[string]context.CancelFunc{} + f.cancelFuncsByChannel = map[string]context.CancelFunc{} + f.conns = map[*websocket.Conn]bool{} + f.server = httptest.NewServer(f) +} + +func (f *FakeBackend) Stop() { + f.KillExistingConnections() + f.server.Close() +} + +func (f *FakeBackend) Restart() { + l, err := net.Listen("tcp", f.server.Listener.Addr().String()) + if err != nil { + panic("Could not relisten: " + err.Error()) + } + f.server = httptest.NewUnstartedServer(f) + f.server.Listener = l + f.server.Start() +} + +func (f *FakeBackend) Client() (*Client, error) { + return NewClient(StreamURL(f.URL()), AccessToken(f.AccessToken)) +} + +func (f *FakeBackend) AddProgramError(program string, errorMsg string) { + f.Lock() + f.programErrors[program] = errorMsg + f.Unlock() +} + +func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID) { + f.Lock() + f.tsidsByProgram[program] = tsids + f.Unlock() +} + +func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties) { + f.Lock() + f.metadataByTSID[tsid] = props + f.Unlock() +} + +func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64) { + f.Lock() + f.dataByTSID[tsid] = &val + f.Unlock() +} + +func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID) { + f.Lock() + delete(f.dataByTSID, tsid) + f.Unlock() +} + +func (f *FakeBackend) URL() string { + return strings.Replace(f.server.URL, "http", "ws", 1) +} + +func (f *FakeBackend) KillExistingConnections() { + f.Lock() + for conn := range f.conns { + conn.Close() + } + f.Unlock() +} + +// RunningJobsForProgram returns how many currently executing jobs there are +// for a particular program text. +func (f *FakeBackend) RunningJobsForProgram(program string) int { + f.Lock() + defer f.Unlock() + return f.runningJobsByProgram[program] +} + +// SetLogger sets the internal logger. +func (f *FakeBackend) SetLogger(logger *log.Logger) { + f.Lock() + f.logger = logger + f.Unlock() +} + +func NewRunningFakeBackend() *FakeBackend { + f := &FakeBackend{ + AccessToken: "abcd", + logger: log.New(io.Discard, "", 0), + } + f.Start() + return f +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/binary.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/binary.go new file mode 100644 index 00000000000..55c4081dadb --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/binary.go @@ -0,0 +1,188 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package messages + +import ( + "bytes" + "compress/gzip" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + + "github.com/signalfx/signalfx-go/idtool" +) + +type DataPayload struct { + Type ValType + TSID idtool.ID + Val [8]byte +} + +// Value returns the numeric value as an interface{}. 
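+//
+// Callers typically type-switch on the result (a sketch; pl is an
+// illustrative DataPayload):
+//
+//	switch pl.Value().(type) {
+//	case int64: // ValTypeLong
+//	case float64: // ValTypeDouble
+//	case int32: // ValTypeInt
+//	}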
+func (dp *DataPayload) Value() interface{} { + switch dp.Type { + case ValTypeLong: + return dp.Int64() + case ValTypeDouble: + return dp.Float64() + case ValTypeInt: + return dp.Int32() + default: + return nil + } +} + +func (dp *DataPayload) Int64() int64 { + n := binary.BigEndian.Uint64(dp.Val[:]) + return int64(n) +} + +func (dp *DataPayload) Float64() float64 { + bits := binary.BigEndian.Uint64(dp.Val[:]) + return math.Float64frombits(bits) +} + +func (dp *DataPayload) Int32() int32 { + var n int32 + _ = binary.Read(bytes.NewBuffer(dp.Val[:]), binary.BigEndian, &n) + return n +} + +// DataMessage is a set of datapoints that share a common timestamp +type DataMessage struct { + BaseMessage + BaseChannelMessage + TimestampedMessage + Payloads []DataPayload +} + +func (dm *DataMessage) String() string { + pls := make([]map[string]interface{}, 0) + for _, pl := range dm.Payloads { + pls = append(pls, map[string]interface{}{ + "type": pl.Type, + "tsid": pl.TSID, + "value": pl.Value(), + }) + } + + return fmt.Sprintf("%v", map[string]interface{}{ + "channel": dm.Channel(), + "timestamp": dm.Timestamp(), + "payloads": pls, + }) +} + +type DataMessageHeader struct { + TimestampMillis uint64 + ElementCount uint32 +} + +type ValType uint8 + +const ( + ValTypeLong ValType = 1 + ValTypeDouble ValType = 2 + ValTypeInt ValType = 3 +) + +func (vt ValType) String() string { + switch vt { + case ValTypeLong: + return "long" + case ValTypeDouble: + return "double" + case ValTypeInt: + return "int32" + } + return "Unknown" +} + +// BinaryMessageHeader represents the first 20 bytes of every binary websocket +// message from the backend. +// https://developers.signalfx.com/signalflow_analytics/rest_api_messages/stream_messages_specification.html#_binary_encoding_of_websocket_messages +type BinaryMessageHeader struct { + Version uint8 + MessageType uint8 + Flags uint8 + Reserved uint8 + Channel [16]byte +} + +const ( + compressed uint8 = 1 << iota + jsonEncoded = 1 << iota +) + +func parseBinaryHeader(msg []byte) (string, bool /* isCompressed */, bool /* isJSON */, []byte /* rest of message */, error) { + if len(msg) <= 20 { + return "", false, false, nil, fmt.Errorf("expected SignalFlow message of at least 21 bytes, got %d bytes", len(msg)) + } + + r := bytes.NewReader(msg[:20]) + var header BinaryMessageHeader + err := binary.Read(r, binary.BigEndian, &header) + if err != nil { + return "", false, false, nil, err + } + + isCompressed := header.Flags&compressed != 0 + isJSON := header.Flags&jsonEncoded != 0 + + return string(header.Channel[:bytes.IndexByte(header.Channel[:], 0)]), isCompressed, isJSON, msg[20:], err +} + +func parseBinaryMessage(msg []byte) (Message, error) { + channel, isCompressed, isJSON, rest, err := parseBinaryHeader(msg) + if err != nil { + return nil, err + } + + if isCompressed { + reader, err := gzip.NewReader(bytes.NewReader(rest)) + if err != nil { + return nil, err + } + rest, err = io.ReadAll(reader) + if err != nil { + return nil, err + } + } + + if isJSON { + return nil, errors.New("cannot handle json binary message") + } + + r := bytes.NewReader(rest[:12]) + var header DataMessageHeader + err = binary.Read(r, binary.BigEndian, &header) + if err != nil { + return nil, err + } + + var payloads []DataPayload + for i := 0; i < int(header.ElementCount); i++ { + r := bytes.NewReader(rest[12+17*i : 12+17*(i+1)]) + var payload DataPayload + if err := binary.Read(r, binary.BigEndian, &payload); err != nil { + return nil, err + } + payloads = append(payloads, payload) + } + + 
return &DataMessage{
+		BaseMessage: BaseMessage{
+			Typ: DataType,
+		},
+		BaseChannelMessage: BaseChannelMessage{
+			Chan: channel,
+		},
+		TimestampedMessage: TimestampedMessage{
+			TimestampMillis: header.TimestampMillis,
+		},
+		Payloads: payloads,
+	}, nil
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/control.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/control.go
new file mode 100644
index 00000000000..930ad1394c4
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/control.go
@@ -0,0 +1,33 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package messages
+
+// The event types used in the control-message messages. These are not used for
+// "event" type messages.
+const (
+	StreamStartEvent  = "STREAM_START"
+	JobStartEvent     = "JOB_START"
+	JobProgressEvent  = "JOB_PROGRESS"
+	ChannelAbortEvent = "CHANNEL_ABORT"
+	EndOfChannelEvent = "END_OF_CHANNEL"
+)
+
+type BaseControlMessage struct {
+	BaseJSONChannelMessage
+	TimestampedMessage
+	Event string `json:"event"`
+}
+
+type JobStartControlMessage struct {
+	BaseControlMessage
+	Handle string `json:"handle"`
+}
+
+type EndOfChannelControlMessage struct {
+	BaseControlMessage
+}
+
+type ChannelAbortControlMessage struct {
+	BaseControlMessage
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/error.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/error.go
new file mode 100644
index 00000000000..9aa6e08d287
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/error.go
@@ -0,0 +1,22 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package messages
+
+type ErrorContext struct {
+	BindingName string      `json:"bindingName"`
+	Column      int         `json:"column"`
+	Line        int         `json:"line"`
+	ProgramText string      `json:"programText"`
+	Reference   string      `json:"reference"`
+	Traceback   interface{} `json:"traceback"`
+}
+
+type ErrorMessage struct {
+	BaseJSONChannelMessage
+
+	Context   ErrorContext `json:"context"`
+	Error     int          `json:"error"`
+	ErrorType string       `json:"errorType"`
+	Message   string       `json:"message"`
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/event.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/event.go
new file mode 100644
index 00000000000..6bdaa5c5a58
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/event.go
@@ -0,0 +1,8 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package messages
+
+type EventMessage struct {
+	BaseJSONChannelMessage
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/info.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/info.go
new file mode 100644
index 00000000000..5ac2757cef3
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/info.go
@@ -0,0 +1,125 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package messages
+
+import (
+	"encoding/json"
+	"time"
+)
+
+const (
+	JobRunningResolution    = "JOB_RUNNING_RESOLUTION"
+	JobDetectedLag          = "JOB_DETECTED_LAG"
+	JobInitialMaxDelay      = "JOB_INITIAL_MAX_DELAY"
+	FindLimitedResultSet    = "FIND_LIMITED_RESULT_SET"
+	FindMatchedNoTimeseries = "FIND_MATCHED_NO_TIMESERIES"
+	GroupByMissingProperty  = "GROUPBY_MISSING_PROPERTY"
+)
+
+type MessageBlock struct {
+	TimestampedMessage
+	Code               string `json:"messageCode"`
+	Level              string `json:"messageLevel"`
+	NumInputTimeseries int    `json:"numInputTimeSeries"`
+	// If the messageCode field in the message is known, this will be an
+	// instance that has more specific methods to access the known fields. You
+	// can always access the original content by treating this value as a
+	// map[string]interface{}.
+	Contents    interface{}            `json:"-"`
+	ContentsRaw map[string]interface{} `json:"contents"`
+}
+
+type InfoMessage struct {
+	BaseJSONChannelMessage
+	LogicalTimestampMillis uint64 `json:"logicalTimestampMs"`
+	MessageBlock           `json:"message"`
+}
+
+func (im *InfoMessage) UnmarshalJSON(raw []byte) error {
+	type IM InfoMessage
+	if err := json.Unmarshal(raw, (*IM)(im)); err != nil {
+		return err
+	}
+
+	mb := &im.MessageBlock
+	switch mb.Code {
+	case JobRunningResolution:
+		mb.Contents = JobRunningResolutionContents(mb.ContentsRaw)
+	case JobDetectedLag:
+		mb.Contents = JobDetectedLagContents(mb.ContentsRaw)
+	case JobInitialMaxDelay:
+		mb.Contents = JobInitialMaxDelayContents(mb.ContentsRaw)
+	case FindLimitedResultSet:
+		mb.Contents = FindLimitedResultSetContents(mb.ContentsRaw)
+	case FindMatchedNoTimeseries:
+		mb.Contents = FindMatchedNoTimeseriesContents(mb.ContentsRaw)
+	case GroupByMissingProperty:
+		mb.Contents = GroupByMissingPropertyContents(mb.ContentsRaw)
+	default:
+		mb.Contents = mb.ContentsRaw
+	}
+
+	return nil
+}
+
+func (im *InfoMessage) LogicalTimestamp() time.Time {
+	return time.Unix(0, int64(im.LogicalTimestampMillis*uint64(time.Millisecond)))
+}
+
+type JobRunningResolutionContents map[string]interface{}
+
+func (jm JobRunningResolutionContents) ResolutionMS() int {
+	field, _ := jm["resolutionMs"].(float64)
+	return int(field)
+}
+
+type JobDetectedLagContents map[string]interface{}
+
+func (jm JobDetectedLagContents) LagMS() int {
+	field, _ := jm["lagMs"].(float64)
+	return int(field)
+}
+
+type JobInitialMaxDelayContents map[string]interface{}
+
+func (jm JobInitialMaxDelayContents) MaxDelayMS() int {
+	field, _ := jm["maxDelayMs"].(float64)
+	return int(field)
+}
+
+type FindLimitedResultSetContents map[string]interface{}
+
+func (jm FindLimitedResultSetContents) MatchedSize() int {
+	field, _ := jm["matchedSize"].(float64)
+	return int(field)
+}
+
+func (jm FindLimitedResultSetContents) LimitSize() int {
+	field, _ := jm["limitSize"].(float64)
+	return int(field)
+}
+
+type FindMatchedNoTimeseriesContents map[string]interface{}
+
+func (jm FindMatchedNoTimeseriesContents) MatchedNoTimeseriesQuery() string {
+	field, _ := jm["query"].(string)
+	return field
+}
+
+type GroupByMissingPropertyContents map[string]interface{}
+
+func (jm GroupByMissingPropertyContents) GroupByMissingProperties() []string {
+	propNames := make([]string, len(jm["propertyNames"].([]interface{})))
+	for i, v := range jm["propertyNames"].([]interface{}) {
+		propNames[i] = v.(string)
+	}
+	return propNames
+}
+
+// ExpiredTSIDMessage is received when a timeseries has expired and is no
+// longer relevant to a computation.
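+//
+// These are usually consumed via Computation.Expirations() in the signalflow
+// package, e.g. (comp is illustrative):
+//
+//	for msg := range comp.Expirations() {
+//		// msg.TSID names the time series that went away
+//	}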
+type ExpiredTSIDMessage struct { + BaseJSONChannelMessage + TSID string `json:"tsId"` +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/json.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/json.go new file mode 100644 index 00000000000..a00043e2c7b --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/json.go @@ -0,0 +1,47 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package messages + +import ( + "encoding/json" +) + +func parseJSONMessage(baseMessage Message, msg []byte) (JSONMessage, error) { + var out JSONMessage + switch baseMessage.Type() { + case AuthenticatedType: + out = &AuthenticatedMessage{} + case ControlMessageType: + var base BaseControlMessage + if err := json.Unmarshal(msg, &base); err != nil { + return nil, err + } + + switch base.Event { + case JobStartEvent: + out = &JobStartControlMessage{} + case EndOfChannelEvent: + out = &EndOfChannelControlMessage{} + case ChannelAbortEvent: + out = &ChannelAbortControlMessage{} + default: + return &base, nil + } + case ErrorType: + out = &ErrorMessage{} + case MetadataType: + out = &MetadataMessage{} + case ExpiredTSIDType: + out = &ExpiredTSIDMessage{} + case MessageType: + out = &InfoMessage{} + case EventType: + out = &EventMessage{} + default: + out = &BaseJSONMessage{} + } + err := json.Unmarshal(msg, out) + out.JSONBase().rawMessage = msg + return out, err +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/metadata.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/metadata.go new file mode 100644 index 00000000000..e356efd09b1 --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/metadata.go @@ -0,0 +1,80 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package messages + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/signalfx/signalfx-go/idtool" +) + +type MetadataMessage struct { + BaseJSONChannelMessage + TSID idtool.ID `json:"tsId"` + Properties MetadataProperties `json:"properties"` +} + +type MetadataProperties struct { + Metric string `json:"sf_metric"` + OriginatingMetric string `json:"sf_originatingMetric"` + ResolutionMS int `json:"sf_resolutionMs"` + CreatedOnMS int `json:"sf_createdOnMs"` + // Additional SignalFx-generated properties about this time series. Many + // of these are exposed directly in fields on this struct. + InternalProperties map[string]interface{} `json:"-"` + // Custom properties applied to the timeseries through various means, + // including dimensions, properties on matching dimensions, etc. + CustomProperties map[string]string `json:"-"` +} + +func (mp *MetadataProperties) UnmarshalJSON(b []byte) error { + // Deserialize it at first to get all the well-known fields put in place so + // we don't have to manually assign them below. + type Alias MetadataProperties + if err := json.Unmarshal(b, (*Alias)(mp)); err != nil { + return err + } + + // Deserialize it again to a generic map so we can get at all the fields. 
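+	// Keys prefixed with "sf_" end up in InternalProperties; every other key
+	// is stringified into CustomProperties (see the loop below).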
+	var propMap map[string]interface{}
+	if err := json.Unmarshal(b, &propMap); err != nil {
+		return err
+	}
+
+	mp.InternalProperties = make(map[string]interface{})
+	mp.CustomProperties = make(map[string]string)
+	for k, v := range propMap {
+		if strings.HasPrefix(k, "sf_") {
+			mp.InternalProperties[k] = v
+		} else {
+			mp.CustomProperties[k] = fmt.Sprintf("%v", v)
+		}
+	}
+	return nil
+}
+
+func (mp *MetadataProperties) MarshalJSON() ([]byte, error) {
+	type Alias MetadataProperties
+	intermediate, err := json.Marshal((*Alias)(mp))
+	if err != nil {
+		return nil, err
+	}
+
+	out := map[string]interface{}{}
+	err = json.Unmarshal(intermediate, &out)
+	if err != nil {
+		return nil, err
+	}
+
+	for k, v := range mp.InternalProperties {
+		out[k] = v
+	}
+	for k, v := range mp.CustomProperties {
+		out[k] = v
+	}
+
+	return json.Marshal(out)
+}
diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/types.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/types.go
new file mode 100644
index 00000000000..72718924d18
--- /dev/null
+++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/messages/types.go
@@ -0,0 +1,129 @@
+// Copyright Splunk Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package messages
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+)
+
+// See https://developers.signalfx.com/signalflow_analytics/rest_api_messages/stream_messages_specification.html
+const (
+	AuthenticatedType  = "authenticated"
+	ControlMessageType = "control-message"
+	ErrorType          = "error"
+	MetadataType       = "metadata"
+	MessageType        = "message"
+	DataType           = "data"
+	EventType          = "event"
+	WebsocketErrorType = "websocket-error"
+	ExpiredTSIDType    = "expired-tsid"
+)
+
+type BaseMessage struct {
+	Typ string `json:"type"`
+}
+
+func (bm *BaseMessage) Type() string {
+	return bm.Typ
+}
+
+func (bm *BaseMessage) String() string {
+	return fmt.Sprintf("%s message", bm.Typ)
+}
+
+func (bm *BaseMessage) Base() *BaseMessage {
+	return bm
+}
+
+var _ Message = &BaseMessage{}
+
+type Message interface {
+	Type() string
+	Base() *BaseMessage
+}
+
+type ChannelMessage interface {
+	Channel() string
+}
+
+type BaseChannelMessage struct {
+	Chan string `json:"channel,omitempty"`
+}
+
+func (bcm *BaseChannelMessage) Channel() string {
+	return bcm.Chan
+}
+
+type JSONMessage interface {
+	Message
+	JSONBase() *BaseJSONMessage
+	RawData() map[string]interface{}
+}
+
+type BaseJSONMessage struct {
+	BaseMessage
+	rawMessage []byte
+	rawData    map[string]interface{}
+}
+
+func (j *BaseJSONMessage) JSONBase() *BaseJSONMessage {
+	return j
+}
+
+// The raw message deserialized from JSON. Only applicable for JSON messages.
+// Useful if the message type doesn't have a concrete struct type implemented
+// in this library (e.g. due to an upgrade to the SignalFlow protocol).
+func (j *BaseJSONMessage) RawData() map[string]interface{} {
+	if j.rawData == nil {
+		if err := json.Unmarshal(j.rawMessage, &j.rawData); err != nil {
+			// This shouldn't ever error since it wouldn't have been initially
+			// deserialized if there were parse errors. But in case it does
+			// just return nil.
+ return nil + } + } + return j.rawData +} + +func (j *BaseJSONMessage) String() string { + return j.BaseMessage.String() + string(j.rawMessage) +} + +type BaseJSONChannelMessage struct { + BaseJSONMessage + BaseChannelMessage +} + +func (j *BaseJSONChannelMessage) String() string { + return string(j.BaseJSONMessage.rawMessage) +} + +type TimestampedMessage struct { + TimestampMillis uint64 `json:"timestampMs"` +} + +func (m *TimestampedMessage) Timestamp() time.Time { + return time.Unix(0, int64(m.TimestampMillis*uint64(time.Millisecond))) +} + +type AuthenticatedMessage struct { + BaseJSONMessage + OrgID string `json:"orgId"` + UserID string `json:"userId"` +} + +// The way to distinguish between JSON and binary messages is the websocket +// message type. +func ParseMessage(msg []byte, isText bool) (Message, error) { + if isText { + var baseMessage BaseMessage + if err := json.Unmarshal(msg, &baseMessage); err != nil { + return nil, fmt.Errorf("couldn't unmarshal JSON websocket message: %w", err) + } + return parseJSONMessage(&baseMessage, msg) + } + return parseBinaryMessage(msg) +} diff --git a/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/requests.go b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/requests.go new file mode 100644 index 00000000000..351be6ee43d --- /dev/null +++ b/vendor/github.com/signalfx/signalflow-client-go/v2/signalflow/requests.go @@ -0,0 +1,94 @@ +// Copyright Splunk Inc. +// SPDX-License-Identifier: Apache-2.0 + +package signalflow + +import ( + "encoding/json" + "time" +) + +type AuthType string + +func (at AuthType) MarshalJSON() ([]byte, error) { + return []byte(`"authenticate"`), nil +} + +type AuthRequest struct { + // This should not be set manually. + Type AuthType `json:"type"` + // The Auth token for the org + Token string `json:"token"` + UserAgent string `json:"userAgent,omitempty"` +} + +type ExecuteType string + +func (ExecuteType) MarshalJSON() ([]byte, error) { + return []byte(`"execute"`), nil +} + +// See +// https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-message-properties +// for details on the fields. +type ExecuteRequest struct { + // This should not be set manually + Type ExecuteType `json:"type"` + Program string `json:"program"` + Channel string `json:"channel"` + Start time.Time `json:"-"` + Stop time.Time `json:"-"` + Resolution time.Duration `json:"-"` + MaxDelay time.Duration `json:"-"` + StartMs int64 `json:"start"` + StopMs int64 `json:"stop"` + ResolutionMs int64 `json:"resolution"` + MaxDelayMs int64 `json:"maxDelay"` + Immediate bool `json:"immediate"` + Timezone string `json:"timezone"` +} + +// MarshalJSON does some assignments to allow using more native Go types for +// time/duration. 
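+//
+// For example (program text and values are illustrative):
+//
+//	req := ExecuteRequest{
+//		Program:    "data('cpu.utilization').publish()",
+//		Start:      time.Now().Add(-10 * time.Minute),
+//		Resolution: 10 * time.Second,
+//	}
+//	// json.Marshal(req) fills the "start" and "resolution" JSON fields in
+//	// milliseconds.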
+func (er ExecuteRequest) MarshalJSON() ([]byte, error) { + if !er.Start.IsZero() { + er.StartMs = er.Start.UnixNano() / int64(time.Millisecond) + } + if !er.Stop.IsZero() { + er.StopMs = er.Stop.UnixNano() / int64(time.Millisecond) + } + if er.Resolution != 0 { + er.ResolutionMs = er.Resolution.Nanoseconds() / int64(time.Millisecond) + } + if er.MaxDelay != 0 { + er.MaxDelayMs = er.MaxDelay.Nanoseconds() / int64(time.Millisecond) + } + type alias ExecuteRequest + return json.Marshal(alias(er)) +} + +type DetachType string + +func (DetachType) MarshalJSON() ([]byte, error) { + return []byte(`"detach"`), nil +} + +type DetachRequest struct { + // This should not be set manually + Type DetachType `json:"type"` + Channel string `json:"channel"` + Reason string `json:"reason"` +} + +type StopType string + +func (StopType) MarshalJSON() ([]byte, error) { + return []byte(`"stop"`), nil +} + +type StopRequest struct { + // This should not be set manually + Type StopType `json:"type"` + Handle string `json:"handle"` + Reason string `json:"reason"` +} diff --git a/vendor/github.com/signalfx/signalfx-go/LICENSE b/vendor/github.com/signalfx/signalfx-go/LICENSE new file mode 100644 index 00000000000..8dada3edaf5 --- /dev/null +++ b/vendor/github.com/signalfx/signalfx-go/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/signalfx/signalfx-go/idtool/idtool.go b/vendor/github.com/signalfx/signalfx-go/idtool/idtool.go new file mode 100644 index 00000000000..cb8489e063f --- /dev/null +++ b/vendor/github.com/signalfx/signalfx-go/idtool/idtool.go @@ -0,0 +1,44 @@ +package idtool + +import ( + "encoding/base64" + "encoding/binary" + "encoding/json" + "strings" +) + +// ID is used to identify many SignalFx resources, including time series. +type ID int64 + +// String returns the string representation commonly used instead of an int64 +func (id ID) String() string { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(id)) + return strings.TrimRight(base64.URLEncoding.EncodeToString(b), "=") +} + +// UnmarshalJSON assumes that the id is always serialized in the string format. 
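+//
+// Round-trip sketch: IDFromString(ID(1234).String()) yields ID(1234) again.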
+func (id *ID) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + id2 := IDFromString(s) + *id = id2 + return nil +} + +// IDFromString creates an ID from a pseudo-base64 string +func IDFromString(idstr string) ID { + if idstr != "" { + if idstr[len(idstr)-1] != '=' { + idstr = idstr + "=" + } + buff, err := base64.URLEncoding.DecodeString(idstr) + if err == nil { + output := binary.BigEndian.Uint64(buff) + return ID(output) + } + } + return ID(0) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c81a9862670..38de66bb9e5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1458,6 +1458,13 @@ github.com/sergi/go-diff/diffmatchpatch # github.com/shopspring/decimal v1.3.1 ## explicit; go 1.13 github.com/shopspring/decimal +# github.com/signalfx/signalflow-client-go/v2 v2.3.0 +## explicit; go 1.21 +github.com/signalfx/signalflow-client-go/v2/signalflow +github.com/signalfx/signalflow-client-go/v2/signalflow/messages +# github.com/signalfx/signalfx-go v1.34.0 +## explicit; go 1.17 +github.com/signalfx/signalfx-go/idtool # github.com/sirupsen/logrus v1.9.3 ## explicit; go 1.13 github.com/sirupsen/logrus From 8114ced6c43f49e687f0b3fcf07782432bf53481 Mon Sep 17 00:00:00 2001 From: Sebastian Schimper Date: Mon, 10 Feb 2025 08:34:13 +0100 Subject: [PATCH 4/4] working in feedback from failed 'Static Checks' test --- pkg/scalers/splunk_observability_scaler.go | 18 +++++++++--------- tests/.env | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/scalers/splunk_observability_scaler.go b/pkg/scalers/splunk_observability_scaler.go index 49da892cc67..bc1f530a262 100644 --- a/pkg/scalers/splunk_observability_scaler.go +++ b/pkg/scalers/splunk_observability_scaler.go @@ -99,8 +99,8 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 s.logger.V(1).Info("Closed MTS stream.") - max := math.Inf(-1) - min := math.Inf(1) + maxValue := math.Inf(-1) + minValue := math.Inf(1) valueSum := 0.0 valueCount := 0 s.logger.V(1).Info("Now iterating over results.") @@ -115,8 +115,8 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 return -1, fmt.Errorf("could not convert Splunk Observability metric value to float64") } s.logger.V(1).Info(fmt.Sprintf("Encountering value %.4f\n", value)) - max = math.Max(max, value) - min = math.Min(min, value) + maxValue = math.Max(maxValue, value) + minValue = math.Min(minValue, value) valueSum += value valueCount++ } @@ -128,17 +128,17 @@ func (s *splunkObservabilityScaler) getQueryResult(ctx context.Context) (float64 switch s.metadata.QueryAggregator { case "max": - s.logger.V(1).Info(fmt.Sprintf("Returning max value: %.4f\n", max)) - return max, nil + s.logger.V(1).Info(fmt.Sprintf("Returning max value: %.4f\n", maxValue)) + return maxValue, nil case "min": - s.logger.V(1).Info(fmt.Sprintf("Returning min value: %.4f\n", min)) - return min, nil + s.logger.V(1).Info(fmt.Sprintf("Returning min value: %.4f\n", minValue)) + return minValue, nil case "avg": avg := valueSum / float64(valueCount) s.logger.V(1).Info(fmt.Sprintf("Returning avg value: %.4f\n", avg)) return avg, nil default: - return max, nil + return maxValue, nil } } diff --git a/tests/.env b/tests/.env index 9b54bfd3511..feed064e3ce 100644 --- a/tests/.env +++ b/tests/.env @@ -56,4 +56,4 @@ GH_APP_ID= GH_INST_ID= GH_APP_KEY= SPLUNK_OBSERVABILITY_ACCESS_TOKEN= -SPLUNK_OBSERVABILITY_REALM= \ No newline at end of file +SPLUNK_OBSERVABILITY_REALM=