Skip to content

Commit

Permalink
use namedLocks to lock file for safe read or write
Browse files Browse the repository at this point in the history
* log warn if waiting to obtain the lock
* retry renaming & log warn when 'finalizing' a safe write
  • Loading branch information
elv-gilles committed Jul 2, 2024
1 parent dd4eb4a commit 94b4449
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 7 deletions.
107 changes: 101 additions & 6 deletions util/fileutil/safe.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,50 @@ package fileutil
import (
"io"
"os"
"time"

"github.com/eluv-io/common-go/util/syncutil"
"github.com/eluv-io/common-go/util/timeutil"
"github.com/eluv-io/errors-go"
elog "github.com/eluv-io/log-go"
)

const (
tempExt = ".temp"
)

var (
log = elog.Get("/eluvio/fileutil")
logIfWait4Lock = time.Millisecond * 100
safeFiles = syncutil.NamedLocks{}
)

// lockSafeFile takes a lock in 'safeFiles' for the given path and returns the
// unlocker that must be called after using the locked file.
// This uses real sync.Mutex. Alternatively we could use:
// - advisory file locking as done by go internals in package src/cmd/go/internal/lockedfile.
// - or: https://github.com/rboyer/safeio
func lockSafeFile(op, path string) syncutil.Unlocker {
watch := timeutil.StartWatch()
unlock := safeFiles.Lock(path)

d := watch.Duration()
if d > logIfWait4Lock {
log.Warn(op+" - waited for file lock",
"path", path,
"duration", d)
}
return unlock
}

func PurgeSafeFile(path string) error {
unlocker := lockSafeFile("PurgeSafeFile", path)
defer unlocker.Unlock()

_ = os.Remove(path + tempExt)
return os.RemoveAll(path)
}

// NewSafeWriter returns a writer that writes to a temporary file and attempts to replace the target file upon
// finalization. This prevents the target file from becoming corrupted in case of crashes and ensures "atomic writes".
//
Expand All @@ -20,26 +56,55 @@ const (
// while closing or renaming the temp file
// - if the provided error is not nil, it tries to close and remove the temp file and returns the provided in error
func NewSafeWriter(path string) (w io.Writer, finalize func(error) error, err error) {
return newSafeWriter(path)
}
func newSafeWriter(path string, lock ...bool) (w io.Writer, finalize func(error) error, err error) {
var unlocker syncutil.Unlocker
if len(lock) > 0 && !lock[0] {
unlocker = &noopUnlocker{}
} else {
unlocker = lockSafeFile("NewSafeWriter", path)
}

tmp := path + tempExt
tmpFile, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return nil, nil, err
}

finalize = func(org error) error {
defer unlocker.Unlock()
err := tmpFile.Close()
if org != nil {
_ = os.Remove(tmp)
return org
}
if err == nil {
if err != nil {
return errors.E("safe write close", errors.K.IO.Default(), err)
}

count := 0
for {
count++
_ = os.Remove(path)
err = os.Rename(tmp, path)

if err == nil || count > 3 {
return err
}

// retry if renaming fails with 'no such file or directory'
var lnkErr *os.LinkError
if ok := errors.As(err, &lnkErr); ok && os.IsNotExist(lnkErr.Err) {
log.Warn("safe writer - rename failed (no such file)",
"temp_file", tmp,
"count", count)
time.Sleep(time.Millisecond * 2 * time.Duration(count))
continue
}

return errors.E("safe write", errors.K.IO.Default(), err)
}
if err == nil {
return nil
}
return errors.E("safe write", errors.K.IO.Default(), err)
}

return tmpFile, finalize, nil
Expand All @@ -51,6 +116,16 @@ func NewSafeWriter(path string) (w io.Writer, finalize func(error) error, err er
// - If both the temporary file and the target exists, then the temp file is removed and the target file used.
// - Otherwise the target file is attempted to be opened
func NewSafeReader(path string) (io.ReadCloser, error) {
return newSafeReader(path)
}
func newSafeReader(path string, lock ...bool) (io.ReadCloser, error) {
var unlocker syncutil.Unlocker
if len(lock) > 0 && !lock[0] {
unlocker = &noopUnlocker{}
} else {
unlocker = lockSafeFile("NewSafeReader", path)
}

tmp := path + tempExt
if _, err := os.Stat(tmp); err == nil {
fexists := false
Expand All @@ -61,15 +136,35 @@ func NewSafeReader(path string) (io.ReadCloser, error) {
switch fexists {
case true:
// both files exist: assume we crashed writing temp or just afterward
// note: the assumption could be wrong without lock as we could be writing
_ = os.Remove(tmp)
case false:
// temp was written correctly but not renamed to f.path
// note: the assumption could be wrong without lock as we could be writing
_ = os.Remove(path)
err = os.Rename(tmp, path)
if err != nil {
return nil, err
}
}
}
return os.Open(path)
ret, err := os.Open(path)
if err != nil {
return nil, err
}
return &safeFile{File: ret, unlocker: unlocker}, nil
}

type noopUnlocker struct{}

func (l *noopUnlocker) Unlock() {}

type safeFile struct {
*os.File
unlocker syncutil.Unlocker
}

func (f *safeFile) Close() error {
defer f.unlocker.Unlock()
return f.File.Close()
}
123 changes: 122 additions & 1 deletion util/fileutil/safe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/eluv-io/errors-go"
)

func TestSafeReader(t *testing.T) {
testDir, err := os.MkdirTemp("", "TestSafeReaderWriter")
testDir, err := os.MkdirTemp("", "TestSafeReader")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(testDir) }()

type testCase struct {
name string
Expand Down Expand Up @@ -63,6 +68,7 @@ func TestSafeReader(t *testing.T) {
func TestSafeWriter(t *testing.T) {
testDir, err := os.MkdirTemp("", "TestSafeReaderWriter")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(testDir) }()

target := filepath.Join(testDir, "f.txt")
ftmp := target + tempExt
Expand Down Expand Up @@ -91,6 +97,7 @@ func TestSafeWriter(t *testing.T) {
func TestSafeWriterError(t *testing.T) {
testDir, err := os.MkdirTemp("", "TestSafeReaderWriter")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(testDir) }()

target := filepath.Join(testDir, "f.txt")
ftmp := target + tempExt
Expand All @@ -112,3 +119,117 @@ func TestSafeWriterError(t *testing.T) {
_, err = os.Stat(ftmp)
require.Error(t, err)
}

func TestSlowSafeReaderWriter(t *testing.T) {
testDir, err := os.MkdirTemp("", "TestSlowSafeReaderWriter")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(testDir) }()

target := filepath.Join(testDir, "f.txt")
writer, finalize, err := NewSafeWriter(target)
require.NoError(t, err)
_, err = writer.Write([]byte("test data"))
require.NoError(t, err)
err = finalize(nil)
require.NoError(t, err)

// take the lock on target and do a 'slow read'
reader, err := NewSafeReader(target)
require.NoError(t, err)
go func(rd io.ReadCloser) {
time.Sleep(time.Millisecond * 500)
_ = rd.Close()
}(reader)

// attempt to write during the slow read (log should report a warning)
writer, finalize, err = NewSafeWriter(target)
require.NoError(t, err)
_, err = writer.Write([]byte("test data2"))
require.NoError(t, err)
err = finalize(nil)
require.NoError(t, err)

reader, err = NewSafeReader(target)
require.NoError(t, err)
defer errors.Ignore(reader.Close)
bb, err := io.ReadAll(reader)
require.NoError(t, err)
require.Equal(t, "test data2", string(bb))
}

// TestConcurrentSafeReaderWriter shows that reading concurrently with a write
// might lead to fail writing whenever not using lock.
func TestConcurrentSafeReaderWriter(t *testing.T) {
testDir, err := os.MkdirTemp("", "TestConcurrentSafeReaderWriter")
require.NoError(t, err)
defer func() { _ = os.RemoveAll(testDir) }()

target := filepath.Join(testDir, "f.txt")

readFile := func(lock bool) (string, error) {
reader, err := newSafeReader(target, lock)
if err != nil {
return "", err
}
bb, err := io.ReadAll(reader)
_ = reader.Close()
if err != nil {
return "", err
}
return string(bb), nil
}

type testCase struct {
lock bool
wantFail bool
}

for _, tc := range []*testCase{
{lock: true},
{lock: false, wantFail: true},
} {
//fmt.Println("case", tc.lock)
tcase := fmt.Sprintf("lock: %v", tc.lock)
err := PurgeSafeFile(target)
require.NoError(t, err, tcase)

writer, finalize, err := newSafeWriter(target, tc.lock)
require.NoError(t, err, tcase)

// reading concurrently without lock deletes the '.temp' file
val := ""
var rerr error
wg := sync.WaitGroup{}
wg.Add(1)
go func(lock bool) {
defer wg.Done()
val, rerr = readFile(lock)
}(tc.lock)

time.Sleep(time.Millisecond * 100)
_, err = writer.Write([]byte("test data"))
require.NoError(t, err, tcase)
time.Sleep(time.Millisecond * 100)
err = finalize(nil)
wg.Wait()

//fmt.Println("val", val)
require.NoError(t, rerr, tcase)
if tc.wantFail {
require.Error(t, err, tcase)
require.Equal(t, "", val, tcase)
} else {
require.NoError(t, err, tcase)
require.Equal(t, "test data", val, tcase)
}

ret, err := readFile(tc.lock)
if tc.wantFail {
require.Error(t, err, tcase)
} else {
require.NoError(t, err, tcase)
require.Equal(t, "test data", ret, tcase)
}
}

}

0 comments on commit 94b4449

Please sign in to comment.