diff --git a/internal/impl/sftp/input.go b/internal/impl/sftp/input.go index 52c7bc3fbc..e980616b0a 100644 --- a/internal/impl/sftp/input.go +++ b/internal/impl/sftp/input.go @@ -187,7 +187,6 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) { if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) { _ = s.pathProvider.Ack(ctx, nextPath, aErr) if aErr != nil { - s.log.Errorf("skipping delete on finish: %s", aErr) return nil } if s.deleteOnFinish { @@ -223,7 +222,7 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) { return } -func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) { +func (s *sftpReader) initState(ctx context.Context) (client *sftp.Client, pathProvider pathProvider, skip bool, err error) { s.scannerMut.Lock() defer s.scannerMut.Unlock() @@ -242,11 +241,22 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat s.pathProvider = s.getFilePathProvider(ctx) } + return s.client, s.pathProvider, false, nil +} + +func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) { + client, pathProvider, skip, err := s.initState(ctx) + if err != nil || skip { + return + } + for { - if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil { + if nextPath, err = pathProvider.Next(ctx, client); err != nil { if errors.Is(err, sftp.ErrSshFxConnectionLost) { - _ = s.client.Close() + _ = client.Close() + s.scannerMut.Lock() s.client = nil + s.scannerMut.Unlock() return } if errors.Is(err, errEndOfPaths) { @@ -255,21 +265,23 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat return } - if file, err = s.client.Open(nextPath); err != nil { + if file, err = client.Open(nextPath); err != nil { if errors.Is(err, sftp.ErrSshFxConnectionLost) { - _ = s.client.Close() + _ = client.Close() + s.scannerMut.Lock() s.client = nil + s.scannerMut.Unlock() } s.log.With("path", nextPath, "err", err.Error()).Warn("Unable to open previously identified file") if os.IsNotExist(err) { // If we failed to open the file because it no longer exists // then we can "ack" the path as we're done with it. - _ = s.pathProvider.Ack(ctx, nextPath, nil) + _ = pathProvider.Ack(ctx, nextPath, nil) } else { // Otherwise we "nack" it with the error as we'll want to // reprocess it again later. - _ = s.pathProvider.Ack(ctx, nextPath, err) + _ = pathProvider.Ack(ctx, nextPath, err) } } else { return @@ -310,9 +322,7 @@ func (s *sftpReader) ReadBatch(ctx context.Context) (service.MessageBatch, servi part.MetaSetMut("sftp_path", currentPath) } - return parts, func(ctx context.Context, res error) error { - return codecAckFn(ctx, res) - }, nil + return parts, codecAckFn, nil } func (s *sftpReader) Close(ctx context.Context) error { diff --git a/internal/impl/sftp/integration_test.go b/internal/impl/sftp/integration_test.go index 8185c11604..83c2951b68 100644 --- a/internal/impl/sftp/integration_test.go +++ b/internal/impl/sftp/integration_test.go @@ -15,13 +15,18 @@ package sftp import ( + "context" + "errors" + "fmt" "io/fs" "os" "strings" + "sync" "testing" "time" "github.com/ory/dockertest/v3" + "github.com/pkg/sftp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -110,6 +115,13 @@ func TestIntegrationSFTPDeleteOnFinish(t *testing.T) { resource := setupDockerPool(t) + client, err := getClient(resource) + require.NoError(t, err) + + writeSFTPFile(t, client, "/upload/1.txt", "data-1") + writeSFTPFile(t, client, "/upload/2.txt", "data-2") + writeSFTPFile(t, client, "/upload/3.txt", "data-3") + config := ` output: drop: {} @@ -135,22 +147,51 @@ cache_resources: default_ttl: 900s ` config = strings.NewReplacer( - "PORT", resource.GetPort("22/tcp"), + "$PORT", resource.GetPort("22/tcp"), ).Replace(config) - env := service.NewEnvironment() - parsedConfig, err := sftpInputSpec().ParseYAML(config, env) - require.NoError(t, err) - - reader, err := newSFTPReaderFromParsed(parsedConfig, service.MockResources()) + var receivedPathsMut sync.Mutex + var receivedPaths []string + + builder := service.NewStreamBuilder() + require.NoError(t, builder.SetYAML(config)) + require.NoError(t, builder.AddConsumerFunc(func(_ context.Context, msg *service.Message) error { + receivedPathsMut.Lock() + defer receivedPathsMut.Unlock() + path, ok := msg.MetaGet("sftp_path") + if !ok { + return errors.New("sftp_path metadata not found") + } + receivedPaths = append(receivedPaths, path) + return nil + })) + stream, err := builder.Build() require.NoError(t, err) - // TODO: what do I do here to drive the input - // reader.Connect - _ = reader + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + runErr := make(chan error) + go func() { runErr <- stream.Run(ctx) }() + defer func() { + err := <-runErr + require.NoError(t, err, "stream.Run() failed") + }() + + // require.EventuallyWithT(t, func(c *assert.CollectT) { + // receivedPathsMut.Lock() + // defer receivedPathsMut.Unlock() + // require.Len(c, receivedPaths, 3) + + // files, err := client.Glob("/upload/*.txt") + // require.NoError(c, err) + // require.Empty(c, files) + // }, time.Second*1000, time.Millisecond*100) } func setupDockerPool(t *testing.T) *dockertest.Resource { + t.Helper() + pool, err := dockertest.NewPool("") require.NoError(t, err) @@ -165,24 +206,36 @@ func setupDockerPool(t *testing.T) *dockertest.Resource { }) require.NoError(t, err) t.Cleanup(func() { + fmt.Println("============ PURGE TIME =========") assert.NoError(t, pool.Purge(resource)) }) _ = resource.Expire(900) - creds := credentials{ - Username: sftpUsername, - Password: sftpPassword, - } - // wait for server to be ready to accept connections require.NoError(t, pool.Retry(func() error { - _, err = creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp")) + _, err = getClient(resource) return err })) return resource } +func getClient(resource *dockertest.Resource) (*sftp.Client, error) { + creds := credentials{ + Username: sftpUsername, + Password: sftpPassword, + } + return creds.GetClient(&osPT{}, "localhost:"+resource.GetPort("22/tcp")) +} + +func writeSFTPFile(t *testing.T, client *sftp.Client, path, data string) { + t.Helper() + file, err := client.Create(path) + require.NoError(t, err, "creating file") + defer file.Close() + _, err = fmt.Fprint(file, data, "writing file contents") + require.NoError(t, err) +} type osPT struct{}