Skip to content

Commit

Permalink
Merge pull request #2650 from redpanda-data/lambda-serverless
Browse files Browse the repository at this point in the history
Add lambda binaries back into releases
  • Loading branch information
Jeffail authored Jun 13, 2024
2 parents dcbe9cb + 0407480 commit 6d472c0
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 22 deletions.
44 changes: 22 additions & 22 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ builds:
-X main.Version={{.Version}}
-X main.DateBuilt={{.Date}}
-X main.BinaryName=redpanda-connect
# - id: connect-lambda
# main: cmd/serverless/connect-lambda/main.go
# binary: redpanda-connect-lambda
# env:
# - CGO_ENABLED=0
# goos: [ linux ]
# goarch: [ amd64 ]
# - id: connect-lambda-al2
# main: cmd/serverless/connect-lambda/main.go
# binary: bootstrap
# env:
# - CGO_ENABLED=0
# goos: [ linux ]
# goarch: [ amd64, arm64 ]
- id: connect-lambda
main: cmd/serverless/connect-lambda/main.go
binary: redpanda-connect-lambda
env:
- CGO_ENABLED=0
goos: [ linux ]
goarch: [ amd64 ]
- id: connect-lambda-al2
main: cmd/serverless/connect-lambda/main.go
binary: bootstrap
env:
- CGO_ENABLED=0
goos: [ linux ]
goarch: [ amd64, arm64 ]
archives:
- id: connect
builds: [ connect ]
Expand All @@ -50,14 +50,14 @@ archives:
- README.md
- CHANGELOG.md
- licenses
# - id: connect-lambda
# builds: [ connect-lambda ]
# format: zip
# name_template: "{{ .Binary }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
# - id: connect-lambda-al2
# builds: [ connect-lambda-al2 ]
# format: zip
# name_template: "connect-lambda-al2_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
- id: connect-lambda
builds: [ connect-lambda ]
format: zip
name_template: "{{ .Binary }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
- id: connect-lambda-al2
builds: [ connect-lambda-al2 ]
format: zip
name_template: "connect-lambda-al2_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
dist: target/dist
release:
github:
Expand Down
12 changes: 12 additions & 0 deletions cmd/serverless/connect-lambda/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/redpanda-data/connect/v4/internal/impl/aws"

// Import all plugins defined within the repo.
_ "github.com/redpanda-data/connect/v4/public/components/all"
)

func main() {
aws.RunLambda()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/PaesslerAG/gval v1.2.2
github.com/PaesslerAG/jsonpath v0.1.1
github.com/apache/pulsar-client-go v0.12.0
github.com/aws/aws-lambda-go v1.47.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/config v1.26.6
github.com/aws/aws-sdk-go-v2/credentials v1.16.16
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dE
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.4 h1:Xqf+7f2Vhl9tsqDYmXhnXInUdcrtgpRNpIA15/uldSc=
github.com/armon/go-metrics v0.3.4/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1sXVI=
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250=
Expand Down
74 changes: 74 additions & 0 deletions internal/impl/aws/lambda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package aws

import (
"context"
"fmt"
"os"
"time"

"github.com/aws/aws-lambda-go/lambda"

"github.com/redpanda-data/connect/v4/internal/serverless"
)

var handler *serverless.Handler

// RunLambda executes Benthos as an AWS Lambda function. Configuration can be
// stored within the environment variable CONNECT_CONFIG.
func RunLambda() {
// A list of default config paths to check for if not explicitly defined
defaultPaths := []string{
"./redpanda-connect.yaml",
"/redpanda-connect.yaml",
"/etc/redpanda-connect/config.yaml",
"/etc/redpanda-connect.yaml",

"./connect.yaml",
"/connect.yaml",
"/etc/connect/config.yaml",
"/etc/connect.yaml",

"./benthos.yaml",
"./config.yaml",
"/benthos.yaml",
"/etc/benthos/config.yaml",
"/etc/benthos.yaml",
}
if path := os.Getenv("BENTHOS_CONFIG_PATH"); path != "" {
defaultPaths = append([]string{path}, defaultPaths...)
}
if path := os.Getenv("CONNECT_CONFIG_PATH"); path != "" {
defaultPaths = append([]string{path}, defaultPaths...)
}

confStr := os.Getenv("BENTHOS_CONFIG")
if confStr == "" {
confStr = os.Getenv("CONNECT_CONFIG")
}

if confStr == "" {
// Iterate default config paths
for _, path := range defaultPaths {
if confBytes, err := os.ReadFile(path); err == nil {
confStr = string(confBytes)
break
}
}
}

var err error
if handler, err = serverless.NewHandler(confStr); err != nil {
fmt.Fprintf(os.Stderr, "Initialisation error: %v\n", err)
os.Exit(1)
}

lambda.Start(handler.Handle)

ctx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()

if err = handler.Close(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Shut down error: %v\n", err)
os.Exit(1)
}
}
118 changes: 118 additions & 0 deletions internal/serverless/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package serverless

import (
"context"
"fmt"

"github.com/redpanda-data/benthos/v4/public/service"
)

// Handler provides a mechanism for controlling the lifetime of a serverless
// handler runtime of Redpanda Connect.
type Handler struct {
prodFn service.MessageHandlerFunc
strm *service.Stream
}

// NewHandler creates a new serverless stream handler, where the provided config
// is used in order to determine the behaviour of the pipeline.
func NewHandler(confYAML string) (*Handler, error) {
env := service.GlobalEnvironment()
schema := env.FullConfigSchema("", "")
schema.SetFieldDefault(map[string]any{
"none": map[string]any{},
}, "metrics")
schema.SetFieldDefault("json", "logger", "format")
schema.SetFieldDefault(map[string]any{
"inproc": "____ignored",
}, "input")
schema.SetFieldDefault(map[string]any{
"switch": map[string]any{
"retry_until_success": false,
"cases": []any{
map[string]any{
"check": "errored()",
"output": map[string]any{
"reject": "processing failed due to: ${! error() }",
},
},
map[string]any{
"output": map[string]any{
"sync_response": map[string]any{},
},
},
},
},
}, "output")

strmBuilder := env.NewStreamBuilder()
strmBuilder.SetSchema(schema)

if err := strmBuilder.SetYAML(confYAML); err != nil {
return nil, err
}

prod, err := strmBuilder.AddProducerFunc()
if err != nil {
return nil, err
}

strm, err := strmBuilder.Build()
if err != nil {
return nil, err
}

go func() {
_ = strm.Run(context.Background())
}()

return &Handler{
prodFn: prod,
strm: strm,
}, nil
}

// Close shuts down the underlying pipeline.
func (h *Handler) Close(ctx context.Context) error {
return h.strm.Stop(ctx)
}

// Handle is a request/response func that injects a payload into the underlying
// Benthos pipeline and returns a result.
func (h *Handler) Handle(ctx context.Context, v any) (any, error) {
msg := service.NewMessage(nil)
msg.SetStructured(v)

msg, store := msg.WithSyncResponseStore()

if err := h.prodFn(ctx, msg); err != nil {
return nil, err
}

resultBatches := store.Read()

anyResults := make([][]any, len(resultBatches))
for i, batch := range resultBatches {
batchResults := make([]any, len(batch))
for j, p := range batch {
var merr error
if batchResults[j], merr = p.AsStructured(); merr != nil {
return nil, fmt.Errorf("failed to process result batch '%v': failed to marshal json response: %v", i, merr)
}
}
anyResults[i] = batchResults
}

if len(anyResults) == 1 {
if len(anyResults[0]) == 1 {
return anyResults[0][0], nil
}
return anyResults[0], nil
}

genBatchOfBatches := make([]any, len(anyResults))
for i, b := range anyResults {
genBatchOfBatches[i] = b
}
return genBatchOfBatches, nil
}
35 changes: 35 additions & 0 deletions internal/serverless/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package serverless_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/connect/v4/internal/serverless"

_ "github.com/redpanda-data/connect/v4/public/components/pure"
)

func TestServerlessHandlerDefaults(t *testing.T) {
h, err := serverless.NewHandler(`
pipeline:
processors:
- mapping: 'root = content().uppercase()'
logger:
level: NONE
`)
require.NoError(t, err)

ctx, done := context.WithTimeout(context.Background(), time.Second*5)
defer done()

res, err := h.Handle(ctx, "hello world")
require.NoError(t, err)

assert.Equal(t, "HELLO WORLD", res)

require.NoError(t, h.Close(ctx))
}

0 comments on commit 6d472c0

Please sign in to comment.