Skip to content

Commit b0c8885

Browse files
authoredJan 18, 2023
Adding GCS output and fixing linting issues for few outputs (#46)
* Adding GCS output * fixing linting issues * adding comments and checking some more returned errors * update changelog * update test_go with environment variable again * adding possibility to connect to public bucket (without credentials) and removing environment variables * go mod tidy * update readme
1 parent acd49f3 commit b0c8885

File tree

11 files changed

+309
-285
lines changed

11 files changed

+309
-285
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](http://semver.org/).
1616

1717
### Added
1818

19+
- Add GCS output support: [#46](https://github.com/elastic/stream/pull/46)
1920
- Added support for azure blob storage output: [#46](https://github.com/elastic/stream/pull/46)
2021

2122
## [0.8.0]

‎README.md

+15
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ stream is a test utility for streaming data via:
1212
- Kafka
1313
- [Lumberjack](#lumberjack-output-reference)
1414
- HTTP Mock Server
15+
- Azure Blob Storage
16+
- Google Cloud Storage
1517

1618
Input data can be read from:
1719

@@ -113,3 +115,16 @@ By default, Lumberjack batches contain one event with a `message` field.
113115

114116
If `--lumberjack-parse-json` is used then the input data is parsed as JSON
115117
and the resulting data is sent as a batch.
118+
119+
## GCS Output Reference
120+
121+
The GCS output is used to collect data from the configured source, create a GCS bucket, and populate it with the incoming data.
122+
When specifying a (`--addr`) which should be a combination of both host and port, usually pointing to a locally running emulator,
123+
the client will be overriding the configured API endpoint, which defaults to the public google storage API, towards the emulator instead.
124+
The emulator does not require authentication.
125+
126+
### Options
127+
128+
- `gcs-bucket`: The name of the GCS bucket that should be created, should not already exist.
129+
- `gcs-object`: The name of the GCS object that will be populated with the collected data, using the configured GCS bucket.
130+
- `gcs-projectid`: The related projectID used when creating the bucket, this is required to be changed from the default value when not using an emulator.

‎command/root.go

+6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
// Register outputs.
2929
_ "github.com/elastic/stream/pkg/output/azureblobstorage"
3030
_ "github.com/elastic/stream/pkg/output/gcppubsub"
31+
_ "github.com/elastic/stream/pkg/output/gcs"
3132
_ "github.com/elastic/stream/pkg/output/kafka"
3233
_ "github.com/elastic/stream/pkg/output/lumberjack"
3334
_ "github.com/elastic/stream/pkg/output/tcp"
@@ -87,6 +88,11 @@ func ExecuteContext(ctx context.Context) error {
8788
// Kafka Pubsub output flags.
8889
rootCmd.PersistentFlags().StringVar(&opts.KafkaOptions.Topic, "kafka-topic", "test", "Kafka topic name")
8990

91+
// GCS output flags.
92+
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Bucket, "gcs-bucket", "testbucket", "GCS Bucket name")
93+
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.Object, "gcs-object", "testobject", "GCS Object name")
94+
rootCmd.PersistentFlags().StringVar(&opts.GcsOptions.ProjectID, "gcs-projectid", "testproject", "GCS Project name")
95+
9096
// Lumberjack output flags.
9197
rootCmd.PersistentFlags().BoolVar(&opts.LumberjackOptions.ParseJSON, "lumberjack-parse-json", false, "Parse the input data as JSON and send the structured data as a Lumberjack batch.")
9298

‎go.mod

+15-11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.19
44

55
require (
66
cloud.google.com/go/pubsub v1.25.1
7+
cloud.google.com/go/storage v1.28.0
78
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1
89
github.com/Shopify/sarama v1.36.0
910
github.com/elastic/go-concert v0.2.0
@@ -19,14 +20,15 @@ require (
1920
go.uber.org/zap v1.23.0
2021
golang.org/x/sys v0.0.0-20220913175220-63ea55921009
2122
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
22-
google.golang.org/api v0.95.0
23+
google.golang.org/api v0.102.0
2324
gotest.tools v2.2.0+incompatible
2425
)
2526

2627
require (
2728
cloud.google.com/go v0.104.0 // indirect
28-
cloud.google.com/go/compute v1.9.0 // indirect
29-
cloud.google.com/go/iam v0.4.0 // indirect
29+
cloud.google.com/go/compute v1.12.1 // indirect
30+
cloud.google.com/go/compute/metadata v0.2.1 // indirect
31+
cloud.google.com/go/iam v0.5.0 // indirect
3032
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4 // indirect
3133
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
3234
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
@@ -48,8 +50,9 @@ require (
4850
github.com/golang/snappy v0.0.4 // indirect
4951
github.com/google/go-cmp v0.5.9 // indirect
5052
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
51-
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
52-
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
53+
github.com/google/uuid v1.3.0 // indirect
54+
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
55+
github.com/googleapis/gax-go/v2 v2.6.0 // indirect
5356
github.com/hashicorp/errwrap v1.1.0 // indirect
5457
github.com/hashicorp/go-multierror v1.1.1 // indirect
5558
github.com/hashicorp/go-uuid v1.0.3 // indirect
@@ -79,13 +82,14 @@ require (
7982
go.opencensus.io v0.23.0 // indirect
8083
go.uber.org/atomic v1.10.0 // indirect
8184
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
82-
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
83-
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 // indirect
84-
golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect
85-
golang.org/x/text v0.3.7 // indirect
85+
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
86+
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
87+
golang.org/x/sync v0.1.0 // indirect
88+
golang.org/x/text v0.4.0 // indirect
89+
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
8690
google.golang.org/appengine v1.6.7 // indirect
87-
google.golang.org/genproto v0.0.0-20220913154956-18f8339a66a5 // indirect
88-
google.golang.org/grpc v1.49.0 // indirect
91+
google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e // indirect
92+
google.golang.org/grpc v1.50.1 // indirect
8993
google.golang.org/protobuf v1.28.1 // indirect
9094
gopkg.in/yaml.v2 v2.4.0 // indirect
9195
gopkg.in/yaml.v3 v3.0.1 // indirect

‎go.sum

+30-268
Large diffs are not rendered by default.

‎pkg/output/gcppubsub/gcppubsub.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10-
"io/ioutil"
10+
"io"
1111
"net/http"
1212
"os"
1313

@@ -55,7 +55,7 @@ func (o *Output) DialContext(ctx context.Context) error {
5555
}
5656
defer resp.Body.Close()
5757

58-
_, err = ioutil.ReadAll(resp.Body)
58+
_, err = io.ReadAll(resp.Body)
5959
if err != nil {
6060
return err
6161
}

‎pkg/output/gcppubsub/gcppubsub_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"context"
99
"encoding/json"
1010
"fmt"
11-
"io/ioutil"
11+
"io"
1212
"log"
1313
"net/http"
1414
"os"
@@ -69,7 +69,7 @@ func TestMain(m *testing.M) {
6969
}
7070
defer resp.Body.Close()
7171

72-
_, err = ioutil.ReadAll(resp.Body)
72+
_, err = io.ReadAll(resp.Body)
7373
if err != nil {
7474
return err
7575
}

‎pkg/output/gcs/gcs.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
package gcs
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"net/url"
11+
12+
"cloud.google.com/go/storage"
13+
"google.golang.org/api/option"
14+
15+
"github.com/elastic/stream/pkg/output"
16+
)
17+
18+
func init() {
19+
output.Register("gcs", New)
20+
}
21+
22+
type Output struct {
23+
opts *output.Options
24+
client *storage.Client
25+
writer *storage.Writer
26+
cancelFunc func()
27+
}
28+
29+
func New(opts *output.Options) (output.Output, error) {
30+
gcsClient, ctx, cancel, err := NewClient(opts.Addr)
31+
if err != nil {
32+
return nil, err
33+
}
34+
obj := gcsClient.Bucket(opts.GcsOptions.Bucket).Object(opts.GcsOptions.Object)
35+
writer := obj.NewWriter(ctx)
36+
37+
return &Output{opts: opts, client: gcsClient, cancelFunc: cancel, writer: writer}, nil
38+
}
39+
40+
func (o *Output) DialContext(ctx context.Context) error {
41+
if err := o.createBucket(ctx); err != nil {
42+
return err
43+
}
44+
return nil
45+
}
46+
47+
func (o *Output) Close() error {
48+
if err := o.writer.Close(); err != nil {
49+
return err
50+
}
51+
if err := o.client.Close(); err != nil {
52+
return err
53+
}
54+
o.cancelFunc()
55+
return nil
56+
}
57+
58+
func (o *Output) Write(b []byte) (int, error) {
59+
if _, err := o.writer.Write(b); err != nil {
60+
return 0, fmt.Errorf("failed to copy data: %w", err)
61+
}
62+
63+
return len(b), nil
64+
}
65+
66+
func (o *Output) createBucket(ctx context.Context) error {
67+
bkt := o.client.Bucket(o.opts.GcsOptions.Bucket)
68+
_, err := bkt.Attrs(ctx)
69+
if errors.Is(err, storage.ErrBucketNotExist) {
70+
err = bkt.Create(ctx, o.opts.GcsOptions.ProjectID, nil)
71+
if err != nil {
72+
return fmt.Errorf("failed to create Bucket: %w", err)
73+
}
74+
return nil
75+
}
76+
return nil
77+
}
78+
79+
func NewClient(addr string) (gcsClient *storage.Client, ctx context.Context, cancel context.CancelFunc, err error) {
80+
ctx, cancel = context.WithCancel(context.Background())
81+
var h *url.URL
82+
if addr != "" {
83+
h, err = url.Parse(addr)
84+
if err != nil {
85+
return nil, nil, nil, err
86+
}
87+
h.Path = "storage/v1/"
88+
gcsClient, err = storage.NewClient(ctx, option.WithEndpoint(h.String()), option.WithoutAuthentication())
89+
} else {
90+
gcsClient, err = storage.NewClient(ctx)
91+
}
92+
if err != nil {
93+
cancel()
94+
return nil, nil, nil, fmt.Errorf("failed to create gcs client: %w", err)
95+
}
96+
97+
return gcsClient, ctx, cancel, nil
98+
}

‎pkg/output/gcs/gcs_test.go

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Licensed to Elasticsearch B.V. under one or more agreements.
2+
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
package gcs
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"log"
13+
"net"
14+
"net/http"
15+
"os"
16+
"testing"
17+
18+
"github.com/ory/dockertest/v3"
19+
"github.com/ory/dockertest/v3/docker"
20+
"github.com/stretchr/testify/require"
21+
"gotest.tools/assert"
22+
23+
"github.com/elastic/stream/pkg/output"
24+
)
25+
26+
const (
27+
emulatorHost = "localhost"
28+
emulatorPort = "4443"
29+
bucket = "testbucket"
30+
objectname = "testobject"
31+
)
32+
33+
var emulatorHostAndPort = fmt.Sprintf("http://%s", net.JoinHostPort(emulatorHost, emulatorPort))
34+
35+
func TestMain(m *testing.M) {
36+
pool, err := dockertest.NewPool("")
37+
if err != nil {
38+
log.Fatalf("Could not connect to docker: %s", err)
39+
}
40+
41+
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
42+
Repository: "fsouza/fake-gcs-server",
43+
Tag: "latest",
44+
Cmd: []string{"-host=0.0.0.0", "-public-host=localhost", fmt.Sprintf("-port=%s", emulatorPort), "-scheme=http"},
45+
PortBindings: map[docker.Port][]docker.PortBinding{
46+
emulatorPort: {{HostIP: emulatorHost, HostPort: emulatorPort}},
47+
},
48+
ExposedPorts: []string{emulatorPort},
49+
}, func(config *docker.HostConfig) {
50+
// set AutoRemove to true so that stopped container goes away by itself
51+
config.AutoRemove = true
52+
config.RestartPolicy = docker.RestartPolicy{
53+
Name: "no",
54+
}
55+
})
56+
if err != nil {
57+
log.Fatalf("Could not start resource: %s", err)
58+
}
59+
60+
if err := pool.Retry(func() error {
61+
// Disable HTTP keep-alives to ensure no extra goroutines hang around.
62+
httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
63+
64+
// Sanity check the emulator.
65+
resp, err := httpClient.Get(fmt.Sprintf("http://%s:%s/storage/v1/b", emulatorHost, emulatorPort))
66+
if err != nil {
67+
return err
68+
}
69+
defer resp.Body.Close()
70+
71+
_, err = io.ReadAll(resp.Body)
72+
if err != nil {
73+
return err
74+
}
75+
if resp.StatusCode != http.StatusOK {
76+
return fmt.Errorf("unexpected status code: %v", resp.StatusCode)
77+
}
78+
return err
79+
}); err != nil {
80+
_ = pool.Purge(resource)
81+
log.Fatalf("Could not connect to the gcs instance: %s", err)
82+
}
83+
84+
code := m.Run()
85+
86+
_ = pool.Purge(resource)
87+
88+
os.Exit(code)
89+
}
90+
91+
func TestGcs(t *testing.T) {
92+
out, err := New(&output.Options{
93+
Addr: emulatorHostAndPort,
94+
GcsOptions: output.GcsOptions{
95+
Bucket: bucket,
96+
Object: objectname,
97+
},
98+
})
99+
require.NoError(t, err)
100+
101+
err = out.DialContext(context.Background())
102+
require.NoError(t, err)
103+
104+
event := map[string]interface{}{
105+
"message": "hello world!",
106+
}
107+
data, err := json.Marshal(event)
108+
require.NoError(t, err)
109+
110+
n, err := out.Write(data)
111+
require.NoError(t, err)
112+
assert.Equal(t, len(data), n)
113+
114+
// Need to close the Writer for the Object to be created in the Bucket.
115+
out.Close()
116+
117+
gcsClient, ctx, cancel, err := NewClient(emulatorHostAndPort)
118+
require.NoError(t, err)
119+
t.Cleanup(func() { _ = gcsClient.Close() })
120+
t.Cleanup(cancel)
121+
122+
o := gcsClient.Bucket(bucket).Object(objectname)
123+
r, err := o.NewReader(ctx)
124+
require.NoError(t, err)
125+
126+
body, err := io.ReadAll(r)
127+
require.NoError(t, err)
128+
defer r.Close()
129+
130+
assert.Equal(t, string(data), string(body))
131+
}

‎pkg/output/options.go

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Options struct {
2121
KafkaOptions
2222
AzureBlobStorageOptions
2323
LumberjackOptions
24+
GcsOptions
2425
}
2526

2627
type WebhookOptions struct {
@@ -50,3 +51,9 @@ type AzureBlobStorageOptions struct {
5051
type LumberjackOptions struct {
5152
ParseJSON bool // Parse the input bytes as JSON and send structured data. By default, input bytes are sent in a 'message' field.
5253
}
54+
55+
type GcsOptions struct {
56+
ProjectID string // Project ID, needs to be unique with multiple buckets of the same name.
57+
Bucket string // Bucket name. Will create it if do not exist.
58+
Object string // Name of the object created inside the related Bucket.
59+
}

‎pkg/output/webhook/webhook_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package webhook
77
import (
88
"context"
99
"encoding/json"
10-
"io/ioutil"
10+
"io"
1111
"net/http"
1212
"net/http/httptest"
1313
"testing"
@@ -37,7 +37,7 @@ func TestWebhook(t *testing.T) {
3737
if r.Method == http.MethodPost {
3838
assert.Equal(t, contentType, r.Header.Get("Content-Type"))
3939

40-
data, err := ioutil.ReadAll(r.Body)
40+
data, err := io.ReadAll(r.Body)
4141
require.NoError(t, err)
4242

4343
var event map[string]string

0 commit comments

Comments
 (0)
Please sign in to comment.