diff --git a/weed/command/command.go b/weed/command/command.go index 4baa92f04..ad7e2e9c2 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -27,6 +27,7 @@ var Commands = []*Command{ cmdFilerRemoteSynchronize, cmdFilerReplicate, cmdFilerSynchronize, + cmdFilerSyncVerify, cmdFix, cmdFuse, cmdIam, diff --git a/weed/command/filer_sync_verify.go b/weed/command/filer_sync_verify.go new file mode 100644 index 000000000..e46a5b9ca --- /dev/null +++ b/weed/command/filer_sync_verify.go @@ -0,0 +1,674 @@ +package command + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path" + "sync" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +type SyncVerifyOptions struct { + filerA *string + filerB *string + aPath *string + bPath *string + aSecurity *string + bSecurity *string + isActivePassive *bool + modifiedTimeAgo *time.Duration + jsonOutput *bool +} + +var syncVerifyOptions SyncVerifyOptions + +func init() { + cmdFilerSyncVerify.Run = runFilerSyncVerify // break init cycle + syncVerifyOptions.filerA = cmdFilerSyncVerify.Flag.String("a", "", "filer A in one SeaweedFS cluster") + syncVerifyOptions.filerB = cmdFilerSyncVerify.Flag.String("b", "", "filer B in the other SeaweedFS cluster") + syncVerifyOptions.aPath = cmdFilerSyncVerify.Flag.String("a.path", "/", "directory to verify on filer A") + syncVerifyOptions.bPath = cmdFilerSyncVerify.Flag.String("b.path", "/", "directory to verify on filer B") + syncVerifyOptions.aSecurity = cmdFilerSyncVerify.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates") + syncVerifyOptions.bSecurity = cmdFilerSyncVerify.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates") + syncVerifyOptions.isActivePassive = cmdFilerSyncVerify.Flag.Bool("isActivePassive", false, "one directional comparison from A to B; entries only in B are not reported") + syncVerifyOptions.modifiedTimeAgo = cmdFilerSyncVerify.Flag.Duration("modifiedTimeAgo", 0, "only verify files modified before this duration ago (e.g. 1h) for sync-lag tolerance") + syncVerifyOptions.jsonOutput = cmdFilerSyncVerify.Flag.Bool("jsonOutput", false, "emit NDJSON output (one JSON object per line) for external tooling") +} + +var cmdFilerSyncVerify = &Command{ + UsageLine: "filer.sync.verify -a=: -b=:", + Short: "compare entries between two filers and report differences", + Long: `compare entries between two filers and report differences, then exit. + + Useful for validating active/passive sync targets agree with the source. + Reports MISSING (in A but not in B), ONLY_IN_B (in B but not in A; suppressed + in active-passive mode), SIZE_MISMATCH, and ETAG_MISMATCH. Honors + -modifiedTimeAgo to skip recently-modified files (sync-lag tolerance) and + -isActivePassive for unidirectional comparison. + + Exits with code 0 on agreement, 2 on differences or operational errors. + +`, +} + +func runFilerSyncVerify(cmd *Command, args []string) bool { + util.LoadSecurityConfiguration() + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + grpcDialOptionA := grpcDialOption + grpcDialOptionB := grpcDialOption + if *syncVerifyOptions.aSecurity != "" { + var err error + if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncVerifyOptions.aSecurity, "grpc.client"); err != nil { + glog.Fatalf("load security config for filer A: %v", err) + } + } + if *syncVerifyOptions.bSecurity != "" { + var err error + if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncVerifyOptions.bSecurity, "grpc.client"); err != nil { + glog.Fatalf("load security config for filer B: %v", err) + } + } + + filerA := pb.ServerAddress(*syncVerifyOptions.filerA) + filerB := pb.ServerAddress(*syncVerifyOptions.filerB) + + if err := runVerifySync(filerA, filerB, *syncVerifyOptions.aPath, *syncVerifyOptions.bPath, + *syncVerifyOptions.isActivePassive, *syncVerifyOptions.modifiedTimeAgo, + *syncVerifyOptions.jsonOutput, + grpcDialOptionA, grpcDialOptionB); err != nil { + glog.Errorf("verify sync: %v", err) + os.Exit(2) + } + return true +} + +// verifySyncConcurrency caps concurrent directory I/O across the entire +// recursive walk. A single shared semaphore is created in runVerifySync and +// passed down so the limit applies globally — a per-call semaphore would only +// cap concurrency per directory level, allowing fanout to grow as +// verifySyncConcurrency^depth on deep trees. +// +// Trade-off: higher values reduce wall time on wide trees by parallelizing +// listEntries RPCs, at the cost of more concurrent load on both filers and +// more memory from queued goroutines (each waiting goroutine ~2KB stack). +// Each compareDirectory holds a slot only for its own listing+compare phase +// and releases before recursing, so a parent never blocks waiting for +// children to acquire slots. +const verifySyncConcurrency = 5 + +// isTooRecent reports whether entry's mtime is past cutoff (sync-lag tolerance). +// Returns false when cutoff is zero or attributes are missing. +func isTooRecent(entry *filer_pb.Entry, cutoff time.Time) bool { + return !cutoff.IsZero() && entry != nil && entry.Attributes != nil && entry.Attributes.Mtime > cutoff.Unix() +} + +type VerifyResult struct { + dirCount atomic.Int64 + fileCount atomic.Int64 + missingCount atomic.Int64 + sizeMismatch atomic.Int64 + etagMismatch atomic.Int64 + onlyInB atomic.Int64 + skippedRecent atomic.Int64 + + // outputMu serializes writes to stdout. Multiple goroutines call + // reportDiff concurrently from compareDirectory worker pool. + outputMu sync.Mutex + jsonOutput bool +} + +type verifyDiffType int + +const ( + diffMissing verifyDiffType = iota // in A but not in B + diffOnlyInB // in B but not in A + diffSizeMismatch // size differs + diffETagMismatch // etag differs +) + +// diffRecord is the JSON Lines schema for a single diff entry. +type diffRecord struct { + Type string `json:"type"` + Path string `json:"path"` + IsDirectory bool `json:"isDirectory,omitempty"` + A *entryRecord `json:"a,omitempty"` + B *entryRecord `json:"b,omitempty"` + MtimeRelation string `json:"mtimeRelation,omitempty"` // EQUAL | A_NEWER | B_NEWER + MtimeDelta string `json:"mtimeDelta,omitempty"` // human-readable, e.g. "5d", "12h" + Hint string `json:"hint,omitempty"` // late_updates_skip_likely | sync_lag_or_event_miss +} + +type entryRecord struct { + Size uint64 `json:"size"` + Mtime int64 `json:"mtime"` + ETag string `json:"etag,omitempty"` +} + +type summaryRecord struct { + Type string `json:"type"` + Directories int64 `json:"directories"` + Files int64 `json:"files"` + SkippedRecent int64 `json:"skippedRecent"` + Missing int64 `json:"missing"` + SizeMismatch int64 `json:"sizeMismatch"` + ETagMismatch int64 `json:"etagMismatch"` + OnlyInB int64 `json:"onlyInB"` + TotalErrors int64 `json:"totalErrors"` +} + +// simpleFilerClient implements filer_pb.FilerClient for gRPC connections +type simpleFilerClient struct { + grpcAddress pb.ServerAddress + grpcDialOption grpc.DialOption +} + +func (c *simpleFilerClient) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, c.grpcAddress.ToGrpcAddress(), false, c.grpcDialOption) +} + +func (c *simpleFilerClient) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +func (c *simpleFilerClient) GetDataCenter() string { + return "" +} + +func runVerifySync(filerA, filerB pb.ServerAddress, aPath, bPath string, + isActivePassive bool, modifiedTimeAgo time.Duration, + jsonOutput bool, + grpcDialOptionA, grpcDialOptionB grpc.DialOption) error { + + clientA := &simpleFilerClient{grpcAddress: filerA, grpcDialOption: grpcDialOptionA} + clientB := &simpleFilerClient{grpcAddress: filerB, grpcDialOption: grpcDialOptionB} + + var cutoffTime time.Time + if modifiedTimeAgo > 0 { + cutoffTime = time.Now().Add(-modifiedTimeAgo) + } + + if !jsonOutput { + if !cutoffTime.IsZero() { + fmt.Fprintf(os.Stdout, "Verifying files modified before %v (modifiedTimeAgo=%v)\n", + cutoffTime.Format(time.RFC3339), modifiedTimeAgo) + } + fmt.Fprintf(os.Stdout, "Comparing %s%s => %s%s (isActivePassive=%v)\n\n", + filerA, aPath, filerB, bPath, isActivePassive) + } + + result := &VerifyResult{jsonOutput: jsonOutput} + ctx := context.Background() + sem := make(chan struct{}, verifySyncConcurrency) + + if err := compareDirectory(ctx, clientA, clientB, aPath, bPath, isActivePassive, cutoffTime, sem, result); err != nil { + return err + } + + totalErrors := result.missingCount.Load() + result.sizeMismatch.Load() + result.etagMismatch.Load() + if !isActivePassive { + totalErrors += result.onlyInB.Load() + } + + if jsonOutput { + summary := summaryRecord{ + Type: "SUMMARY", + Directories: result.dirCount.Load(), + Files: result.fileCount.Load(), + SkippedRecent: result.skippedRecent.Load(), + Missing: result.missingCount.Load(), + SizeMismatch: result.sizeMismatch.Load(), + ETagMismatch: result.etagMismatch.Load(), + OnlyInB: result.onlyInB.Load(), + TotalErrors: totalErrors, + } + writeJSONLine(result, summary) + } else { + fmt.Fprintf(os.Stdout, "\nSummary:\n") + fmt.Fprintf(os.Stdout, " Directories compared: %d\n", result.dirCount.Load()) + fmt.Fprintf(os.Stdout, " Files verified: %d\n", result.fileCount.Load()) + if result.skippedRecent.Load() > 0 { + fmt.Fprintf(os.Stdout, " Skipped (too recent): %d\n", result.skippedRecent.Load()) + } + fmt.Fprintf(os.Stdout, " Missing in B: %d\n", result.missingCount.Load()) + fmt.Fprintf(os.Stdout, " Size mismatch: %d\n", result.sizeMismatch.Load()) + fmt.Fprintf(os.Stdout, " ETag mismatch: %d\n", result.etagMismatch.Load()) + if !isActivePassive { + fmt.Fprintf(os.Stdout, " Only in B: %d\n", result.onlyInB.Load()) + } + fmt.Fprintf(os.Stdout, " Total errors: %d\n", totalErrors) + } + + if totalErrors > 0 { + return fmt.Errorf("found %d differences", totalErrors) + } + return nil +} + +// entryStream is a sorted, streaming view of a single directory's entries. +// A background goroutine pages through the directory via ReadDirAllEntries +// and forwards each entry to a buffered channel; the caller consumes entries +// one at a time through peek/advance. Memory usage is O(channel buffer) — +// independent of directory size — rather than O(total entries). +type entryStream struct { + ch <-chan *filer_pb.Entry + head *filer_pb.Entry + done bool + err error // written before ch is closed; safe to read once done==true +} + +// newEntryStream starts the background goroutine. It exits when listing +// completes, an error occurs, or ctx is cancelled; the channel is always +// closed before exit so consumers do not block indefinitely. +func newEntryStream(ctx context.Context, client filer_pb.FilerClient, dir string) *entryStream { + ch := make(chan *filer_pb.Entry, 64) + s := &entryStream{ch: ch} + go func() { + defer close(ch) + s.err = filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "", + func(entry *filer_pb.Entry, isLast bool) error { + select { + case ch <- entry: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) + }() + return s +} + +// peek returns the next entry without consuming it, or nil at end-of-stream. +func (s *entryStream) peek() *filer_pb.Entry { + if s.done { + return nil + } + if s.head == nil { + e, ok := <-s.ch + if !ok { + s.done = true + return nil + } + s.head = e + } + return s.head +} + +// advance consumes and returns the next entry. +func (s *entryStream) advance() *filer_pb.Entry { + e := s.peek() + s.head = nil + return e +} + +func compareDirectory(ctx context.Context, + clientA, clientB filer_pb.FilerClient, + dirA, dirB string, + isActivePassive bool, + cutoffTime time.Time, + sem chan struct{}, + result *VerifyResult) error { + + // Hold a slot only for this directory's I/O phase (listings + merge). + // Released before recursing so parents never block waiting for children + // to acquire slots — see verifySyncConcurrency for the rationale. + sem <- struct{}{} + released := false + releaseSlot := func() { + if !released { + released = true + <-sem + } + } + defer releaseSlot() + + result.dirCount.Add(1) + + // A child context ensures that stream goroutines are cancelled and their + // channels are closed if compareDirectory returns early (e.g. on error). + mergeCtx, cancelMerge := context.WithCancel(ctx) + defer cancelMerge() + + streamA := newEntryStream(mergeCtx, clientA, dirA) + streamB := newEntryStream(mergeCtx, clientB, dirB) + + // collect subdirectories for recursive comparison + type dirPair struct{ a, b string } + var subDirs []dirPair + + for streamA.peek() != nil || streamB.peek() != nil { + eA := streamA.peek() + eB := streamB.peek() + + switch { + case eA != nil && (eB == nil || eA.Name < eB.Name): + // entry only in A + entryA := streamA.advance() + if entryA.IsDirectory { + // Always recurse for missing-in-B directories: a recent + // child write can bump the parent's mtime even though + // older missing files exist underneath. The cutoff is + // applied per-file inside countMissingRecursive. + reportDiff(diffMissing, dirA, entryA, nil, result) + countMissingRecursive(ctx, clientA, path.Join(dirA, entryA.Name), cutoffTime, result) + } else if isTooRecent(entryA, cutoffTime) { + result.skippedRecent.Add(1) + } else { + reportDiff(diffMissing, dirA, entryA, nil, result) + } + + case eB != nil && (eA == nil || eB.Name < eA.Name): + // entry only in B + entryB := streamB.advance() + if !isActivePassive { + if isTooRecent(entryB, cutoffTime) { + result.skippedRecent.Add(1) + } else { + reportDiff(diffOnlyInB, dirB, entryB, nil, result) + } + } + + default: + // same name in both + entryA := streamA.advance() + entryB := streamB.advance() + + if entryA.IsDirectory && entryB.IsDirectory { + subDirs = append(subDirs, dirPair{ + a: path.Join(dirA, entryA.Name), + b: path.Join(dirB, entryB.Name), + }) + } else if !entryA.IsDirectory && !entryB.IsDirectory { + // Skip if either side was modified recently (sync-lag tolerance). + if isTooRecent(entryA, cutoffTime) || isTooRecent(entryB, cutoffTime) { + result.skippedRecent.Add(1) + } else { + compareEntries(dirA, entryA, entryB, result) + } + } else { + // type mismatch: one is dir, other is file + reportDiff(diffMissing, dirA, entryA, nil, result) + if !isActivePassive { + reportDiff(diffOnlyInB, dirB, entryB, nil, result) + } + } + } + } + + // Both channels are closed: close happens-before the receive of the zero + // value, so stream.err is visible here without additional synchronisation. + if err := streamA.err; err != nil && err != context.Canceled { + return fmt.Errorf("list %s on filer A: %v", dirA, err) + } + if err := streamB.err; err != nil && err != context.Canceled { + return fmt.Errorf("list %s on filer B: %v", dirB, err) + } + + // Release our slot before recursing so children can acquire it. Holding + // it across wg.Wait would deadlock once depth exceeds verifySyncConcurrency. + releaseSlot() + + if len(subDirs) > 0 { + // Bounded worker pool: cap goroutines per directory level instead + // of spawning one per child. A directory with thousands of subdirs + // would otherwise park ~2KB per waiting goroutine even though + // only `verifySyncConcurrency` can do I/O at once. + workers := verifySyncConcurrency + if len(subDirs) < workers { + workers = len(subDirs) + } + jobs := make(chan dirPair, len(subDirs)) + errCh := make(chan error, 1) // first error wins; others dropped + var wg sync.WaitGroup + + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for pair := range jobs { + if err := compareDirectory(ctx, clientA, clientB, pair.a, pair.b, isActivePassive, cutoffTime, sem, result); err != nil { + select { + case errCh <- err: + default: + } + } + } + }() + } + for _, pair := range subDirs { + jobs <- pair + } + close(jobs) + wg.Wait() + close(errCh) + if err := <-errCh; err != nil { + return err + } + } + + return nil +} + + +func compareEntries(dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) { + result.fileCount.Add(1) + + sizeA := filer.FileSize(entryA) + sizeB := filer.FileSize(entryB) + if sizeA != sizeB { + reportDiff(diffSizeMismatch, dir, entryA, entryB, result) + return + } + + etagA := filer.ETag(entryA) + etagB := filer.ETag(entryB) + if etagA != etagB { + reportDiff(diffETagMismatch, dir, entryA, entryB, result) + return + } +} + +// mtimeRelation classifies B.mtime vs A.mtime. Both entries must be non-nil. +// Returns relation, absolute delta in seconds, and human-readable string. +func mtimeRelation(entryA, entryB *filer_pb.Entry) (relation, deltaStr string, deltaSec int64) { + if entryA == nil || entryB == nil || entryA.Attributes == nil || entryB.Attributes == nil { + return "", "", 0 + } + delta := entryB.Attributes.Mtime - entryA.Attributes.Mtime + abs := delta + if abs < 0 { + abs = -abs + } + switch { + case delta == 0: + return "EQUAL", "0s", 0 + case delta > 0: + return "B_NEWER", formatSeconds(abs), abs + default: + return "A_NEWER", formatSeconds(abs), abs + } +} + +func formatSeconds(s int64) string { + switch { + case s < 60: + return fmt.Sprintf("%ds", s) + case s < 3600: + return fmt.Sprintf("%dm", s/60) + case s < 86400: + return fmt.Sprintf("%dh", s/3600) + default: + return fmt.Sprintf("%dd", s/86400) + } +} + +// hintFor returns an automatic interpretation hint based on mtime relation. +// Only emitted for SIZE_MISMATCH and ETAG_MISMATCH where both entries exist. +func hintFor(relation string) string { + switch relation { + case "B_NEWER": + return "late_updates_skip_likely" + case "A_NEWER": + return "sync_lag_or_event_miss" + } + return "" +} + +func reportDiff(diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) { + switch diffType { + case diffMissing: + result.missingCount.Add(1) + case diffOnlyInB: + result.onlyInB.Add(1) + case diffSizeMismatch: + result.sizeMismatch.Add(1) + case diffETagMismatch: + result.etagMismatch.Add(1) + } + + if result.jsonOutput { + writeJSONDiff(result, diffType, dir, entryA, entryB) + } else { + writeTextDiff(result, diffType, dir, entryA, entryB) + } +} + +func writeTextDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) { + entryPath := path.Join(dir, entryA.Name) + + result.outputMu.Lock() + defer result.outputMu.Unlock() + + switch diffType { + case diffMissing: + if entryA.IsDirectory { + fmt.Fprintf(os.Stdout, "[MISSING] %s/ (directory)\n", entryPath) + } else { + fmt.Fprintf(os.Stdout, "[MISSING] %s (size=%d, etag=%s)\n", + entryPath, filer.FileSize(entryA), filer.ETag(entryA)) + } + case diffOnlyInB: + fmt.Fprintf(os.Stdout, "[ONLY_IN_B] %s\n", entryPath) + case diffSizeMismatch: + ann := annotation(entryA, entryB) + fmt.Fprintf(os.Stdout, "[SIZE_MISMATCH] %s (a=%d, b=%d%s)\n", + entryPath, filer.FileSize(entryA), filer.FileSize(entryB), ann) + case diffETagMismatch: + ann := annotation(entryA, entryB) + fmt.Fprintf(os.Stdout, "[ETAG_MISMATCH] %s (a=%s, b=%s%s)\n", + entryPath, filer.ETag(entryA), filer.ETag(entryB), ann) + } +} + +// annotation builds the trailing ", mtime: ... [hint]" segment for text output. +// Returns empty string if entries are unavailable. +func annotation(entryA, entryB *filer_pb.Entry) string { + relation, delta, _ := mtimeRelation(entryA, entryB) + if relation == "" { + return "" + } + switch relation { + case "EQUAL": + return ", mtime equal [chunk-level issue]" + case "B_NEWER": + return fmt.Sprintf(", B newer +%s [late-updates skip likely]", delta) + case "A_NEWER": + return fmt.Sprintf(", A newer +%s [sync lag or event miss]", delta) + } + return "" +} + +func writeJSONDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) { + rec := diffRecord{Path: path.Join(dir, entryA.Name)} + + switch diffType { + case diffMissing: + rec.Type = "MISSING" + rec.IsDirectory = entryA.IsDirectory + rec.A = toEntryRecord(entryA) + case diffOnlyInB: + rec.Type = "ONLY_IN_B" + // for diffOnlyInB the existing convention passes the entry as entryA + rec.IsDirectory = entryA.IsDirectory + rec.B = toEntryRecord(entryA) + case diffSizeMismatch: + rec.Type = "SIZE_MISMATCH" + rec.A = toEntryRecord(entryA) + rec.B = toEntryRecord(entryB) + relation, delta, _ := mtimeRelation(entryA, entryB) + rec.MtimeRelation = relation + rec.MtimeDelta = delta + rec.Hint = hintFor(relation) + case diffETagMismatch: + rec.Type = "ETAG_MISMATCH" + rec.A = toEntryRecord(entryA) + rec.B = toEntryRecord(entryB) + relation, delta, _ := mtimeRelation(entryA, entryB) + rec.MtimeRelation = relation + rec.MtimeDelta = delta + rec.Hint = hintFor(relation) + } + + writeJSONLine(result, rec) +} + +func toEntryRecord(entry *filer_pb.Entry) *entryRecord { + if entry == nil { + return nil + } + r := &entryRecord{ + Size: filer.FileSize(entry), + ETag: filer.ETag(entry), + } + if entry.Attributes != nil { + r.Mtime = entry.Attributes.Mtime + } + return r +} + +// writeJSONLine emits a single JSON object followed by newline. Holds outputMu +// across marshal+write so concurrent goroutines never interleave. +func writeJSONLine(result *VerifyResult, v any) { + data, err := json.Marshal(v) + if err != nil { + glog.Warningf("marshal verify record: %v", err) + return + } + result.outputMu.Lock() + defer result.outputMu.Unlock() + os.Stdout.Write(data) + os.Stdout.Write([]byte{'\n'}) +} + +func countMissingRecursive(ctx context.Context, client filer_pb.FilerClient, dir string, cutoffTime time.Time, result *VerifyResult) { + err := filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "", + func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + countMissingRecursive(ctx, client, path.Join(dir, entry.Name), cutoffTime, result) + return nil + } + if !cutoffTime.IsZero() && entry.Attributes != nil && entry.Attributes.Mtime > cutoffTime.Unix() { + result.skippedRecent.Add(1) + return nil + } + reportDiff(diffMissing, dir, entry, nil, result) + return nil + }) + if err != nil { + glog.Warningf("list missing directory %s: %v", dir, err) + } +} diff --git a/weed/command/filer_sync_verify_test.go b/weed/command/filer_sync_verify_test.go new file mode 100644 index 000000000..9f38f81ba --- /dev/null +++ b/weed/command/filer_sync_verify_test.go @@ -0,0 +1,529 @@ +package command + +import ( + "context" + "fmt" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// --- stream / inner-client / outer-client mocks --- + +type verifyTestStream struct { + entries []*filer_pb.Entry + idx int +} + +func (s *verifyTestStream) Recv() (*filer_pb.ListEntriesResponse, error) { + if s.idx >= len(s.entries) { + return nil, io.EOF + } + resp := &filer_pb.ListEntriesResponse{Entry: s.entries[s.idx]} + s.idx++ + return resp, nil +} + +func (s *verifyTestStream) Header() (metadata.MD, error) { return metadata.MD{}, nil } +func (s *verifyTestStream) Trailer() metadata.MD { return metadata.MD{} } +func (s *verifyTestStream) CloseSend() error { return nil } +func (s *verifyTestStream) Context() context.Context { return context.Background() } +func (s *verifyTestStream) SendMsg(_ any) error { return nil } +func (s *verifyTestStream) RecvMsg(_ any) error { return nil } + +// verifyTestInnerClient is the SeaweedFilerClient passed to fn inside WithFilerClient. +type verifyTestInnerClient struct { + filer_pb.SeaweedFilerClient // embed for unimplemented RPCs + entriesByDir map[string][]*filer_pb.Entry +} + +func (c *verifyTestInnerClient) ListEntries(_ context.Context, in *filer_pb.ListEntriesRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) { + return &verifyTestStream{entries: c.entriesByDir[in.Directory]}, nil +} + +// verifyTestFilerClient implements filer_pb.FilerClient and tracks concurrent +// WithFilerClient invocations to let tests verify the global concurrency bound. +type verifyTestFilerClient struct { + entriesByDir map[string][]*filer_pb.Entry + inFlight int64 // accessed via atomic + peakFlight int64 // accessed via atomic + delay time.Duration +} + +func (c *verifyTestFilerClient) WithFilerClient(_ bool, fn func(filer_pb.SeaweedFilerClient) error) error { + // track peak concurrent in-flight listings + n := atomic.AddInt64(&c.inFlight, 1) + defer atomic.AddInt64(&c.inFlight, -1) + for { + peak := atomic.LoadInt64(&c.peakFlight) + if n <= peak || atomic.CompareAndSwapInt64(&c.peakFlight, peak, n) { + break + } + } + if c.delay > 0 { + time.Sleep(c.delay) + } + return fn(&verifyTestInnerClient{entriesByDir: c.entriesByDir}) +} + +func (c *verifyTestFilerClient) AdjustedUrl(_ *filer_pb.Location) string { return "" } +func (c *verifyTestFilerClient) GetDataCenter() string { return "" } + +// --- entry helpers --- + +func verifyFileEntry(name string, size uint64) *filer_pb.Entry { + return &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{FileSize: size}, + } +} + +func verifyDirEntry(name string) *filer_pb.Entry { + return &filer_pb.Entry{Name: name, IsDirectory: true} +} + +// --- tests --- + +// TestVerifySyncMissingFile confirms that a file present in A but absent in B +// is counted as missing. +func TestVerifySyncMissingFile(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {verifyFileEntry("file.txt", 100)}, + }, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {}, + }, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.missingCount.Load(); got != 1 { + t.Errorf("missingCount = %d, want 1", got) + } + if got := result.sizeMismatch.Load(); got != 0 { + t.Errorf("sizeMismatch = %d, want 0", got) + } +} + +// TestVerifySyncOnlyInB confirms that a file present only in B is counted +// (non-active-passive mode) or ignored (active-passive mode). +func TestVerifySyncOnlyInB(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/root": {}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {verifyFileEntry("extra.txt", 50)}, + }, + } + + t.Run("bidirectional", func(t *testing.T) { + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.onlyInB.Load(); got != 1 { + t.Errorf("onlyInB = %d, want 1", got) + } + }) + + t.Run("active-passive ignores onlyInB", func(t *testing.T) { + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", true, time.Time{}, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.onlyInB.Load(); got != 0 { + t.Errorf("onlyInB = %d, want 0 in active-passive mode", got) + } + }) +} + +// TestVerifySyncSizeMismatch confirms that a file with differing sizes is +// counted as a size mismatch and not as missing. +func TestVerifySyncSizeMismatch(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {verifyFileEntry("data.bin", 1024)}, + }, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {verifyFileEntry("data.bin", 512)}, + }, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.sizeMismatch.Load(); got != 1 { + t.Errorf("sizeMismatch = %d, want 1", got) + } + if got := result.missingCount.Load(); got != 0 { + t.Errorf("missingCount = %d, want 0", got) + } +} + +// TestVerifySyncConcurrencyBound verifies that the shared semaphore keeps peak +// concurrent filer listings at or below verifySyncConcurrency at all times. +// A 5ms delay per WithFilerClient call makes the concurrency overlap observable. +func TestVerifySyncConcurrencyBound(t *testing.T) { + // Wide, shallow tree: root with 20 identical subdirectories. + const fanout = 20 + entriesA := make(map[string][]*filer_pb.Entry) + entriesB := make(map[string][]*filer_pb.Entry) + + rootDirs := make([]*filer_pb.Entry, fanout) + for i := range fanout { + name := fmt.Sprintf("sub%02d", i) + rootDirs[i] = verifyDirEntry(name) + entriesA["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)} + entriesB["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)} + } + entriesA["/root"] = rootDirs + entriesB["/root"] = rootDirs + + clientA := &verifyTestFilerClient{entriesByDir: entriesA, delay: 5 * time.Millisecond} + clientB := &verifyTestFilerClient{entriesByDir: entriesB, delay: 5 * time.Millisecond} + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 { + t.Errorf("unexpected diffs in identical tree") + } + + if peak := atomic.LoadInt64(&clientA.peakFlight); peak > verifySyncConcurrency { + t.Errorf("clientA peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)", + peak, verifySyncConcurrency) + } + if peak := atomic.LoadInt64(&clientB.peakFlight); peak > verifySyncConcurrency { + t.Errorf("clientB peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)", + peak, verifySyncConcurrency) + } +} + +// TestVerifySyncETagMismatch confirms that two files with the same size but +// different Md5 checksums are counted as an ETag mismatch, not a size mismatch. +func TestVerifySyncETagMismatch(t *testing.T) { + newEntry := func(name string, md5 []byte) *filer_pb.Entry { + return &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{ + FileSize: 100, + Md5: md5, + }, + } + } + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {newEntry("data.bin", []byte{0x11, 0x22, 0x33})}, + }, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/root": {newEntry("data.bin", []byte{0x44, 0x55, 0x66})}, + }, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.etagMismatch.Load(); got != 1 { + t.Errorf("etagMismatch = %d, want 1", got) + } + if got := result.sizeMismatch.Load(); got != 0 { + t.Errorf("sizeMismatch = %d, want 0 (same size should not trigger size mismatch)", got) + } +} + +// TestVerifySyncCutoffTime verifies that entries newer than cutoffTime are +// skipped in both the A-only (MISSING) and B-only (ONLY_IN_B) branches. +func TestVerifySyncCutoffTime(t *testing.T) { + cutoff := time.Unix(1000, 0) + + recentEntry := func(name string) *filer_pb.Entry { + return &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 2000}, // > cutoff + } + } + oldEntry := func(name string) *filer_pb.Entry { + return &filer_pb.Entry{ + Name: name, + Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff + } + } + + t.Run("A-only recent file is skipped, not reported missing", func(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {}}, + } + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.skippedRecent.Load(); got != 1 { + t.Errorf("skippedRecent = %d, want 1", got) + } + if got := result.missingCount.Load(); got != 0 { + t.Errorf("missingCount = %d, want 0 (recent file should be skipped)", got) + } + }) + + t.Run("A-only old file is reported missing", func(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {}}, + } + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.missingCount.Load(); got != 1 { + t.Errorf("missingCount = %d, want 1", got) + } + }) + + t.Run("B-only recent file is skipped, not reported as ONLY_IN_B", func(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}}, + } + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.skippedRecent.Load(); got != 1 { + t.Errorf("skippedRecent = %d, want 1", got) + } + if got := result.onlyInB.Load(); got != 0 { + t.Errorf("onlyInB = %d, want 0 (recent B-only file should be skipped)", got) + } + }) + + t.Run("B-only old file is reported as ONLY_IN_B", func(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}}, + } + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.onlyInB.Load(); got != 1 { + t.Errorf("onlyInB = %d, want 1", got) + } + }) +} + +// TestVerifySyncCutoffMatchedFileBSideRecent verifies that when matched-name +// files differ but only the B side is recently modified, the comparison is +// skipped (sync-lag tolerance) rather than reporting a spurious mismatch. +func TestVerifySyncCutoffMatchedFileBSideRecent(t *testing.T) { + cutoff := time.Unix(1000, 0) + + entry := func(size uint64, mtime int64) *filer_pb.Entry { + return &filer_pb.Entry{ + Name: "data.bin", + Attributes: &filer_pb.FuseAttributes{FileSize: size, Mtime: mtime}, + } + } + + // A is old (size 100), B is recently rewritten with a different size. + // Without the B-side cutoff check this would surface as SIZE_MISMATCH. + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(100, 500)}}, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(200, 2000)}}, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := result.skippedRecent.Load(); got != 1 { + t.Errorf("skippedRecent = %d, want 1 (B-side recent should skip)", got) + } + if got := result.sizeMismatch.Load(); got != 0 { + t.Errorf("sizeMismatch = %d, want 0 (recent B should not surface as mismatch)", got) + } +} + +// TestVerifySyncMissingDirRecursesEvenWithRecentMtime verifies that a +// directory missing in B with a recent mtime still has its subtree walked, +// so older missing files inside are reported. A recent child write can bump +// the parent mtime even though older missing files exist underneath. +func TestVerifySyncMissingDirRecursesEvenWithRecentMtime(t *testing.T) { + cutoff := time.Unix(1000, 0) + + recentDir := &filer_pb.Entry{ + Name: "subdir", + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{Mtime: 2000}, // > cutoff + } + oldChild := &filer_pb.Entry{ + Name: "old.txt", + Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff + } + + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/": {recentDir}, + "/subdir": {oldChild}, + }, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/": {}, + }, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, cutoff, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // Expect: directory MISSING + recursed-old-file MISSING = 2 missing. + if got := result.missingCount.Load(); got != 2 { + t.Errorf("missingCount = %d, want 2 (recent dir + old child inside)", got) + } + if got := result.skippedRecent.Load(); got != 0 { + t.Errorf("skippedRecent = %d, want 0 (dir mtime should not gate recursion)", got) + } +} + +// TestVerifySyncRootPath is a regression test for the path.Join fix. +// fmt.Sprintf("%s/%s", "/", name) produced "//name"; path.Join produces "/name". +// This test walks from "/" and verifies the child directory is found and +// compared correctly (not silently skipped due to a malformed path). +func TestVerifySyncRootPath(t *testing.T) { + clientA := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/": {verifyDirEntry("data")}, + "/data": {verifyFileEntry("file.txt", 42)}, + }, + } + clientB := &verifyTestFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/": {verifyDirEntry("data")}, + "/data": {verifyFileEntry("file.txt", 42)}, + }, + } + + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + if err := compareDirectory(context.Background(), clientA, clientB, + "/", "/", false, time.Time{}, sem, result); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 { + t.Errorf("identical trees from root should have no diffs: missing=%d size=%d", + result.missingCount.Load(), result.sizeMismatch.Load()) + } + // 2 directories traversed: "/" and "/data" + if got := result.dirCount.Load(); got != 2 { + t.Errorf("dirCount = %d, want 2 (root + /data)", got) + } + // 1 file compared: /data/file.txt + if got := result.fileCount.Load(); got != 1 { + t.Errorf("fileCount = %d, want 1", got) + } +} + +// TestVerifySyncNoDeadlockDeepTree ensures that a tree deeper than +// verifySyncConcurrency completes without deadlocking. With a per-call +// semaphore the walk would still complete (just with unbounded goroutines); +// this test mainly guards that the shared-semaphore release-before-recurse +// invariant holds — i.e. the walk finishes within the timeout. +func TestVerifySyncNoDeadlockDeepTree(t *testing.T) { + // Build a binary tree of depth 10 (well past verifySyncConcurrency=5). + const depth = 10 + entriesA := make(map[string][]*filer_pb.Entry) + entriesB := make(map[string][]*filer_pb.Entry) + + var buildTree func(path string, d int) + buildTree = func(path string, d int) { + if d == 0 { + entriesA[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)} + entriesB[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)} + return + } + children := []*filer_pb.Entry{verifyDirEntry("left"), verifyDirEntry("right")} + entriesA[path] = children + entriesB[path] = children + buildTree(path+"/left", d-1) + buildTree(path+"/right", d-1) + } + buildTree("/root", depth) + + clientA := &verifyTestFilerClient{entriesByDir: entriesA} + clientB := &verifyTestFilerClient{entriesByDir: entriesB} + + done := make(chan error, 1) + go func() { + result := &VerifyResult{} + sem := make(chan struct{}, verifySyncConcurrency) + done <- compareDirectory(context.Background(), clientA, clientB, + "/root", "/root", false, time.Time{}, sem, result) + }() + + select { + case err := <-done: + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + case <-time.After(10 * time.Second): + t.Fatal("compareDirectory did not complete within 10s — possible deadlock") + } +}