diff --git a/go.mod b/go.mod index f7c7e1da0c..f3a9b25e10 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ replace github.com/99designs/keyring => github.com/Jeffail/keyring v1.2.3 require ( cloud.google.com/go/bigquery v1.59.0 + cloud.google.com/go/errorreporting v0.3.0 cloud.google.com/go/pubsub v1.36.1 cloud.google.com/go/storage v1.37.0 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 diff --git a/go.sum b/go.sum index 5b8bd10a87..8f6d071c17 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ cloud.google.com/go/datacatalog v1.19.3 h1:A0vKYCQdxQuV4Pi0LL9p39Vwvg4jH5yYveMv5 cloud.google.com/go/datacatalog v1.19.3/go.mod h1:ra8V3UAsciBpJKQ+z9Whkxzxv7jmQg1hfODr3N3YPJ4= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= +cloud.google.com/go/errorreporting v0.3.0 h1:kj1XEWMu8P0qlLhm3FwcaFsUvXChV/OraZwA70trRR0= +cloud.google.com/go/errorreporting v0.3.0/go.mod h1:xsP2yaAp+OAW4OIm60An2bbLpqIhKXdWR/tawvl7QzU= cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= diff --git a/internal/impl/gcp/processor_errorreporting.go b/internal/impl/gcp/processor_errorreporting.go new file mode 100644 index 0000000000..fa5387cf1e --- /dev/null +++ b/internal/impl/gcp/processor_errorreporting.go @@ -0,0 +1,117 @@ +package gcp + +import ( + "context" + "errors" + + "cloud.google.com/go/errorreporting" + "github.com/redpanda-data/benthos/v4/public/service" +) + +var ( + _ service.Processor = (*errorReportingProcessor)(nil) +) + +func init() { + err := service.RegisterProcessor( + "gcp_errorreporting", newErrorReportingProcessorConfig(), + newErrorReportingProcessor, + ) + if err != nil { + panic(err) + } +} + +type errorReportingProcessor struct { + c *errorreporting.Client + project string +} + +func newErrorReportingProcessorConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Beta(). + Summary("Report an individual error event and record the event to a log."). + Field(service.NewStringField("project"). + Description("GCP project where the query job will execute.")). + Field(service.NewObjectField("service_context", + service.NewStringField("name"). + Description("Name identifies the running program and is included in the error reports"), + service.NewStringField("version"). + Description("Version identifies the version of the running program and is included in the error reports.").Optional(), + ). + Description("The service context in which this error has occurred."). + Optional(), + ). + Field(service.NewStringField("message"). + Description("The error message.")) +} + +func newErrorReportingProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + + project, err := conf.FieldString("project") + if err != nil { + panic(err) + } + ctx := context.Background() + + serviceName, err := conf.FieldString("service_context.name") + if err != nil { + panic(err) + } + + serviceVersion, err := conf.FieldString("service_context.version") + if err != nil { + panic(err) + } + + cfg := errorreporting.Config{ + ServiceName: serviceName, + ServiceVersion: serviceVersion, + } + + c, err := errorreporting.NewClient(ctx, project, cfg) + if err != nil { + return nil, err + } + return &errorReportingProcessor{ + c: c, + project: project, + }, nil +} + +func (p *errorReportingProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + + s, err := msg.AsStructured() + if err != nil { + return nil, err + } + + m, _ := s.(map[string]any) + if m == nil { + return nil, errors.New("not provided expected struct to ErrorReporting") + } + + errAny := m["message"] + if errAny == "" { + return nil, errors.New("not provided expected error message") + } + + errMsg, _ := errAny.(string) + if errMsg == "" { + return nil, errors.New("error message in wrong type") + } + + userAny := m["user"] + userString, _ := userAny.(string) + + p.c.Report(errorreporting.Entry{ + Error: errors.New(errMsg), + User: userString, + }) + + return service.MessageBatch{msg}, nil +} + +func (p *errorReportingProcessor) Close(context.Context) error { + return p.c.Close() +} diff --git a/internal/impl/gcp/processor_errorreporting_test.go b/internal/impl/gcp/processor_errorreporting_test.go new file mode 100644 index 0000000000..67580e41e3 --- /dev/null +++ b/internal/impl/gcp/processor_errorreporting_test.go @@ -0,0 +1 @@ +package gcp