Skip to content

Commit

Permalink
#375 - Remove Streamdal Action
Browse files Browse the repository at this point in the history
  • Loading branch information
blinktag committed Dec 1, 2024
1 parent 21c852f commit 7086a38
Show file tree
Hide file tree
Showing 31 changed files with 594 additions and 4,416 deletions.
53 changes: 2 additions & 51 deletions backends/gcppubsub/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@ package gcppubsub

import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdk "github.com/streamdal/streamdal/sdks/go"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/streamdal/plumber/backends/gcppubsub/types"
"github.com/streamdal/plumber/util"

"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/validate"
)
Expand All @@ -33,14 +28,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
"backend": "gcp-pubsub",
})

// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, llog)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

var m sync.Mutex

var readFunc = func(ctx context.Context, msg *pubsub.Message) {
Expand All @@ -53,42 +40,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel

prometheus.Incr("gcp-pubsub-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
g.log.Debug("Processing message via streamdal SDK")

operationName := "relay"

if relayOpts != nil && relayOpts.GcpPubsub != nil && relayOpts.GcpPubsub.GetArgs() != nil {
if relayOpts.GcpPubsub.GetArgs().SubscriptionId == "" {
operationName = "relay-unknown-subid"
} else {
operationName = "relay-" + relayOpts.GcpPubsub.GetArgs().SubscriptionId
}
}

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "gcp-pubsub",
OperationType: sdk.OperationTypeConsumer,
OperationName: operationName,
Data: msg.Data,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

return
}

// Update msg value with processed data
msg.Data = resp.Data
}
// streamdal sdk END

g.log.Debug("Writing message to relay channel")

relayCh <- &types.RelayMessage{
Expand Down
36 changes: 0 additions & 36 deletions backends/kafka/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/streamdal/plumber/prometheus"
"github.com/streamdal/plumber/util"
"github.com/streamdal/plumber/validate"

sdk "github.com/streamdal/streamdal/sdks/go"
)

const (
Expand All @@ -36,14 +34,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

defer reader.Close()

// streamdal sdk BEGIN
sc, err := util.SetupStreamdalSDK(relayOpts, k.log)
if err != nil {
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
}
// defer sc.Close()
// streamdal sdk END

llog := k.log.WithFields(logrus.Fields{
"relay-id": relayOpts.XRelayId,
"backend": "kafka",
Expand Down Expand Up @@ -74,32 +64,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh

prometheus.Incr("kafka-relay-consumer", 1)

// streamdal sdk BEGIN
// If streamdal integration is enabled, process message via sdk
if sc != nil {
k.log.Debug("Processing message via streamdal SDK")

resp := sc.Process(ctx, &sdk.ProcessRequest{
ComponentName: "kafka",
OperationType: sdk.OperationTypeConsumer,
OperationName: "relay",
Data: msg.Value,
})

if resp.Status == sdk.ExecStatusError {
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)

prometheus.IncrPromCounter("plumber_sdk_errors", 1)
util.WriteError(llog, errorCh, wrappedErr)

continue
}

// Update msg value with processed data
msg.Value = resp.Data
}
// streamdal sdk END

k.log.Debugf("Writing Kafka message to relay channel: %s", msg.Value)

relayCh <- &types.RelayMessage{
Expand Down
154 changes: 0 additions & 154 deletions backends/streamdal/auth.go

This file was deleted.

63 changes: 0 additions & 63 deletions backends/streamdal/auth_test.go

This file was deleted.

Loading

0 comments on commit 7086a38

Please sign in to comment.