Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding GCS output and fixing linting issues for few outputs #46

Merged
merged 9 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Removed

## [0.9.0]

### Added

- Add GCS output support: [#46](https://github.com/elastic/stream/pull/46)

## [0.8.0]

### Added
Expand Down
6 changes: 6 additions & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// Register outputs.
_ "github.com/elastic/stream/pkg/output/gcppubsub"
_ "github.com/elastic/stream/pkg/output/gcs"
_ "github.com/elastic/stream/pkg/output/kafka"
_ "github.com/elastic/stream/pkg/output/lumberjack"
_ "github.com/elastic/stream/pkg/output/tcp"
Expand Down Expand Up @@ -81,6 +82,11 @@ func ExecuteContext(ctx context.Context) error {
// Kafka Pubsub output flags.
rootCmd.PersistentFlags().StringVar(&opts.KafkaOptions.Topic, "kafka-topic", "test", "Kafka topic name")

// GCS output flags.
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Bucket, "gcs-bucket", "testbucket", "GCS Bucket name")
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Object, "gcs-object", "testobject", "GCS Object name")
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.ProjectID, "gcs-projectid", "testproject", "GCS Project name")

// Lumberjack output flags.
rootCmd.PersistentFlags().BoolVar(&opts.LumberjackOptions.ParseJSON, "lumberjack-parse-json", false, "Parse the input data as JSON and send the structured data as a Lumberjack batch.")

Expand Down
26 changes: 15 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
cloud.google.com/go/pubsub v1.25.1
cloud.google.com/go/storage v1.28.0
github.com/Shopify/sarama v1.36.0
github.com/elastic/go-concert v0.2.0
github.com/elastic/go-lumber v0.1.2-0.20220819171948-335fde24ea0f
Expand All @@ -18,14 +19,15 @@ require (
go.uber.org/zap v1.23.0
golang.org/x/sys v0.0.0-20220913175220-63ea55921009
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
google.golang.org/api v0.95.0
google.golang.org/api v0.102.0
gotest.tools v2.2.0+incompatible
)

require (
cloud.google.com/go v0.104.0 // indirect
cloud.google.com/go/compute v1.9.0 // indirect
cloud.google.com/go/iam v0.4.0 // indirect
cloud.google.com/go/compute v1.12.1 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
cloud.google.com/go/iam v0.5.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
Expand All @@ -45,8 +47,9 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
Expand Down Expand Up @@ -76,13 +79,14 @@ require (
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 // indirect
golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220913154956-18f8339a66a5 // indirect
google.golang.org/grpc v1.49.0 // indirect
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect
google.golang.org/grpc v1.50.1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
299 changes: 31 additions & 268 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/output/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"io"
"net/http"
"os"

Expand Down Expand Up @@ -55,7 +55,7 @@ func (o *Output) DialContext(ctx context.Context) error {
}
defer resp.Body.Close()

_, err = ioutil.ReadAll(resp.Body)
_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/output/gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestMain(m *testing.M) {
}
defer resp.Body.Close()

_, err = ioutil.ReadAll(resp.Body)
_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/output/gcs/gcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package gcs

import (
"context"
"errors"
"fmt"
"os"

"cloud.google.com/go/storage"

"github.com/elastic/stream/pkg/output"
)

func init() {
output.Register("gcs", New)
}

type Output struct {
opts *output.Options
client *storage.Client
writer *storage.Writer
cancelFunc func()
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("google cloud address is required")
}
// https://cloud.google.com/go/docs/reference/cloud.google.com/go/storage/latest#hdr-Creating_a_Client
// This is required to override the client to use localhost instead, has to be set before creating the client
os.Setenv("STORAGE_EMULATOR_HOST", opts.Addr)
defer os.Unsetenv("STORAGE_EMULATOR_HOST")

ctx, cancel := context.WithCancel(context.Background())
gcsClient, err := storage.NewClient(ctx)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create gcs client: %w", err)
}
obj := gcsClient.Bucket(opts.GcsOptions.Bucket).Object(opts.GcsOptions.Object)
writer := obj.NewWriter(ctx)

return &Output{opts: opts, client: gcsClient, cancelFunc: cancel, writer: writer}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
if err := o.createBucket(ctx); err != nil {
return err
}
return nil
}

func (o *Output) Close() error {
if err := o.writer.Close(); err != nil {
return err
}
if err := o.client.Close(); err != nil {
return err
}
o.cancelFunc()
return nil
}

func (o *Output) Write(b []byte) (int, error) {
if _, err := o.writer.Write(b); err != nil {
return 0, fmt.Errorf("failed to copy data: %w", err)
}

return len(b), nil
}

func (o *Output) createBucket(ctx context.Context) error {
bkt := o.client.Bucket(o.opts.GcsOptions.Bucket)
_, err := bkt.Attrs(ctx)
if errors.Is(err, storage.ErrBucketNotExist) {
err = bkt.Create(ctx, o.opts.GcsOptions.ProjectID, nil)
if err != nil {
return fmt.Errorf("failed to create Bucket: %w", err)
}
return nil
}
return nil
}
139 changes: 139 additions & 0 deletions pkg/output/gcs/gcs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package gcs

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"testing"

"cloud.google.com/go/storage"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
"gotest.tools/assert"

"github.com/elastic/stream/pkg/output"
)

const (
emulatorHost = "localhost"
emulatorPort = "4443"
bucket = "testbucket"
objectname = "testobject"
)

var emulatorHostAndPort = fmt.Sprintf("http://%s", net.JoinHostPort(emulatorHost, emulatorPort))

func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "fsouza/fake-gcs-server",
Tag: "latest",
Cmd: []string{"-host=0.0.0.0", "-public-host=localhost", fmt.Sprintf("-port=%s", emulatorPort), "-scheme=http"},
PortBindings: map[docker.Port][]docker.PortBinding{
emulatorPort: {{HostIP: emulatorHost, HostPort: emulatorPort}},
},
ExposedPorts: []string{emulatorPort},
}, func(config *docker.HostConfig) {
// set AutoRemove to true so that stopped container goes away by itself
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
if err != nil {
log.Fatalf("Could not start resource: %s", err)
}

if err := pool.Retry(func() error {
// Disable HTTP keep-alives to ensure no extra goroutines hang around.
httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}

// Sanity check the emulator.
resp, err := httpClient.Get(fmt.Sprintf("http://%s:%s/storage/v1/b", emulatorHost, emulatorPort))
if err != nil {
return err
}
defer resp.Body.Close()

_, err = io.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(resp.StatusCode)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v", resp.StatusCode)
}
return err
}); err != nil {
_ = pool.Purge(resource)
log.Fatalf("Could not connect to the gcs instance: %s", err)
}

code := m.Run()

_ = pool.Purge(resource)

os.Exit(code)
}

func TestGcs(t *testing.T) {
out, err := New(&output.Options{
Addr: emulatorHostAndPort,
GcsOptions: output.GcsOptions{
Bucket: bucket,
Object: objectname,
},
})
require.NoError(t, err)

err = out.DialContext(context.Background())
require.NoError(t, err)

event := map[string]interface{}{
"message": "hello world!",
}
data, err := json.Marshal(event)
require.NoError(t, err)

n, err := out.Write(data)
require.NoError(t, err)
assert.Equal(t, len(data), n)

// Need to close the Writer for the Object to be created in the Bucket.
out.Close()

// https://cloud.google.com/go/docs/reference/cloud.google.com/go/storage/latest#hdr-Creating_a_Client
// This is required to override the client to use localhost instead, has to be set before creating the client
os.Setenv("STORAGE_EMULATOR_HOST", emulatorHostAndPort)
defer os.Unsetenv("STORAGE_EMULATOR_HOST")

ctx, cancel := context.WithCancel(context.Background())
client, err := storage.NewClient(ctx)
require.NoError(t, err)
t.Cleanup(func() { _ = client.Close() })
t.Cleanup(cancel)

o := client.Bucket(bucket).Object(objectname)
r, err := o.NewReader(ctx)
require.NoError(t, err)

body, err := io.ReadAll(r)
require.NoError(t, err)
defer r.Close()

assert.Equal(t, string(data), string(body))
}
7 changes: 7 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Options struct {
GCPPubsubOptions
KafkaOptions
LumberjackOptions
GcsOptions
}

type WebhookOptions struct {
Expand All @@ -43,3 +44,9 @@ type KafkaOptions struct {
type LumberjackOptions struct {
ParseJSON bool // Parse the input bytes as JSON and send structured data. By default, input bytes are sent in a 'message' field.
}

type GcsOptions struct {
ProjectID string // Project ID, needs to be unique with multiple buckets of the same name.
Bucket string // Bucket name. Will create it if do not exist.
Object string // Name of the object created inside the related Bucket.
}
4 changes: 2 additions & 2 deletions pkg/output/webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package webhook
import (
"context"
"encoding/json"
"io/ioutil"
"io"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -37,7 +37,7 @@ func TestWebhook(t *testing.T) {
if r.Method == http.MethodPost {
assert.Equal(t, contentType, r.Header.Get("Content-Type"))

data, err := ioutil.ReadAll(r.Body)
data, err := io.ReadAll(r.Body)
require.NoError(t, err)

var event map[string]string
Expand Down