diff --git a/lang/en/us.go b/lang/en/us.go index 72725931..9c5a9e73 100644 --- a/lang/en/us.go +++ b/lang/en/us.go @@ -50,6 +50,7 @@ var areaUS = map[string]string{ "info.check.start": "Start checking files for %s, heavy = %v", "info.check.done": "File check finished for %s, missing %d files", + "error.check.failed": "Failed to check %s: %v", "hint.check.checking": "> Checking ", "warn.check.modified.size": "Found modified file: size of %q is %d, expect %d", "warn.check.modified.hash": "Found modified file: hash of %q is %s, expect %s", diff --git a/lang/zh/cn.go b/lang/zh/cn.go index ea403560..d2faff76 100644 --- a/lang/zh/cn.go +++ b/lang/zh/cn.go @@ -50,6 +50,7 @@ var areaCN = map[string]string{ "info.check.start": "开始在 %s 检测文件. 强检查 = %v", "info.check.done": "文件在 %s 检查完毕, 缺失 %d 个文件", + "error.check.failed": "无法检查 %s: %v", "hint.check.checking": "> 检查中 ", "warn.check.modified.size": "找到修改过的文件: %q 的大小为 %d, 预期 %d", "warn.check.modified.hash": "找到修改过的文件: %q 的哈希值为 %s, 预期 %s", diff --git a/sync.go b/sync.go index b202d76e..c2c2dc4e 100644 --- a/sync.go +++ b/sync.go @@ -25,6 +25,7 @@ import ( "context" "crypto" "encoding/hex" + "errors" "fmt" "io" "net/http" @@ -221,7 +222,7 @@ func (cr *Cluster) checkFileFor( heavy bool, missing *utils.SyncMap[string, *fileInfoWithTargets], pg *mpb.Progress, -) { +) (err error) { var missingCount atomic.Int32 addMissing := func(f FileInfo) { missingCount.Add(1) @@ -288,7 +289,7 @@ func (cr *Cluster) checkFileFor( { start := time.Now() var checkedMp [256]bool - sto.WalkDir(func(hash string, size int64) error { + if err = sto.WalkDir(func(hash string, size int64) error { if n := utils.HexTo256(hash); !checkedMp[n] { checkedMp[n] = true now := time.Now() @@ -297,13 +298,15 @@ func (cr *Cluster) checkFileFor( } sizeMap[hash] = size return nil - }) + }); err != nil { + return + } } bar.SetCurrent(0) bar.SetTotal((int64)(len(files)), false) for _, f := range files { - if ctx.Err() != nil { + if err = ctx.Err(); err != nil { return } start := time.Now() @@ -326,7 +329,7 @@ func (cr *Cluster) checkFileFor( } else { _, buf, free := slots.Alloc(ctx) if buf == nil { - return + return ctx.Err() } go func(f FileInfo, buf []byte, free func()) { defer log.RecoverPanic(nil) @@ -378,28 +381,42 @@ func (cr *Cluster) CheckFiles( pg *mpb.Progress, ) (map[string]*fileInfoWithTargets, error) { missingMap := utils.NewSyncMap[string, *fileInfoWithTargets]() - done := make(chan struct{}, 0) + done := make(chan bool, 0) for _, s := range cr.storages { go func(s storage.Storage) { defer log.RecordPanic() - defer func() { - select { - case done <- struct{}{}: - case <-ctx.Done(): - } - }() - cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) + err := cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) + if ctx.Err() != nil { + return + } + if err != nil { + log.Errorf(Tr("error.check.failed"), s, err) + } + select { + case done <- err == nil: + case <-ctx.Done(): + } }(s) } + goodCount := 0 for i := len(cr.storages); i > 0; i-- { select { - case <-done: + case ok := <-done: + if ok { + goodCount++ + } case <-ctx.Done(): log.Warn(Tr("warn.sync.interrupted")) return nil, ctx.Err() } } + if err := ctx.Err(); err != nil { + return nil, err + } + if goodCount == 0 { + return nil, errors.New("All storages are failed") + } return missingMap.RawMap(), nil }