fix(sftp): make sure to delete last file when watch and delete_on_finish are enabled #3037

Open: wants to merge 5 commits into base: main
Changes from 1 commit
32 changes: 21 additions & 11 deletions internal/impl/sftp/input.go
@@ -187,7 +187,6 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
 	if s.scanner, err = s.scannerCtor.Create(file, func(ctx context.Context, aErr error) (outErr error) {
Collaborator:

I think the assignment of s.scanner needs to be under the mutex as well, based on the ReadBatch function, right?
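
A minimal sketch of the locking pattern this comment asks for, assuming the scanner is built first and only the assignment is guarded. The `reader` type, its fields, and the `connect`/`current` helpers are hypothetical stand-ins, not the actual sftpReader code.

```go
package main

import (
	"fmt"
	"sync"
)

// reader is a hypothetical stand-in for sftpReader; only the locking matters.
type reader struct {
	mu      sync.Mutex
	scanner *string // placeholder for the real scanner type
}

// connect builds the scanner outside the lock, then publishes it under the lock.
func (r *reader) connect(name string) {
	sc := name // stands in for the result of scannerCtor.Create
	r.mu.Lock()
	r.scanner = &sc
	r.mu.Unlock()
}

// current returns the scanner the way a ReadBatch-style reader would see it.
func (r *reader) current() *string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.scanner
}

func main() {
	r := &reader{}
	r.connect("a.txt")
	fmt.Println(*r.current())
}
```

Building the value before taking the lock keeps the critical section short; only the write that other goroutines can observe is guarded.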

 		_ = s.pathProvider.Ack(ctx, nextPath, aErr)
 		if aErr != nil {
-			s.log.Errorf("skipping delete on finish: %s", aErr)
 			return nil
 		}
 		if s.deleteOnFinish {
@@ -223,7 +222,7 @@ func (s *sftpReader) Connect(ctx context.Context) (err error) {
 	return
 }

-func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) {
+func (s *sftpReader) initState(ctx context.Context) (client *sftp.Client, pathProvider pathProvider, skip bool, err error) {
 	s.scannerMut.Lock()
 	defer s.scannerMut.Unlock()

@@ -242,11 +241,22 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat
 		s.pathProvider = s.getFilePathProvider(ctx)
 	}

+	return s.client, s.pathProvider, false, nil
Collaborator:

As a newcomer to this code, this feels unsafe.

If the only important property of this code is that all these variables are accessed and set together atomically, I wonder whether an atomic is better suited. You could use Swap in Close to set the new value and destroy the old one, Store in Connect, and Load in ReadBatch. I don't quite understand the higher-level contract here: why is it only required that they can be accessed concurrently, and why don't we have to worry about Close clobbering something ongoing in Connect or ReadBatch?

rockwotj (Collaborator), Dec 13, 2024:

When I talk about using atomics, I mean using typed.AtomicValue from our typed package in internal/typed and wrapping all of this state in a struct, so that it becomes typed.AtomicValue[*sftpReaderState].
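
To illustrate the suggestion, here is a rough sketch of bundling the state into one struct behind an atomic pointer. The API of typed.AtomicValue is not shown in this thread, so the sketch assumes the standard library's `atomic.Pointer` (Go 1.19+) in its place; `readerState`, its fields, and the method names are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// readerState is a hypothetical bundle of the fields that must change together.
type readerState struct {
	client string // placeholder for *sftp.Client
	path   string // placeholder for the path provider
}

// reader keeps the whole state behind a single atomic pointer.
type reader struct {
	state atomic.Pointer[readerState]
}

// connect publishes a fresh state in one step (Store).
func (r *reader) connect(client, path string) {
	r.state.Store(&readerState{client: client, path: path})
}

// readBatch takes a consistent snapshot (Load); false means not connected.
func (r *reader) readBatch() (string, bool) {
	st := r.state.Load()
	if st == nil {
		return "", false
	}
	return st.client + ":" + st.path, true
}

// shutdown detaches the state (Swap) so only one caller gets to tear it down.
func (r *reader) shutdown() *readerState {
	return r.state.Swap(nil)
}

func main() {
	r := &reader{}
	r.connect("client-1", "/upload")
	fmt.Println(r.readBatch())
	old := r.shutdown()
	fmt.Println(old != nil, r.shutdown() == nil)
}
```

Note that this only guarantees that the fields are read and replaced together. A ReadBatch that has already loaded a snapshot can still be using it while Close tears it down, which is the open question raised in the comment above.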

+}
+
+func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPath string, skip bool, err error) {
+	client, pathProvider, skip, err := s.initState(ctx)
+	if err != nil || skip {
+		return
+	}
+
 	for {
-		if nextPath, err = s.pathProvider.Next(ctx, s.client); err != nil {
+		if nextPath, err = pathProvider.Next(ctx, client); err != nil {
 			if errors.Is(err, sftp.ErrSshFxConnectionLost) {
-				_ = s.client.Close()
+				_ = client.Close()
+				s.scannerMut.Lock()
 				s.client = nil
+				s.scannerMut.Unlock()
 				return
 			}
 			if errors.Is(err, errEndOfPaths) {
@@ -255,21 +265,23 @@ func (s *sftpReader) seekNextPath(ctx context.Context) (file *sftp.File, nextPat
 			return
 		}

-		if file, err = s.client.Open(nextPath); err != nil {
+		if file, err = client.Open(nextPath); err != nil {
 			if errors.Is(err, sftp.ErrSshFxConnectionLost) {
-				_ = s.client.Close()
+				_ = client.Close()
+				s.scannerMut.Lock()
 				s.client = nil
+				s.scannerMut.Unlock()
 			}

 			s.log.With("path", nextPath, "err", err.Error()).Warn("Unable to open previously identified file")
 			if os.IsNotExist(err) {
 				// If we failed to open the file because it no longer exists
 				// then we can "ack" the path as we're done with it.
-				_ = s.pathProvider.Ack(ctx, nextPath, nil)
+				_ = pathProvider.Ack(ctx, nextPath, nil)
 			} else {
 				// Otherwise we "nack" it with the error as we'll want to
 				// reprocess it again later.
-				_ = s.pathProvider.Ack(ctx, nextPath, err)
+				_ = pathProvider.Ack(ctx, nextPath, err)
 			}
 		} else {
 			return
@@ -310,9 +322,7 @@ func (s *sftpReader) ReadBatch(ctx context.Context) (service.MessageBatch, servi
 		part.MetaSetMut("sftp_path", currentPath)
 	}

-	return parts, func(ctx context.Context, res error) error {
-		return codecAckFn(ctx, res)
-	}, nil
+	return parts, codecAckFn, nil
 }

 func (s *sftpReader) Close(ctx context.Context) error {