Skip to content

Commit

Permalink
do not sync file if walkdir failed
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Apr 8, 2024
1 parent d932108 commit 9d73b9f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
1 change: 1 addition & 0 deletions lang/en/us.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var areaUS = map[string]string{

"info.check.start": "Start checking files for %s, heavy = %v",
"info.check.done": "File check finished for %s, missing %d files",
"error.check.failed": "Failed to check %s: %v",
"hint.check.checking": "> Checking ",
"warn.check.modified.size": "Found modified file: size of %q is %d, expect %d",
"warn.check.modified.hash": "Found modified file: hash of %q is %s, expect %s",
Expand Down
1 change: 1 addition & 0 deletions lang/zh/cn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var areaCN = map[string]string{

"info.check.start": "开始在 %s 检测文件. 强检查 = %v",
"info.check.done": "文件在 %s 检查完毕, 缺失 %d 个文件",
"error.check.failed": "无法检查 %s: %v",
"hint.check.checking": "> 检查中 ",
"warn.check.modified.size": "找到修改过的文件: %q 的大小为 %d, 预期 %d",
"warn.check.modified.hash": "找到修改过的文件: %q 的哈希值为 %s, 预期 %s",
Expand Down
45 changes: 31 additions & 14 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"crypto"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -221,7 +222,7 @@ func (cr *Cluster) checkFileFor(
heavy bool,
missing *utils.SyncMap[string, *fileInfoWithTargets],
pg *mpb.Progress,
) {
) (err error) {
var missingCount atomic.Int32
addMissing := func(f FileInfo) {
missingCount.Add(1)
Expand Down Expand Up @@ -288,7 +289,7 @@ func (cr *Cluster) checkFileFor(
{
start := time.Now()
var checkedMp [256]bool
sto.WalkDir(func(hash string, size int64) error {
if err = sto.WalkDir(func(hash string, size int64) error {
if n := utils.HexTo256(hash); !checkedMp[n] {
checkedMp[n] = true
now := time.Now()
Expand All @@ -297,13 +298,15 @@ func (cr *Cluster) checkFileFor(
}
sizeMap[hash] = size
return nil
})
}); err != nil {
return
}
}

bar.SetCurrent(0)
bar.SetTotal((int64)(len(files)), false)
for _, f := range files {
if ctx.Err() != nil {
if err = ctx.Err(); err != nil {
return
}
start := time.Now()
Expand All @@ -326,7 +329,7 @@ func (cr *Cluster) checkFileFor(
} else {
_, buf, free := slots.Alloc(ctx)
if buf == nil {
return
return ctx.Err()
}
go func(f FileInfo, buf []byte, free func()) {
defer log.RecoverPanic(nil)
Expand Down Expand Up @@ -378,28 +381,42 @@ func (cr *Cluster) CheckFiles(
pg *mpb.Progress,
) (map[string]*fileInfoWithTargets, error) {
missingMap := utils.NewSyncMap[string, *fileInfoWithTargets]()
done := make(chan struct{}, 0)
done := make(chan bool, 0)

for _, s := range cr.storages {
go func(s storage.Storage) {
defer log.RecordPanic()
defer func() {
select {
case done <- struct{}{}:
case <-ctx.Done():
}
}()
cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg)
err := cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg)
if ctx.Err() != nil {
return
}
if err != nil {
log.Errorf(Tr("error.check.failed"), s, err)
}
select {
case done <- err == nil:
case <-ctx.Done():
}
}(s)
}
goodCount := 0
for i := len(cr.storages); i > 0; i-- {
select {
case <-done:
case ok := <-done:
if ok {
goodCount++
}
case <-ctx.Done():
log.Warn(Tr("warn.sync.interrupted"))
return nil, ctx.Err()
}
}
if err := ctx.Err(); err != nil {
return nil, err
}
if goodCount == 0 {
return nil, errors.New("All storages are failed")
}
return missingMap.RawMap(), nil
}

Expand Down

0 comments on commit 9d73b9f

Please sign in to comment.