mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
feat(filer.sync): add -verifySync mode to filer.sync for cross-cluster file comparison (#9284)
* Add -verifySync flag to filer.sync for cross-cluster file comparison
Add a verification mode to filer.sync that compares entries between two
filers without performing any synchronization. It uses a directory-level
sorted merge of ListEntries results to detect missing files, size
mismatches, and ETag mismatches. Supports -isActivePassive for a
unidirectional check and -modifyTimeAgo to skip recently modified files
that may not have been synced yet.
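The sorted merge described above can be sketched as follows. This is a minimal illustration over plain name slices, assuming both listings arrive sorted ascending; mergeDiff and its return values are hypothetical names and not part of the commit.

```go
package main

import "fmt"

// mergeDiff walks two name lists that are already sorted ascending and
// classifies each name as only in a, only in b, or present in both.
// Hypothetical helper for illustration only.
func mergeDiff(a, b []string) (onlyA, onlyB, both []string) {
	i, j := 0, 0
	for i < len(a) || j < len(b) {
		switch {
		case j >= len(b) || (i < len(a) && a[i] < b[j]):
			// Name exists only in a (would be reported as MISSING).
			onlyA = append(onlyA, a[i])
			i++
		case i >= len(a) || b[j] < a[i]:
			// Name exists only in b (would be reported as ONLY_IN_B).
			onlyB = append(onlyB, b[j])
			j++
		default:
			// Same name on both sides: candidate for size/ETag comparison.
			both = append(both, a[i])
			i++
			j++
		}
	}
	return onlyA, onlyB, both
}

func main() {
	onlyA, onlyB, both := mergeDiff(
		[]string{"a.txt", "b.txt", "d.txt"},
		[]string{"b.txt", "c.txt", "d.txt"},
	)
	fmt.Println("MISSING:", onlyA)
	fmt.Println("ONLY_IN_B:", onlyB)
	fmt.Println("compare:", both)
}
```

Each side is visited once, so the comparison is linear in the number of entries and never needs a lookup table keyed by name.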
* Add mtime annotation and JSON output to filer.sync -verifySync
Add automatic mtime relation analysis for SIZE_MISMATCH and
ETAG_MISMATCH diffs, and an NDJSON output mode for external tooling.
mtime classification:
- B_NEWER => "late_updates_skip_likely" hint. Surfaces the case
where the target has a stub entry whose mtime is ahead of the source's
real file, causing UpdateEntry's mtime guard in filersink to
permanently skip the update.
- A_NEWER => "sync_lag_or_event_miss" hint.
- EQUAL => no hint (chunk-level issue suspected).
Text output example:
[SIZE_MISMATCH] /path (a=996, b=0, B newer +274d [late-updates skip likely])
Add -verifyJsonOutput flag. When set, emits one JSON object per
line (NDJSON) for diffs and a final SUMMARY object, suitable for
piping into external diagnostic pipelines.
Concurrent writes from the directory worker pool are now serialized
via outputMu to keep both text lines and JSON records atomic.
* fix(filer.sync): use shared global semaphore in verifySync to bound goroutine explosion
Replace the per-call local semaphore in compareDirectory with a single
shared semaphore created in runVerifySync. The old per-level semaphore
applied a limit of verifySyncConcurrency only within each directory level,
allowing effective concurrency to grow as verifySyncConcurrency^depth on
deep trees.
The shared semaphore is held only for each directory's I/O phase
(listEntries + merge) and released before recursing into subdirectories,
so a parent never blocks waiting for children to acquire slots — which
would deadlock once tree depth exceeds the semaphore capacity.
Extract the capacity into a named constant (verifySyncConcurrency = 5)
with a comment explaining the memory vs. performance trade-off.
Add unit tests:
- correctness: missing file, only-in-B, size mismatch, active-passive mode
- concurrency bound: peak concurrent listings ≤ verifySyncConcurrency
- no-deadlock: binary tree of depth 10 completes within timeout
* fix(filer.sync): stream directory entries to prevent OOM on large directories
Replace the listEntries helper (which accumulated all entries into a
single []filer_pb.Entry slice) with an entryStream type that pages
through the directory in the background and forwards entries one at a
time through a buffered channel. Memory per directory comparison is now
O(channel buffer size = 64) regardless of how many entries the directory
contains.
Key design points:
- entryStream wraps a goroutine + buffered channel with a one-entry
lookahead (peek/advance) so the two-pointer sorted merge in
compareDirectory can work without buffering any full listing.
- A child context (mergeCtx) is passed to both stream goroutines so
they are cancelled promptly if compareDirectory returns early (e.g.
on error); the ctx.Done() select arm in the callback prevents
goroutine leaks when the consumer stops reading.
- stream.err is written by the goroutine before close(ch), so it is
safe to read after the channel is exhausted (Go memory model:
channel close happens-before the zero-value receive).
- countMissingRecursive is rewritten to use ReadDirAllEntries with a
direct callback, eliminating its own slice allocation.
- listEntries is removed; it is no longer called anywhere.
* fix(filer.sync): address verifySync review findings
Four real bugs found and fixed; one finding already resolved (shared
semaphore was introduced in a prior commit).
- path.Join for child paths (filer_sync_verify.go):
  fmt.Sprintf("%s/%s", dir, name) produced "//name" when dir was "/".
  Replace all child-path concatenations with path.Join so root-level
  walks emit clean paths.
- cutoffTime check for ONLY_IN_B entries (filer_sync_verify.go):
  The B-only branch ignored -modifyTimeAgo, so files recently written
  to B were reported as ONLY_IN_B instead of being skipped. Mirror the
  A-side mtime guard: skip and increment skippedRecent when the entry
  is newer than cutoffTime.
- Summary emitted before error check (filer_sync_verify.go):
  A filer I/O error mid-walk still caused a SUMMARY record (or text
  summary) to be printed, making partial runs appear complete. Move the
  error check to before summary emission; on error, return immediately
  without printing any summary.
- Return false on verification failure (filer_sync.go):
  runVerifySync returned true (exit 0) even when diffs were found or the
  walk failed. Return false so the main binary sets exit status 1,
  consistent with how all other commands signal failure.
* test(filer.sync): add missing verifySync test coverage
Four new tests covering gaps identified during review:
- TestVerifySyncETagMismatch:
  Verifies that two files with identical size but different Md5 checksums
  are counted as etagMismatch (not sizeMismatch). Exercises the second
  branch of compareEntries that was previously untested.
- TestVerifySyncCutoffTime (4 subtests):
  A-only recent: recent file skipped (skippedRecent++), not MISSING
  A-only old: old file reported as MISSING
  B-only recent: recent file skipped (skippedRecent++), not ONLY_IN_B
  B-only old: old file reported as ONLY_IN_B
  The B-only subtests specifically cover the cutoffTime fix added in the
  previous commit.
- TestVerifySyncRootPath:
  Regression for the path.Join fix: walks from "/" and verifies that the
  child directory is reached and compared correctly (the old Sprintf
  produced "//data" which would silently produce wrong results).
  Asserts dirCount=2 and fileCount=1 to confirm the full tree is walked.
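The root-path regression is easy to reproduce in isolation. The sketch below contrasts the two ways of building a child path; childSprintf and childJoin are hypothetical wrappers written for this illustration, while fmt.Sprintf and path.Join are the standard library calls named in the commit.

```go
package main

import (
	"fmt"
	"path"
)

// childSprintf mimics the old concatenation that the commit replaced.
func childSprintf(dir, name string) string {
	return fmt.Sprintf("%s/%s", dir, name)
}

// childJoin uses path.Join, which cleans the result and collapses
// duplicate slashes.
func childJoin(dir, name string) string {
	return path.Join(dir, name)
}

func main() {
	for _, dir := range []string{"/", "/data"} {
		fmt.Printf("dir=%q sprintf=%q join=%q\n",
			dir, childSprintf(dir, "name"), childJoin(dir, "name"))
	}
}
```

For dir "/" the Sprintf form yields "//name" while path.Join yields "/name"; for "/data" both forms agree on "/data/name".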
* fix(filer.sync): use os.Exit(2) instead of return false on verify failure
return false triggered weed.go's error handler which printed the full
command usage — appropriate for invalid arguments, not for a completed
verification that found differences. Use os.Exit(2) consistent with
the existing pattern in filer_sync.go (lines 251, 293).
* refactor(filer.sync.verify): split verify into its own command
The verify mode is a one-shot batch operation with a fundamentally
different lifecycle from the long-running sync subscriber, and most of
filer.sync's flags (replication, metrics port, debug pprof, concurrency,
etc.) do not apply to it. Extract it into a sibling command alongside
filer.copy/filer.backup/filer.export rather than a flag mode on
filer.sync.
Also rename modifyTimeAgo to modifiedTimeAgo (grammatical) and shorten
verifyJsonOutput to plain jsonOutput, since the verify context is now
implicit in the command name.
* fix(filer.sync.verify): address review comments
- Bounded worker pool: cap subdirectory goroutines per level via a
jobs channel and min(verifySyncConcurrency, len(subDirs)) workers
instead of spawning one goroutine per child. Wide directories no
longer park ~2KB per queued goroutine.
- Don't gate recursion on a directory's mtime: a fresh child write
bumps the parent mtime, but older files inside should still be
reported as missing. Always recurse for missing-in-B directories
and apply the cutoff per-file inside countMissingRecursive.
- Apply -modifiedTimeAgo symmetrically: matched-name files now skip
the comparison when EITHER side is recently modified, not just A.
This restores lag tolerance when B was just rewritten.
Adds tests for both new behaviors and a shared isTooRecent helper.
---------
Co-authored-by: Chris Lu <chris.lu@gmail.com>
@@ -27,6 +27,7 @@ var Commands = []*Command{
	cmdFilerRemoteSynchronize,
	cmdFilerReplicate,
	cmdFilerSynchronize,
	cmdFilerSyncVerify,
	cmdFix,
	cmdFuse,
	cmdIam,

@@ -0,0 +1,674 @@
package command

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
)

type SyncVerifyOptions struct {
	filerA          *string
	filerB          *string
	aPath           *string
	bPath           *string
	aSecurity       *string
	bSecurity       *string
	isActivePassive *bool
	modifiedTimeAgo *time.Duration
	jsonOutput      *bool
}

var syncVerifyOptions SyncVerifyOptions

func init() {
	cmdFilerSyncVerify.Run = runFilerSyncVerify // break init cycle
	syncVerifyOptions.filerA = cmdFilerSyncVerify.Flag.String("a", "", "filer A in one SeaweedFS cluster")
	syncVerifyOptions.filerB = cmdFilerSyncVerify.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
	syncVerifyOptions.aPath = cmdFilerSyncVerify.Flag.String("a.path", "/", "directory to verify on filer A")
	syncVerifyOptions.bPath = cmdFilerSyncVerify.Flag.String("b.path", "/", "directory to verify on filer B")
	syncVerifyOptions.aSecurity = cmdFilerSyncVerify.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
	syncVerifyOptions.bSecurity = cmdFilerSyncVerify.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
	syncVerifyOptions.isActivePassive = cmdFilerSyncVerify.Flag.Bool("isActivePassive", false, "one directional comparison from A to B; entries only in B are not reported")
	syncVerifyOptions.modifiedTimeAgo = cmdFilerSyncVerify.Flag.Duration("modifiedTimeAgo", 0, "only verify files modified before this duration ago (e.g. 1h) for sync-lag tolerance")
	syncVerifyOptions.jsonOutput = cmdFilerSyncVerify.Flag.Bool("jsonOutput", false, "emit NDJSON output (one JSON object per line) for external tooling")
}

var cmdFilerSyncVerify = &Command{
	UsageLine: "filer.sync.verify -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "compare entries between two filers and report differences",
	Long: `compare entries between two filers and report differences, then exit.

	Useful for validating that active/passive sync targets agree with the source.
	Reports MISSING (in A but not in B), ONLY_IN_B (in B but not in A; suppressed
	in active-passive mode), SIZE_MISMATCH, and ETAG_MISMATCH. Honors
	-modifiedTimeAgo to skip recently-modified files (sync-lag tolerance) and
	-isActivePassive for unidirectional comparison.

	Exits with code 0 on agreement, 2 on differences or operational errors.

`,
}

func runFilerSyncVerify(cmd *Command, args []string) bool {
	util.LoadSecurityConfiguration()
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	grpcDialOptionA := grpcDialOption
	grpcDialOptionB := grpcDialOption
	if *syncVerifyOptions.aSecurity != "" {
		var err error
		if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncVerifyOptions.aSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer A: %v", err)
		}
	}
	if *syncVerifyOptions.bSecurity != "" {
		var err error
		if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncVerifyOptions.bSecurity, "grpc.client"); err != nil {
			glog.Fatalf("load security config for filer B: %v", err)
		}
	}

	filerA := pb.ServerAddress(*syncVerifyOptions.filerA)
	filerB := pb.ServerAddress(*syncVerifyOptions.filerB)

	if err := runVerifySync(filerA, filerB, *syncVerifyOptions.aPath, *syncVerifyOptions.bPath,
		*syncVerifyOptions.isActivePassive, *syncVerifyOptions.modifiedTimeAgo,
		*syncVerifyOptions.jsonOutput,
		grpcDialOptionA, grpcDialOptionB); err != nil {
		glog.Errorf("verify sync: %v", err)
		os.Exit(2)
	}
	return true
}

// verifySyncConcurrency caps concurrent directory I/O across the entire
// recursive walk. A single shared semaphore is created in runVerifySync and
// passed down so the limit applies globally; a per-call semaphore would only
// cap concurrency per directory level, allowing fanout to grow as
// verifySyncConcurrency^depth on deep trees.
//
// Trade-off: higher values reduce wall time on wide trees by parallelizing
// listEntries RPCs, at the cost of more concurrent load on both filers and
// more memory from queued goroutines (each waiting goroutine ~2KB stack).
// Each compareDirectory holds a slot only for its own listing+compare phase
// and releases before recursing, so a parent never blocks waiting for
// children to acquire slots.
const verifySyncConcurrency = 5

// isTooRecent reports whether entry's mtime is past cutoff (sync-lag tolerance).
// Returns false when cutoff is zero or attributes are missing.
func isTooRecent(entry *filer_pb.Entry, cutoff time.Time) bool {
	return !cutoff.IsZero() && entry != nil && entry.Attributes != nil && entry.Attributes.Mtime > cutoff.Unix()
}

type VerifyResult struct {
	dirCount      atomic.Int64
	fileCount     atomic.Int64
	missingCount  atomic.Int64
	sizeMismatch  atomic.Int64
	etagMismatch  atomic.Int64
	onlyInB       atomic.Int64
	skippedRecent atomic.Int64

	// outputMu serializes writes to stdout. Multiple goroutines call
	// reportDiff concurrently from the compareDirectory worker pool.
	outputMu   sync.Mutex
	jsonOutput bool
}

type verifyDiffType int

const (
	diffMissing      verifyDiffType = iota // in A but not in B
	diffOnlyInB                            // in B but not in A
	diffSizeMismatch                       // size differs
	diffETagMismatch                       // etag differs
)

// diffRecord is the JSON Lines schema for a single diff entry.
type diffRecord struct {
	Type          string       `json:"type"`
	Path          string       `json:"path"`
	IsDirectory   bool         `json:"isDirectory,omitempty"`
	A             *entryRecord `json:"a,omitempty"`
	B             *entryRecord `json:"b,omitempty"`
	MtimeRelation string       `json:"mtimeRelation,omitempty"` // EQUAL | A_NEWER | B_NEWER
	MtimeDelta    string       `json:"mtimeDelta,omitempty"`    // human-readable, e.g. "5d", "12h"
	Hint          string       `json:"hint,omitempty"`          // late_updates_skip_likely | sync_lag_or_event_miss
}

type entryRecord struct {
	Size  uint64 `json:"size"`
	Mtime int64  `json:"mtime"`
	ETag  string `json:"etag,omitempty"`
}

type summaryRecord struct {
	Type          string `json:"type"`
	Directories   int64  `json:"directories"`
	Files         int64  `json:"files"`
	SkippedRecent int64  `json:"skippedRecent"`
	Missing       int64  `json:"missing"`
	SizeMismatch  int64  `json:"sizeMismatch"`
	ETagMismatch  int64  `json:"etagMismatch"`
	OnlyInB       int64  `json:"onlyInB"`
	TotalErrors   int64  `json:"totalErrors"`
}

// simpleFilerClient implements filer_pb.FilerClient for gRPC connections.
type simpleFilerClient struct {
	grpcAddress    pb.ServerAddress
	grpcDialOption grpc.DialOption
}

func (c *simpleFilerClient) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
		return fn(client)
	}, c.grpcAddress.ToGrpcAddress(), false, c.grpcDialOption)
}

func (c *simpleFilerClient) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

func (c *simpleFilerClient) GetDataCenter() string {
	return ""
}

func runVerifySync(filerA, filerB pb.ServerAddress, aPath, bPath string,
	isActivePassive bool, modifiedTimeAgo time.Duration,
	jsonOutput bool,
	grpcDialOptionA, grpcDialOptionB grpc.DialOption) error {

	clientA := &simpleFilerClient{grpcAddress: filerA, grpcDialOption: grpcDialOptionA}
	clientB := &simpleFilerClient{grpcAddress: filerB, grpcDialOption: grpcDialOptionB}

	var cutoffTime time.Time
	if modifiedTimeAgo > 0 {
		cutoffTime = time.Now().Add(-modifiedTimeAgo)
	}

	if !jsonOutput {
		if !cutoffTime.IsZero() {
			fmt.Fprintf(os.Stdout, "Verifying files modified before %v (modifiedTimeAgo=%v)\n",
				cutoffTime.Format(time.RFC3339), modifiedTimeAgo)
		}
		fmt.Fprintf(os.Stdout, "Comparing %s%s => %s%s (isActivePassive=%v)\n\n",
			filerA, aPath, filerB, bPath, isActivePassive)
	}

	result := &VerifyResult{jsonOutput: jsonOutput}
	ctx := context.Background()
	sem := make(chan struct{}, verifySyncConcurrency)

	if err := compareDirectory(ctx, clientA, clientB, aPath, bPath, isActivePassive, cutoffTime, sem, result); err != nil {
		return err
	}

	totalErrors := result.missingCount.Load() + result.sizeMismatch.Load() + result.etagMismatch.Load()
	if !isActivePassive {
		totalErrors += result.onlyInB.Load()
	}

	if jsonOutput {
		summary := summaryRecord{
			Type:          "SUMMARY",
			Directories:   result.dirCount.Load(),
			Files:         result.fileCount.Load(),
			SkippedRecent: result.skippedRecent.Load(),
			Missing:       result.missingCount.Load(),
			SizeMismatch:  result.sizeMismatch.Load(),
			ETagMismatch:  result.etagMismatch.Load(),
			OnlyInB:       result.onlyInB.Load(),
			TotalErrors:   totalErrors,
		}
		writeJSONLine(result, summary)
	} else {
		fmt.Fprintf(os.Stdout, "\nSummary:\n")
		fmt.Fprintf(os.Stdout, " Directories compared: %d\n", result.dirCount.Load())
		fmt.Fprintf(os.Stdout, " Files verified: %d\n", result.fileCount.Load())
		if result.skippedRecent.Load() > 0 {
			fmt.Fprintf(os.Stdout, " Skipped (too recent): %d\n", result.skippedRecent.Load())
		}
		fmt.Fprintf(os.Stdout, " Missing in B: %d\n", result.missingCount.Load())
		fmt.Fprintf(os.Stdout, " Size mismatch: %d\n", result.sizeMismatch.Load())
		fmt.Fprintf(os.Stdout, " ETag mismatch: %d\n", result.etagMismatch.Load())
		if !isActivePassive {
			fmt.Fprintf(os.Stdout, " Only in B: %d\n", result.onlyInB.Load())
		}
		fmt.Fprintf(os.Stdout, " Total errors: %d\n", totalErrors)
	}

	if totalErrors > 0 {
		return fmt.Errorf("found %d differences", totalErrors)
	}
	return nil
}

// entryStream is a sorted, streaming view of a single directory's entries.
// A background goroutine pages through the directory via ReadDirAllEntries
// and forwards each entry to a buffered channel; the caller consumes entries
// one at a time through peek/advance. Memory usage is O(channel buffer),
// independent of directory size, rather than O(total entries).
type entryStream struct {
	ch   <-chan *filer_pb.Entry
	head *filer_pb.Entry
	done bool
	err  error // written before ch is closed; safe to read once done==true
}

// newEntryStream starts the background goroutine. It exits when listing
// completes, an error occurs, or ctx is cancelled; the channel is always
// closed before exit so consumers do not block indefinitely.
func newEntryStream(ctx context.Context, client filer_pb.FilerClient, dir string) *entryStream {
	ch := make(chan *filer_pb.Entry, 64)
	s := &entryStream{ch: ch}
	go func() {
		defer close(ch)
		s.err = filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "",
			func(entry *filer_pb.Entry, isLast bool) error {
				select {
				case ch <- entry:
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			})
	}()
	return s
}

// peek returns the next entry without consuming it, or nil at end-of-stream.
func (s *entryStream) peek() *filer_pb.Entry {
	if s.done {
		return nil
	}
	if s.head == nil {
		e, ok := <-s.ch
		if !ok {
			s.done = true
			return nil
		}
		s.head = e
	}
	return s.head
}

// advance consumes and returns the next entry.
func (s *entryStream) advance() *filer_pb.Entry {
	e := s.peek()
	s.head = nil
	return e
}

func compareDirectory(ctx context.Context,
	clientA, clientB filer_pb.FilerClient,
	dirA, dirB string,
	isActivePassive bool,
	cutoffTime time.Time,
	sem chan struct{},
	result *VerifyResult) error {

	// Hold a slot only for this directory's I/O phase (listings + merge).
	// Released before recursing so parents never block waiting for children
	// to acquire slots; see verifySyncConcurrency for the rationale.
	sem <- struct{}{}
	released := false
	releaseSlot := func() {
		if !released {
			released = true
			<-sem
		}
	}
	defer releaseSlot()

	result.dirCount.Add(1)

	// A child context ensures that stream goroutines are cancelled and their
	// channels are closed if compareDirectory returns early (e.g. on error).
	mergeCtx, cancelMerge := context.WithCancel(ctx)
	defer cancelMerge()

	streamA := newEntryStream(mergeCtx, clientA, dirA)
	streamB := newEntryStream(mergeCtx, clientB, dirB)

	// collect subdirectories for recursive comparison
	type dirPair struct{ a, b string }
	var subDirs []dirPair

	for streamA.peek() != nil || streamB.peek() != nil {
		eA := streamA.peek()
		eB := streamB.peek()

		switch {
		case eA != nil && (eB == nil || eA.Name < eB.Name):
			// entry only in A
			entryA := streamA.advance()
			if entryA.IsDirectory {
				// Always recurse for missing-in-B directories: a recent
				// child write can bump the parent's mtime even though
				// older missing files exist underneath. The cutoff is
				// applied per-file inside countMissingRecursive.
				reportDiff(diffMissing, dirA, entryA, nil, result)
				countMissingRecursive(ctx, clientA, path.Join(dirA, entryA.Name), cutoffTime, result)
			} else if isTooRecent(entryA, cutoffTime) {
				result.skippedRecent.Add(1)
			} else {
				reportDiff(diffMissing, dirA, entryA, nil, result)
			}

		case eB != nil && (eA == nil || eB.Name < eA.Name):
			// entry only in B
			entryB := streamB.advance()
			if !isActivePassive {
				if isTooRecent(entryB, cutoffTime) {
					result.skippedRecent.Add(1)
				} else {
					reportDiff(diffOnlyInB, dirB, entryB, nil, result)
				}
			}

		default:
			// same name in both
			entryA := streamA.advance()
			entryB := streamB.advance()

			if entryA.IsDirectory && entryB.IsDirectory {
				subDirs = append(subDirs, dirPair{
					a: path.Join(dirA, entryA.Name),
					b: path.Join(dirB, entryB.Name),
				})
			} else if !entryA.IsDirectory && !entryB.IsDirectory {
				// Skip if either side was modified recently (sync-lag tolerance).
				if isTooRecent(entryA, cutoffTime) || isTooRecent(entryB, cutoffTime) {
					result.skippedRecent.Add(1)
				} else {
					compareEntries(dirA, entryA, entryB, result)
				}
			} else {
				// type mismatch: one is dir, other is file
				reportDiff(diffMissing, dirA, entryA, nil, result)
				if !isActivePassive {
					reportDiff(diffOnlyInB, dirB, entryB, nil, result)
				}
			}
		}
	}

	// Both channels are closed: close happens-before the receive of the zero
	// value, so stream.err is visible here without additional synchronisation.
	if err := streamA.err; err != nil && err != context.Canceled {
		return fmt.Errorf("list %s on filer A: %v", dirA, err)
	}
	if err := streamB.err; err != nil && err != context.Canceled {
		return fmt.Errorf("list %s on filer B: %v", dirB, err)
	}

	// Release our slot before recursing so children can acquire it. Holding
	// it across wg.Wait would deadlock once depth exceeds verifySyncConcurrency.
	releaseSlot()

	if len(subDirs) > 0 {
		// Bounded worker pool: cap goroutines per directory level instead
		// of spawning one per child. A directory with thousands of subdirs
		// would otherwise park ~2KB per waiting goroutine even though
		// only `verifySyncConcurrency` can do I/O at once.
		workers := verifySyncConcurrency
		if len(subDirs) < workers {
			workers = len(subDirs)
		}
		jobs := make(chan dirPair, len(subDirs))
		errCh := make(chan error, 1) // first error wins; others dropped
		var wg sync.WaitGroup

		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for pair := range jobs {
					if err := compareDirectory(ctx, clientA, clientB, pair.a, pair.b, isActivePassive, cutoffTime, sem, result); err != nil {
						select {
						case errCh <- err:
						default:
						}
					}
				}
			}()
		}
		for _, pair := range subDirs {
			jobs <- pair
		}
		close(jobs)
		wg.Wait()
		close(errCh)
		if err := <-errCh; err != nil {
			return err
		}
	}

	return nil
}

func compareEntries(dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) {
	result.fileCount.Add(1)

	sizeA := filer.FileSize(entryA)
	sizeB := filer.FileSize(entryB)
	if sizeA != sizeB {
		reportDiff(diffSizeMismatch, dir, entryA, entryB, result)
		return
	}

	etagA := filer.ETag(entryA)
	etagB := filer.ETag(entryB)
	if etagA != etagB {
		reportDiff(diffETagMismatch, dir, entryA, entryB, result)
		return
	}
}

// mtimeRelation classifies B.mtime vs A.mtime. Returns empty values when
// either entry or its attributes are nil. Otherwise returns the relation,
// a human-readable delta string, and the absolute delta in seconds.
func mtimeRelation(entryA, entryB *filer_pb.Entry) (relation, deltaStr string, deltaSec int64) {
	if entryA == nil || entryB == nil || entryA.Attributes == nil || entryB.Attributes == nil {
		return "", "", 0
	}
	delta := entryB.Attributes.Mtime - entryA.Attributes.Mtime
	abs := delta
	if abs < 0 {
		abs = -abs
	}
	switch {
	case delta == 0:
		return "EQUAL", "0s", 0
	case delta > 0:
		return "B_NEWER", formatSeconds(abs), abs
	default:
		return "A_NEWER", formatSeconds(abs), abs
	}
}

func formatSeconds(s int64) string {
	switch {
	case s < 60:
		return fmt.Sprintf("%ds", s)
	case s < 3600:
		return fmt.Sprintf("%dm", s/60)
	case s < 86400:
		return fmt.Sprintf("%dh", s/3600)
	default:
		return fmt.Sprintf("%dd", s/86400)
	}
}

// hintFor returns an automatic interpretation hint based on mtime relation.
// Only emitted for SIZE_MISMATCH and ETAG_MISMATCH where both entries exist.
func hintFor(relation string) string {
	switch relation {
	case "B_NEWER":
		return "late_updates_skip_likely"
	case "A_NEWER":
		return "sync_lag_or_event_miss"
	}
	return ""
}

func reportDiff(diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry, result *VerifyResult) {
	switch diffType {
	case diffMissing:
		result.missingCount.Add(1)
	case diffOnlyInB:
		result.onlyInB.Add(1)
	case diffSizeMismatch:
		result.sizeMismatch.Add(1)
	case diffETagMismatch:
		result.etagMismatch.Add(1)
	}

	if result.jsonOutput {
		writeJSONDiff(result, diffType, dir, entryA, entryB)
	} else {
		writeTextDiff(result, diffType, dir, entryA, entryB)
	}
}

func writeTextDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) {
	entryPath := path.Join(dir, entryA.Name)

	result.outputMu.Lock()
	defer result.outputMu.Unlock()

	switch diffType {
	case diffMissing:
		if entryA.IsDirectory {
			fmt.Fprintf(os.Stdout, "[MISSING] %s/ (directory)\n", entryPath)
		} else {
			fmt.Fprintf(os.Stdout, "[MISSING] %s (size=%d, etag=%s)\n",
				entryPath, filer.FileSize(entryA), filer.ETag(entryA))
		}
	case diffOnlyInB:
		fmt.Fprintf(os.Stdout, "[ONLY_IN_B] %s\n", entryPath)
	case diffSizeMismatch:
		ann := annotation(entryA, entryB)
		fmt.Fprintf(os.Stdout, "[SIZE_MISMATCH] %s (a=%d, b=%d%s)\n",
			entryPath, filer.FileSize(entryA), filer.FileSize(entryB), ann)
	case diffETagMismatch:
		ann := annotation(entryA, entryB)
		fmt.Fprintf(os.Stdout, "[ETAG_MISMATCH] %s (a=%s, b=%s%s)\n",
			entryPath, filer.ETag(entryA), filer.ETag(entryB), ann)
	}
}

// annotation builds the trailing mtime segment for text output, for example
// ", B newer +5d [late-updates skip likely]".
// Returns empty string if entries are unavailable.
func annotation(entryA, entryB *filer_pb.Entry) string {
	relation, delta, _ := mtimeRelation(entryA, entryB)
	if relation == "" {
		return ""
	}
	switch relation {
	case "EQUAL":
		return ", mtime equal [chunk-level issue]"
	case "B_NEWER":
		return fmt.Sprintf(", B newer +%s [late-updates skip likely]", delta)
	case "A_NEWER":
		return fmt.Sprintf(", A newer +%s [sync lag or event miss]", delta)
	}
	return ""
}

func writeJSONDiff(result *VerifyResult, diffType verifyDiffType, dir string, entryA, entryB *filer_pb.Entry) {
	rec := diffRecord{Path: path.Join(dir, entryA.Name)}

	switch diffType {
	case diffMissing:
		rec.Type = "MISSING"
		rec.IsDirectory = entryA.IsDirectory
		rec.A = toEntryRecord(entryA)
	case diffOnlyInB:
		rec.Type = "ONLY_IN_B"
		// for diffOnlyInB the existing convention passes the entry as entryA
		rec.IsDirectory = entryA.IsDirectory
		rec.B = toEntryRecord(entryA)
	case diffSizeMismatch:
		rec.Type = "SIZE_MISMATCH"
		rec.A = toEntryRecord(entryA)
		rec.B = toEntryRecord(entryB)
		relation, delta, _ := mtimeRelation(entryA, entryB)
		rec.MtimeRelation = relation
		rec.MtimeDelta = delta
		rec.Hint = hintFor(relation)
	case diffETagMismatch:
		rec.Type = "ETAG_MISMATCH"
		rec.A = toEntryRecord(entryA)
		rec.B = toEntryRecord(entryB)
		relation, delta, _ := mtimeRelation(entryA, entryB)
		rec.MtimeRelation = relation
		rec.MtimeDelta = delta
		rec.Hint = hintFor(relation)
	}

	writeJSONLine(result, rec)
}

func toEntryRecord(entry *filer_pb.Entry) *entryRecord {
	if entry == nil {
		return nil
	}
	r := &entryRecord{
		Size: filer.FileSize(entry),
		ETag: filer.ETag(entry),
	}
	if entry.Attributes != nil {
		r.Mtime = entry.Attributes.Mtime
	}
	return r
}

// writeJSONLine emits a single JSON object followed by a newline. It marshals
// first, then holds outputMu across both writes so concurrent goroutines never
// interleave.
func writeJSONLine(result *VerifyResult, v any) {
	data, err := json.Marshal(v)
	if err != nil {
		glog.Warningf("marshal verify record: %v", err)
		return
	}
	result.outputMu.Lock()
	defer result.outputMu.Unlock()
	os.Stdout.Write(data)
	os.Stdout.Write([]byte{'\n'})
}

func countMissingRecursive(ctx context.Context, client filer_pb.FilerClient, dir string, cutoffTime time.Time, result *VerifyResult) {
	err := filer_pb.ReadDirAllEntries(ctx, client, util.FullPath(dir), "",
		func(entry *filer_pb.Entry, isLast bool) error {
			if entry.IsDirectory {
				countMissingRecursive(ctx, client, path.Join(dir, entry.Name), cutoffTime, result)
				return nil
			}
			if !cutoffTime.IsZero() && entry.Attributes != nil && entry.Attributes.Mtime > cutoffTime.Unix() {
				result.skippedRecent.Add(1)
				return nil
			}
			reportDiff(diffMissing, dir, entry, nil, result)
			return nil
		})
	if err != nil {
		glog.Warningf("list missing directory %s: %v", dir, err)
	}
}

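The NDJSON output path above serializes writers with a mutex so that each record occupies exactly one line. A minimal, self-contained sketch of that idea follows. It is an illustration under assumptions, not the code from this change: `lineWriter`, `emitConcurrently`, and `countValidLines` are hypothetical names, and the sketch writes to an in-memory buffer instead of `os.Stdout` so the result can be checked.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
)

// lineWriter (hypothetical) guards an io.Writer with a mutex so that each
// JSON record and its trailing newline are written as one unit.
type lineWriter struct {
	mu  sync.Mutex
	out io.Writer
}

// writeJSONLine marshals outside the lock, then writes record plus newline
// while holding it.
func (w *lineWriter) writeJSONLine(v any) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	_, err = w.out.Write(append(data, '\n'))
	return err
}

// emitConcurrently writes n records from n goroutines and returns the output.
func emitConcurrently(n int) string {
	var buf bytes.Buffer
	w := &lineWriter{out: &buf}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = w.writeJSONLine(map[string]any{
				"type": "MISSING",
				"path": fmt.Sprintf("/dir/file%d", i),
			})
		}(i)
	}
	wg.Wait()
	return buf.String()
}

// countValidLines reports how many lines of s parse as a JSON object.
func countValidLines(s string) int {
	valid := 0
	for _, line := range strings.Split(strings.TrimSuffix(s, "\n"), "\n") {
		var rec map[string]any
		if json.Unmarshal([]byte(line), &rec) == nil {
			valid++
		}
	}
	return valid
}

func main() {
	out := emitConcurrently(50)
	fmt.Println(countValidLines(out)) // prints 50
}
```

Marshaling before taking the lock keeps the critical section short; only the write itself needs to be serialized.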
@@ -0,0 +1,529 @@
package command

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// --- stream / inner-client / outer-client mocks ---

type verifyTestStream struct {
	entries []*filer_pb.Entry
	idx     int
}

func (s *verifyTestStream) Recv() (*filer_pb.ListEntriesResponse, error) {
	if s.idx >= len(s.entries) {
		return nil, io.EOF
	}
	resp := &filer_pb.ListEntriesResponse{Entry: s.entries[s.idx]}
	s.idx++
	return resp, nil
}

func (s *verifyTestStream) Header() (metadata.MD, error) { return metadata.MD{}, nil }
func (s *verifyTestStream) Trailer() metadata.MD { return metadata.MD{} }
func (s *verifyTestStream) CloseSend() error { return nil }
func (s *verifyTestStream) Context() context.Context { return context.Background() }
func (s *verifyTestStream) SendMsg(_ any) error { return nil }
func (s *verifyTestStream) RecvMsg(_ any) error { return nil }

// verifyTestInnerClient is the SeaweedFilerClient passed to fn inside WithFilerClient.
type verifyTestInnerClient struct {
	filer_pb.SeaweedFilerClient // embed for unimplemented RPCs
	entriesByDir map[string][]*filer_pb.Entry
}

func (c *verifyTestInnerClient) ListEntries(_ context.Context, in *filer_pb.ListEntriesRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) {
	return &verifyTestStream{entries: c.entriesByDir[in.Directory]}, nil
}

// verifyTestFilerClient implements filer_pb.FilerClient and tracks concurrent
// WithFilerClient invocations to let tests verify the global concurrency bound.
type verifyTestFilerClient struct {
	entriesByDir map[string][]*filer_pb.Entry
	inFlight     int64 // accessed via atomic
	peakFlight   int64 // accessed via atomic
	delay        time.Duration
}

func (c *verifyTestFilerClient) WithFilerClient(_ bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// track peak concurrent in-flight listings
	n := atomic.AddInt64(&c.inFlight, 1)
	defer atomic.AddInt64(&c.inFlight, -1)
	for {
		peak := atomic.LoadInt64(&c.peakFlight)
		if n <= peak || atomic.CompareAndSwapInt64(&c.peakFlight, peak, n) {
			break
		}
	}
	if c.delay > 0 {
		time.Sleep(c.delay)
	}
	return fn(&verifyTestInnerClient{entriesByDir: c.entriesByDir})
}

func (c *verifyTestFilerClient) AdjustedUrl(_ *filer_pb.Location) string { return "" }
func (c *verifyTestFilerClient) GetDataCenter() string { return "" }

// --- entry helpers ---

func verifyFileEntry(name string, size uint64) *filer_pb.Entry {
	return &filer_pb.Entry{
		Name:       name,
		Attributes: &filer_pb.FuseAttributes{FileSize: size},
	}
}

func verifyDirEntry(name string) *filer_pb.Entry {
	return &filer_pb.Entry{Name: name, IsDirectory: true}
}

// --- tests ---

// TestVerifySyncMissingFile confirms that a file present in A but absent in B
// is counted as missing.
func TestVerifySyncMissingFile(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("file.txt", 100)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {},
		},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result)

	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.missingCount.Load(); got != 1 {
		t.Errorf("missingCount = %d, want 1", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0", got)
	}
}

// TestVerifySyncOnlyInB confirms that a file present only in B is counted
// (non-active-passive mode) or ignored (active-passive mode).
func TestVerifySyncOnlyInB(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/root": {}},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("extra.txt", 50)},
		},
	}

	t.Run("bidirectional", func(t *testing.T) {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", false, time.Time{}, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 1 {
			t.Errorf("onlyInB = %d, want 1", got)
		}
	})

	t.Run("active-passive ignores onlyInB", func(t *testing.T) {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", true, time.Time{}, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 0 {
			t.Errorf("onlyInB = %d, want 0 in active-passive mode", got)
		}
	})
}

// TestVerifySyncSizeMismatch confirms that a file with differing sizes is
// counted as a size mismatch and not as missing.
func TestVerifySyncSizeMismatch(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("data.bin", 1024)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {verifyFileEntry("data.bin", 512)},
		},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result)

	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.sizeMismatch.Load(); got != 1 {
		t.Errorf("sizeMismatch = %d, want 1", got)
	}
	if got := result.missingCount.Load(); got != 0 {
		t.Errorf("missingCount = %d, want 0", got)
	}
}

// TestVerifySyncConcurrencyBound verifies that the shared semaphore keeps peak
// concurrent filer listings at or below verifySyncConcurrency at all times.
// A 5ms delay per WithFilerClient call makes the concurrency overlap observable.
func TestVerifySyncConcurrencyBound(t *testing.T) {
	// Wide, shallow tree: root with 20 identical subdirectories.
	const fanout = 20
	entriesA := make(map[string][]*filer_pb.Entry)
	entriesB := make(map[string][]*filer_pb.Entry)

	rootDirs := make([]*filer_pb.Entry, fanout)
	for i := range fanout {
		name := fmt.Sprintf("sub%02d", i)
		rootDirs[i] = verifyDirEntry(name)
		entriesA["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)}
		entriesB["/root/"+name] = []*filer_pb.Entry{verifyFileEntry("f.txt", 10)}
	}
	entriesA["/root"] = rootDirs
	entriesB["/root"] = rootDirs

	clientA := &verifyTestFilerClient{entriesByDir: entriesA, delay: 5 * time.Millisecond}
	clientB := &verifyTestFilerClient{entriesByDir: entriesB, delay: 5 * time.Millisecond}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 {
		t.Errorf("unexpected diffs in identical tree")
	}

	if peak := atomic.LoadInt64(&clientA.peakFlight); peak > verifySyncConcurrency {
		t.Errorf("clientA peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)",
			peak, verifySyncConcurrency)
	}
	if peak := atomic.LoadInt64(&clientB.peakFlight); peak > verifySyncConcurrency {
		t.Errorf("clientB peak concurrent listings = %d, want ≤ %d (verifySyncConcurrency)",
			peak, verifySyncConcurrency)
	}
}

// TestVerifySyncETagMismatch confirms that two files with the same size but
// different Md5 checksums are counted as an ETag mismatch, not a size mismatch.
func TestVerifySyncETagMismatch(t *testing.T) {
	newEntry := func(name string, md5 []byte) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name: name,
			Attributes: &filer_pb.FuseAttributes{
				FileSize: 100,
				Md5:      md5,
			},
		}
	}
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {newEntry("data.bin", []byte{0x11, 0x22, 0x33})},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/root": {newEntry("data.bin", []byte{0x44, 0x55, 0x66})},
		},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/root", "/root", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.etagMismatch.Load(); got != 1 {
		t.Errorf("etagMismatch = %d, want 1", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0 (same size should not trigger size mismatch)", got)
	}
}

// TestVerifySyncCutoffTime verifies that entries newer than cutoffTime are
// skipped in both the A-only (MISSING) and B-only (ONLY_IN_B) branches.
func TestVerifySyncCutoffTime(t *testing.T) {
	cutoff := time.Unix(1000, 0)

	recentEntry := func(name string) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       name,
			Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 2000}, // > cutoff
		}
	}
	oldEntry := func(name string) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       name,
			Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff
		}
	}

	t.Run("A-only recent file is skipped, not reported missing", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.skippedRecent.Load(); got != 1 {
			t.Errorf("skippedRecent = %d, want 1", got)
		}
		if got := result.missingCount.Load(); got != 0 {
			t.Errorf("missingCount = %d, want 0 (recent file should be skipped)", got)
		}
	})

	t.Run("A-only old file is reported missing", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.missingCount.Load(); got != 1 {
			t.Errorf("missingCount = %d, want 1", got)
		}
	})

	t.Run("B-only recent file is skipped, not reported as ONLY_IN_B", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {recentEntry("new.txt")}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.skippedRecent.Load(); got != 1 {
			t.Errorf("skippedRecent = %d, want 1", got)
		}
		if got := result.onlyInB.Load(); got != 0 {
			t.Errorf("onlyInB = %d, want 0 (recent B-only file should be skipped)", got)
		}
	})

	t.Run("B-only old file is reported as ONLY_IN_B", func(t *testing.T) {
		clientA := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {}},
		}
		clientB := &verifyTestFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{"/": {oldEntry("old.txt")}},
		}
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		if err := compareDirectory(context.Background(), clientA, clientB,
			"/", "/", false, cutoff, sem, result); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if got := result.onlyInB.Load(); got != 1 {
			t.Errorf("onlyInB = %d, want 1", got)
		}
	})
}

// TestVerifySyncCutoffMatchedFileBSideRecent verifies that when matched-name
// files differ but only the B side is recently modified, the comparison is
// skipped (sync-lag tolerance) rather than reporting a spurious mismatch.
func TestVerifySyncCutoffMatchedFileBSideRecent(t *testing.T) {
	cutoff := time.Unix(1000, 0)

	entry := func(size uint64, mtime int64) *filer_pb.Entry {
		return &filer_pb.Entry{
			Name:       "data.bin",
			Attributes: &filer_pb.FuseAttributes{FileSize: size, Mtime: mtime},
		}
	}

	// A is old (size 100), B is recently rewritten with a different size.
	// Without the B-side cutoff check this would surface as SIZE_MISMATCH.
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(100, 500)}},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{"/": {entry(200, 2000)}},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, cutoff, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := result.skippedRecent.Load(); got != 1 {
		t.Errorf("skippedRecent = %d, want 1 (B-side recent should skip)", got)
	}
	if got := result.sizeMismatch.Load(); got != 0 {
		t.Errorf("sizeMismatch = %d, want 0 (recent B should not surface as mismatch)", got)
	}
}

// TestVerifySyncMissingDirRecursesEvenWithRecentMtime verifies that a
// directory missing in B with a recent mtime still has its subtree walked,
// so older missing files inside are reported. A recent child write can bump
// the parent mtime even though older missing files exist underneath.
func TestVerifySyncMissingDirRecursesEvenWithRecentMtime(t *testing.T) {
	cutoff := time.Unix(1000, 0)

	recentDir := &filer_pb.Entry{
		Name:        "subdir",
		IsDirectory: true,
		Attributes:  &filer_pb.FuseAttributes{Mtime: 2000}, // > cutoff
	}
	oldChild := &filer_pb.Entry{
		Name:       "old.txt",
		Attributes: &filer_pb.FuseAttributes{FileSize: 10, Mtime: 500}, // < cutoff
	}

	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":       {recentDir},
			"/subdir": {oldChild},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/": {},
		},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, cutoff, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	// Expect: directory MISSING + recursed-old-file MISSING = 2 missing.
	if got := result.missingCount.Load(); got != 2 {
		t.Errorf("missingCount = %d, want 2 (recent dir + old child inside)", got)
	}
	if got := result.skippedRecent.Load(); got != 0 {
		t.Errorf("skippedRecent = %d, want 0 (dir mtime should not gate recursion)", got)
	}
}

// TestVerifySyncRootPath is a regression test for the path.Join fix.
// fmt.Sprintf("%s/%s", "/", name) produced "//name"; path.Join produces "/name".
// This test walks from "/" and verifies the child directory is found and
// compared correctly (not silently skipped due to a malformed path).
func TestVerifySyncRootPath(t *testing.T) {
	clientA := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":     {verifyDirEntry("data")},
			"/data": {verifyFileEntry("file.txt", 42)},
		},
	}
	clientB := &verifyTestFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/":     {verifyDirEntry("data")},
			"/data": {verifyFileEntry("file.txt", 42)},
		},
	}

	result := &VerifyResult{}
	sem := make(chan struct{}, verifySyncConcurrency)
	if err := compareDirectory(context.Background(), clientA, clientB,
		"/", "/", false, time.Time{}, sem, result); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.missingCount.Load() != 0 || result.sizeMismatch.Load() != 0 {
		t.Errorf("identical trees from root should have no diffs: missing=%d size=%d",
			result.missingCount.Load(), result.sizeMismatch.Load())
	}
	// 2 directories traversed: "/" and "/data"
	if got := result.dirCount.Load(); got != 2 {
		t.Errorf("dirCount = %d, want 2 (root + /data)", got)
	}
	// 1 file compared: /data/file.txt
	if got := result.fileCount.Load(); got != 1 {
		t.Errorf("fileCount = %d, want 1", got)
	}
}

// TestVerifySyncNoDeadlockDeepTree ensures that a tree deeper than
// verifySyncConcurrency completes without deadlocking. With a per-call
// semaphore the walk would still complete (just with unbounded goroutines);
// this test mainly guards that the shared-semaphore release-before-recurse
// invariant holds, i.e. the walk finishes within the timeout.
func TestVerifySyncNoDeadlockDeepTree(t *testing.T) {
	// Build a binary tree of depth 10 (well past verifySyncConcurrency=5).
	const depth = 10
	entriesA := make(map[string][]*filer_pb.Entry)
	entriesB := make(map[string][]*filer_pb.Entry)

	var buildTree func(path string, d int)
	buildTree = func(path string, d int) {
		if d == 0 {
			entriesA[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)}
			entriesB[path] = []*filer_pb.Entry{verifyFileEntry("leaf.txt", 1)}
			return
		}
		children := []*filer_pb.Entry{verifyDirEntry("left"), verifyDirEntry("right")}
		entriesA[path] = children
		entriesB[path] = children
		buildTree(path+"/left", d-1)
		buildTree(path+"/right", d-1)
	}
	buildTree("/root", depth)

	clientA := &verifyTestFilerClient{entriesByDir: entriesA}
	clientB := &verifyTestFilerClient{entriesByDir: entriesB}

	done := make(chan error, 1)
	go func() {
		result := &VerifyResult{}
		sem := make(chan struct{}, verifySyncConcurrency)
		done <- compareDirectory(context.Background(), clientA, clientB,
			"/root", "/root", false, time.Time{}, sem, result)
	}()

	select {
	case err := <-done:
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
	case <-time.After(10 * time.Second):
		t.Fatal("compareDirectory did not complete within 10s: possible deadlock")
	}
}

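The release-before-recurse invariant that the last two concurrency tests guard can be shown with a small standalone walk. This is a sketch under assumptions, not `compareDirectory` itself: `walker`, `buildTree`, and `runWalk` are hypothetical names, directories are plain strings in a map, and the shared semaphore is held only while a directory is being listed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// walker (hypothetical) walks an in-memory tree with one shared semaphore.
type walker struct {
	children map[string][]string // directory -> subdirectory names (read-only during the walk)
	sem      chan struct{}       // shared across all recursion levels
	inFlight atomic.Int64
	peak     atomic.Int64
	visited  atomic.Int64
}

// list simulates a directory listing and records peak concurrent listings.
func (w *walker) list(dir string) []string {
	n := w.inFlight.Add(1)
	defer w.inFlight.Add(-1)
	for {
		p := w.peak.Load()
		if n <= p || w.peak.CompareAndSwap(p, n) {
			break
		}
	}
	return w.children[dir]
}

// walk holds the semaphore only for the listing phase and releases it before
// recursing, so a parent never pins a slot while waiting on its children.
func (w *walker) walk(dir string) {
	w.sem <- struct{}{}
	subdirs := w.list(dir)
	w.visited.Add(1)
	<-w.sem // release before recursing

	var wg sync.WaitGroup
	for _, sub := range subdirs {
		wg.Add(1)
		go func(sub string) {
			defer wg.Done()
			w.walk(dir + "/" + sub)
		}(sub)
	}
	wg.Wait()
}

// buildTree fills children with a full binary tree of the given depth.
func buildTree(children map[string][]string, dir string, depth int) {
	if depth == 0 {
		return
	}
	children[dir] = []string{"left", "right"}
	buildTree(children, dir+"/left", depth-1)
	buildTree(children, dir+"/right", depth-1)
}

// runWalk walks a binary tree and returns directories visited and peak listings.
func runWalk(depth, limit int) (visited, peak int64) {
	w := &walker{children: map[string][]string{}, sem: make(chan struct{}, limit)}
	buildTree(w.children, "/root", depth)
	w.walk("/root")
	return w.visited.Load(), w.peak.Load()
}

func main() {
	visited, peak := runWalk(8, 5)
	fmt.Println(visited, peak <= 5) // prints "511 true"
}
```

If the slot were still held while waiting on `wg.Wait()`, a tree deeper than the semaphore capacity could exhaust every slot with blocked parents and never finish, which is the failure mode the deep-tree test is designed to catch.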