Skip to content

Commit

Permalink
test(sftp): add test for delete-on-finish bug
Browse files Browse the repository at this point in the history
This integration test makes sure that when `delete_on_finish` is true
and watching is enabled that we delete every file.
  • Loading branch information
ooesili committed Dec 3, 2024
1 parent 18b29aa commit ab133f4
Showing 1 changed file with 135 additions and 28 deletions.
163 changes: 135 additions & 28 deletions internal/impl/sftp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
package sftp

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/pkg/sftp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/benthos/v4/public/service/integration"

// Bring in memory cache.
Expand All @@ -39,34 +46,7 @@ func TestIntegrationSFTP(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "atmoz/sftp",
Tag: "alpine",
Cmd: []string{
// https://github.com/atmoz/sftp/issues/401
"/bin/sh", "-c", "ulimit -n 65535 && exec /entrypoint " + sftpUsername + ":" + sftpPassword + ":1001:100:upload",
},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}

require.NoError(t, pool.Retry(func() error {
_, err = creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
return err
}))
resource := setupDockerPool(t)

t.Run("sftp", func(t *testing.T) {
template := `
Expand Down Expand Up @@ -129,6 +109,133 @@ cache_resources:
})
}

func TestIntegrationSFTPDeleteOnFinish(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

resource := setupDockerPool(t)

client, err := getClient(resource)
require.NoError(t, err)

writeSFTPFile(t, client, "/upload/1.txt", "data-1")
writeSFTPFile(t, client, "/upload/2.txt", "data-2")
writeSFTPFile(t, client, "/upload/3.txt", "data-3")

config := `
output:
drop: {}
input:
sftp:
address: localhost:$PORT
paths:
- /upload/*.txt
credentials:
username: foo
password: pass
delete_on_finish: true
watcher:
enabled: true
poll_interval: 100ms
cache: files_memory
cache_resources:
- label: files_memory
memory:
default_ttl: 900s
`
config = strings.NewReplacer(
"$PORT", resource.GetPort("22/tcp"),
).Replace(config)

var receivedPathsMut sync.Mutex
var receivedPaths []string

builder := service.NewStreamBuilder()
require.NoError(t, builder.SetYAML(config))
require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error {
receivedPathsMut.Lock()
defer receivedPathsMut.Unlock()
path, ok := msg.MetaGet("sftp_path")
if !ok {
return errors.New("sftp_path metadata not found")
}
receivedPaths = append(receivedPaths, path)
return nil
}))
stream, err := builder.Build()
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
runErr := make(chan error)
go func() { runErr <- stream.Run(ctx) }()
defer func() {
cancel()
err := <-runErr
if err != context.Canceled {
require.NoError(t, err, "stream.Run() failed")
}
}()

require.EventuallyWithT(t, func(c *assert.CollectT) {
receivedPathsMut.Lock()
defer receivedPathsMut.Unlock()
assert.Len(c, receivedPaths, 3)

files, err := client.Glob("/upload/*.txt")
assert.NoError(c, err)
assert.Empty(c, files)
}, time.Second, time.Millisecond*100)
}

func setupDockerPool(t *testing.T) *dockertest.Resource {
t.Helper()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

pool.MaxWait = time.Second * 30
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "atmoz/sftp",
Tag: "alpine",
Cmd: []string{
// https://github.com/atmoz/sftp/issues/401
"/bin/sh", "-c", "ulimit -n 65535 && exec /entrypoint " + sftpUsername + ":" + sftpPassword + ":1001:100:upload",
},
})
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

_ = resource.Expire(900)

// wait for server to be ready to accept connections
require.NoError(t, pool.Retry(func() error {
_, err := getClient(resource)
return err
}))

return resource
}
func getClient(resource *dockertest.Resource) (*sftp.Client, error) {
creds := credentials{
Username: sftpUsername,
Password: sftpPassword,
}
return creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp"))
}

func writeSFTPFile(t *testing.T, client *sftp.Client, path, data string) {
t.Helper()
file, err := client.Create(path)
require.NoError(t, err, "creating file")
defer file.Close()
_, err = fmt.Fprint(file, data, "writing file contents")
require.NoError(t, err)
}

type osPT struct{}

func (o *osPT) Open(name string) (fs.File, error) {
Expand Down

0 comments on commit ab133f4

Please sign in to comment.