Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sftp): make sure to delete last file when watch and delete_on_finish are enabled #3037

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 53 additions & 40 deletions internal/impl/sftp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,62 @@ func newSFTPReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources)
}

func (s *sftpReader) Connect(ctx context.Context) (err error) {
file, nextPath, skip, err := s.seekNextPath(ctx)
if err != nil {
return err
}
if skip {
return nil
}

details := service.NewScannerSourceDetails()
details.SetName(nextPath)
if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the assignment of s.scanner needs to be under the mutex as well based on the ReadBatch function right?

_ = s.pathProvider.Ack(ctx, nextPath, aErr)
if aErr != nil {
s.log.Errorf("skipping delete on finish: %s", aErr)
return nil
}
if s.deleteOnFinish {
s.scannerMut.Lock()
client := s.client
if client == nil {
if client, outErr = s.creds.GetClient(s.mgr.FS(), s.address); outErr != nil {
outErr = fmt.Errorf("obtain private client: %w", outErr)
}
defer func() {
_ = client.Close()
}()
}
if outErr == nil {
if outErr = client.Remove(nextPath); outErr != nil {
outErr = fmt.Errorf("remove %v: %w", nextPath, outErr)
}
}
s.scannerMut.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we always return after this block we could defer this right? I just get worried if the unlock is not right after.

}
return
}, details); err != nil {
_ = file.Close()
_ = s.pathProvider.Ack(ctx, nextPath, err)
return err
}

s.scannerMut.Lock()
s.currentPath = nextPath
s.scannerMut.Unlock()

s.log.Debugf("Consuming from file '%v'", nextPath)
return
}

func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) {
s.scannerMut.Lock()
defer s.scannerMut.Unlock()

if s.scanner != nil {
return nil
skip = true
Copy link
Collaborator

@rockwotj rockwotj Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we skip if there is a scanner (and what are we skipping)? Can you add a comment?

return
}

if s.client == nil {
Expand All @@ -191,8 +242,6 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
s.pathProvider = s.getFilePathProvider(ctx)
}

var nextPath string
var file *sftp.File
for {
if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil {
if errors.Is(err, sftp.ErrSshFxConnectionLost) {
Expand Down Expand Up @@ -223,45 +272,9 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
_ = s.pathProvider.Ack(ctx, nextPath, err)
}
} else {
break
}
}

details := service.NewScannerSourceDetails()
details.SetName(nextPath)
if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
_ = s.pathProvider.Ack(ctx, nextPath, aErr)
if aErr != nil {
return nil
}
if s.deleteOnFinish {
s.scannerMut.Lock()
client := s.client
if client == nil {
if client, outErr = s.creds.GetClient(s.mgr.FS(), s.address); outErr != nil {
outErr = fmt.Errorf("obtain private client: %w", outErr)
}
defer func() {
_ = client.Close()
}()
}
if outErr == nil {
if outErr = client.Remove(nextPath); outErr != nil {
outErr = fmt.Errorf("remove %v: %w", nextPath, outErr)
}
}
s.scannerMut.Unlock()
return
}
return
}, details); err != nil {
_ = file.Close()
_ = s.pathProvider.Ack(ctx, nextPath, err)
return err
}
s.currentPath = nextPath

s.log.Debugf("Consuming from file '%v'", nextPath)
return
}

func (s *sftpReader) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
Expand Down