diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 550c59d6c..3f0742fbd 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -29,7 +29,7 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtimeNs int64) (replicatedChunks []*filer_pb.FileChunk, err error) { if len(sourceChunks) == 0 { return } @@ -76,7 +76,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ } if sourceChunk.IsChunkManifest { - replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime) + replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtimeNs) if replicateErr != nil { setError(replicateErr) break @@ -94,7 +94,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ defer wg.Done() var replicatedChunk *filer_pb.FileChunk retryErr := util.Retry("replicate chunks", func() error { - chunk, e := fs.replicateOneChunk(source, path, sourceMtime) + chunk, e := fs.replicateOneChunk(source, path, sourceMtimeNs) if e != nil { return e } @@ -117,9 +117,9 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_ return } -func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (*filer_pb.FileChunk, error) { - fileId, err := fs.fetchAndWrite(sourceChunk, path, sourceMtime) + fileId, err := fs.fetchAndWrite(sourceChunk, path, sourceMtimeNs) if err != nil { return nil, fmt.Errorf("copy %s: %w", sourceChunk.GetFileIdString(), err) } @@ -138,13 +138,13 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str }, nil } -func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) { +func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (*filer_pb.FileChunk, error) { resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk) if err != nil { return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err) } - replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtime) + replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtimeNs) if err != nil { return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err) } @@ -162,7 +162,7 @@ func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk return nil, fmt.Errorf("marshal manifest %s: %w", sourceChunk.GetFileIdString(), err) } - manifestFileId, err := fs.uploadManifestChunk(path, sourceMtime, sourceChunk.GetFileIdString(), manifestData) + manifestFileId, err := fs.uploadManifestChunk(path, sourceMtimeNs, sourceChunk.GetFileIdString(), manifestData) if err != nil { return nil, err } @@ -178,7 +178,7 @@ func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk }, nil } -func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceFileId string, manifestData []byte) (fileId string, err error) { +func (fs *FilerSink) uploadManifestChunk(path string, sourceMtimeNs int64, sourceFileId string, manifestData []byte) (fileId string, err error) { uploader, err := fs.getUploader() if err != nil { glog.V(0).Infof("upload manifest data %v: %v", sourceFileId, err) @@ -221,7 +221,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF fileId = currentFileId return nil }, func(uploadErr error) (shouldContinue bool) { - if fs.hasSourceNewerVersion(path, sourceMtime) { + if fs.hasSourceNewerVersion(path, sourceMtimeNs) { glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr) return false } @@ -235,7 +235,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF return fileId, nil } -func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) { +func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (fileId string, err error) { uploader, err := fs.getUploader() if err != nil { glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err) @@ -348,7 +348,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, transferStatus.mu.Unlock() return false } - if fs.hasSourceNewerVersion(path, sourceMtime) { + if fs.hasSourceNewerVersion(path, sourceMtimeNs) { glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } @@ -381,10 +381,8 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, const maxTransientBackoff = 2 * time.Minute -// nextTransientBackoff returns the next backoff duration for a transient -// network failure. It starts at 10s, doubles each time, and caps at 2 minutes -// so an overloaded destination can recover instead of being hammered every -// second by the surrounding RetryUntil loop. +// nextTransientBackoff escalates 10s -> doubling -> 2m cap so an overloaded +// destination can recover instead of being hammered by the RetryUntil loop. func nextTransientBackoff(current time.Duration) time.Duration { if current < 10*time.Second { return 10 * time.Second @@ -403,12 +401,10 @@ func isEofError(err error) bool { return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } -// isRetryableNetworkError reports whether err is a transient network failure -// worth a backoff-and-retry: an interrupted read (EOF), a timeout (e.g. the -// destination volume server hitting its idle deadline while reading a large -// upload body under load), or a reset/broken connection. The volume server -// returns the timeout as a JSON error string, so match on text in addition to -// the net.Error interface. +// isRetryableNetworkError reports whether err is a transient network failure worth +// a backoff-and-retry: EOF, timeout (e.g. the destination's idle deadline under +// load), or a reset/broken connection. The volume server returns the timeout as a +// JSON string, so match on text as well as the net.Error interface. func isRetryableNetworkError(err error) bool { if err == nil { return false @@ -420,8 +416,7 @@ func isRetryableNetworkError(err error) bool { if errors.As(err, &netErr) && netErr.Timeout() { return true } - // lower-case so we also catch capitalized variants from other OSes, - // libraries, or custom error wrappers + // lower-cased to also catch capitalized variants msg := strings.ToLower(err.Error()) return strings.Contains(msg, "i/o timeout") || strings.Contains(msg, "connection reset") || @@ -447,8 +442,12 @@ func (fs *FilerSink) buildUploadUrl(host, fileId string) string { return fmt.Sprintf("http://%s/%s", host, fileId) } -func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool { - if sourceMtime <= 0 || fs.filerSource == nil { +// hasSourceNewerVersion reports whether the source's current entry for targetPath +// has moved past sourceMtimeNs — gone, or a strictly-newer mtime — meaning the +// version being replayed is stale. The lookup runs regardless of sourceMtimeNs so +// a deleted source is detected even when the replayed mtime is epoch/unset. +func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtimeNs int64) bool { + if fs.filerSource == nil { return false } @@ -458,8 +457,21 @@ func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) } sourceEntry, err := filer_pb.GetEntry(context.Background(), fs.filerSource, sourcePath) - if err != nil { - glog.V(1).Infof("lookup source entry %s: %v", sourcePath, err) + return sourceSupersedes(sourcePath, sourceEntry, err, sourceMtimeNs) +} + +// sourceSupersedes reports whether the replayed version (sourceMtimeNs) is stale: +// true if the source entry is gone or strictly newer, false on a same/older version +// or any non-"not found" lookup error (so a transient failure never skips a live +// file). GetEntry signals missing as ErrNotFound — possibly wrapped or a plain gRPC +// string, occasionally a nil entry — all treated as gone. +func sourceSupersedes(sourcePath util.FullPath, sourceEntry *filer_pb.Entry, lookupErr error, sourceMtimeNs int64) bool { + if lookupErr != nil { + if errors.Is(lookupErr, filer_pb.ErrNotFound) || strings.Contains(lookupErr.Error(), filer_pb.ErrNotFound.Error()) { + glog.V(1).Infof("source entry %s no longer exists: %v", sourcePath, lookupErr) + return true + } + glog.V(1).Infof("lookup source entry %s: %v", sourcePath, lookupErr) return false } if sourceEntry == nil { @@ -467,7 +479,12 @@ func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) return true } - return sourceEntry.Attributes != nil && sourceEntry.Attributes.Mtime > sourceMtime + // source still holds the entry; only a strictly-newer mtime proves staleness, + // and only when there is a valid replayed mtime to compare against. + if sourceMtimeNs <= 0 { + return false + } + return getEntryMtimeNs(sourceEntry) > sourceMtimeNs } func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, bool) { @@ -475,6 +492,13 @@ func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, b return "", false } + // Incremental sink keys carry a date prefix (sinkDir/YYYY-MM-DD/relPath) that + // can't be reversed to a source path; report unmappable rather than build a path + // under a nonexistent dated dir, which would read as ErrNotFound and skip a live entry. + if fs.isIncremental { + return "", false + } + normalizePath := func(p string) string { p = strings.TrimSuffix(p, "/") if p == "" { diff --git a/weed/replication/sink/filersink/fetch_write_test.go b/weed/replication/sink/filersink/fetch_write_test.go index dd072e587..0cdd2c7f5 100644 --- a/weed/replication/sink/filersink/fetch_write_test.go +++ b/weed/replication/sink/filersink/fetch_write_test.go @@ -26,12 +26,13 @@ func TestMain(m *testing.M) { func TestTargetPathToSourcePath(t *testing.T) { tests := []struct { - name string - targetRoot string - sourceRoot string - targetPath string - wantPath util.FullPath - wantOK bool + name string + targetRoot string + sourceRoot string + targetPath string + incremental bool + wantPath util.FullPath + wantOK bool }{ { name: "basic mapping", @@ -41,6 +42,16 @@ func TestTargetPathToSourcePath(t *testing.T) { wantPath: "/source/path/file.txt", wantOK: true, }, + { + // incremental keys carry a date prefix that can't be reversed; unmappable + name: "incremental sink is unmappable", + targetRoot: "/target", + sourceRoot: "/source", + targetPath: "/target/2026-06-09/path/file.txt", + incremental: true, + wantPath: "", + wantOK: false, + }, { name: "trailing slash roots", targetRoot: "/target/", @@ -78,7 +89,8 @@ func TestTargetPathToSourcePath(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { fs := &FilerSink{ - dir: tc.targetRoot, + dir: tc.targetRoot, + isIncremental: tc.incremental, filerSource: &source.FilerSource{ Dir: tc.sourceRoot, }, @@ -321,3 +333,54 @@ func TestReplicateChunksPreservesSizeMismatchSentinel(t *testing.T) { t.Fatalf("error chain broken: errors.Is(err, errChunkSizeMismatch) = false; got %v", err) } } + +// sourceSupersedes decides whether to skip a stale replayed event. The replayed +// mtime is fixed; the table varies what the source lookup returned. +func TestSourceSupersedes(t *testing.T) { + const eventNs int64 = 5_000_000_500 // the version being replayed (sec 5, ns 500) + + withMtime := func(sec int64, ns int32) *filer_pb.Entry { + return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: sec, MtimeNs: ns}} + } + + tests := []struct { + name string + entry *filer_pb.Entry + lookupErr error + want bool + }{ + // deleted on source: ErrNotFound in several shapes, all read as gone -> skip + {"not-found sentinel", nil, filer_pb.ErrNotFound, true}, + {"not-found wrapped", nil, fmt.Errorf("lookup /x: %w", filer_pb.ErrNotFound), true}, + {"not-found as string (gRPC)", nil, errors.New("rpc error: " + filer_pb.ErrNotFound.Error()), true}, + {"nil entry, nil error", nil, nil, true}, + // transient lookup failure must NOT skip a possibly-live file + {"network error", nil, errors.New("dial tcp: i/o timeout"), false}, + // live entry: compare full-ns mtime against the replayed version + {"source strictly newer", withMtime(5, 600), nil, true}, + {"source same version", withMtime(5, 500), nil, false}, + {"source older (out-of-order replay)", withMtime(5, 400), nil, false}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := sourceSupersedes("/source/x/config", tc.entry, tc.lookupErr, eventNs) + if got != tc.want { + t.Fatalf("sourceSupersedes = %v, want %v", got, tc.want) + } + }) + } +} + +// An epoch/unset replayed mtime (0) must not block "gone" detection: a deleted +// source still reports superseded so the event is skipped instead of wedging on +// permanent retries. A live source stays not-superseded — no valid mtime to compare. +func TestSourceSupersedesEpochMtime(t *testing.T) { + live := &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: 5, MtimeNs: 600}} + if !sourceSupersedes("/source/x", nil, filer_pb.ErrNotFound, 0) { + t.Fatal("epoch-mtime deleted source must be reported gone") + } + if sourceSupersedes("/source/x", live, nil, 0) { + t.Fatal("epoch-mtime live source must not be reported superseded") + } +} diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 9bfe3c6ac..340b3d4eb 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -183,26 +183,22 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ glog.V(3).Infof("already replicated %s", key) return nil } - if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime { + if getEntryMtimeNs(resp.Entry) >= getEntryMtimeNs(entry) { if filer.FileSize(resp.Entry) >= filer.FileSize(entry) { glog.V(3).Infof("skip overwriting %s", key) return nil } - // destination is shorter despite a newer mtime: a truncated copy - // an earlier failed replication left behind; overwrite from source. glog.Warningf("repair truncated %s: destination %d bytes < source %d bytes but has a newer mtime; overwriting from source", key, filer.FileSize(resp.Entry), filer.FileSize(entry)) } } - replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtime(entry)) + replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtimeNs(entry)) if err != nil { - // Don't swallow size-mismatch: source bytes disagree with source - // metadata, so committing would propagate corruption silently. + // don't swallow: committing mismatched bytes would propagate corruption if errors.Is(err, errChunkSizeMismatch) { - glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err) - return err + return fs.onCorruptChunk(key, entry, err) } glog.Warningf("replicate entry chunks %s: %v", key, err) return nil @@ -266,20 +262,22 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry) + // the supersession checks below must look up where the incoming entry lives + // now, which for a rename is newParentPath/newEntry.Name, not the old key. + targetKey := updatedEntryKey(key, newParentPath, newEntry) + switch chooseUpdateAction(existingEntry, newEntry) { case updateSkip: - // a newer, complete version already landed; usually out-of-order messages. - // leave the destination untouched — no point rewriting the same entry. + // a newer, complete version already landed (out-of-order); leave it glog.V(2).Infof("late updates %s", key) return true, nil case updateRepair: glog.Warningf("repair truncated %s: destination %d bytes < source %d bytes but has a newer mtime; re-replicating full source content", key, filer.FileSize(existingEntry), filer.FileSize(newEntry)) - replicatedChunks, err := fs.replicateChunks(context.Background(), newEntry.GetChunks(), key, getEntryMtime(newEntry)) + replicatedChunks, err := fs.replicateChunks(context.Background(), newEntry.GetChunks(), targetKey, getEntryMtimeNs(newEntry)) if err != nil { if errors.Is(err, errChunkSizeMismatch) { - glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err) - return true, err + return true, fs.onCorruptChunk(targetKey, newEntry, err) } glog.Warningf("replicate entry chunks %s: %v", key, err) return true, nil @@ -305,11 +303,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent } // replicate the chunks that are new in the source - replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, key, getEntryMtime(newEntry)) + replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, targetKey, getEntryMtimeNs(newEntry)) if err != nil { if errors.Is(err, errChunkSizeMismatch) { - glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err) - return true, err + return true, fs.onCorruptChunk(targetKey, newEntry, err) } glog.Warningf("replicate entry chunks %s: %v", key, err) return true, nil @@ -360,34 +357,56 @@ func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunc return } -func getEntryMtime(entry *filer_pb.Entry) int64 { +// getEntryMtimeNs returns the mtime at full nanosecond precision so versions +// rewritten within the same second can be ordered. int64 ns is safe until ~2262. +func getEntryMtimeNs(entry *filer_pb.Entry) int64 { if entry == nil || entry.Attributes == nil { return 0 } - return entry.Attributes.Mtime + return entry.Attributes.Mtime*int64(1e9) + int64(entry.Attributes.MtimeNs) } -// updateAction decides how UpdateEntry should reconcile an incoming source -// version with the destination's current entry. +// onCorruptChunk handles a permanent chunk size-mismatch (fetched source bytes +// disagree with source metadata). If the source already holds a newer version the +// event is a stale replay whose chunk was overwritten and GC'd — skip it (lossless: +// meta events are full snapshots, so a later event re-carries the current chunks). +// Otherwise the live version is corrupt, so return the error and halt loudly. +func (fs *FilerSink) onCorruptChunk(key string, entry *filer_pb.Entry, err error) error { + if fs.hasSourceNewerVersion(key, getEntryMtimeNs(entry)) { + glog.Warningf("skip stale entry %s with superseded corrupt chunk: %v", key, err) + return nil + } + glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err) + return err +} + +// updateAction decides how UpdateEntry reconciles an incoming source version with +// the destination's current entry. type updateAction int const ( - // updateNormal applies the incremental source diff: the destination is at - // or behind the source version. + // updateNormal applies the source diff: destination at or behind the source. updateNormal updateAction = iota - // updateSkip leaves the destination untouched because a newer, complete - // version already landed out of order (last-writer-wins). + // updateSkip leaves the destination untouched: a newer complete version already landed. updateSkip - // updateRepair re-replicates the full source content over a destination - // that is strictly shorter than the source despite a newer mtime — a - // truncated copy an earlier failed replication left behind. Its mtime can - // look "newer" (the source preserved an old mtime while the partial copy - // was written recently), which would otherwise strand the corruption. + // updateRepair re-replicates full source over a destination strictly shorter than + // the source despite a newer mtime — a truncated copy from a failed replication. updateRepair ) +// updatedEntryKey returns the sink path the incoming entry lands at: for a rename +// newParentPath/newEntry.Name, else key's parent + name. Supersession must use this, +// not the pre-rename key, which would look deleted on the source and skip a live event. +func updatedEntryKey(key, newParentPath string, newEntry *filer_pb.Entry) string { + targetDir := newParentPath + if targetDir == "" { + targetDir, _ = util.FullPath(key).DirAndName() + } + return string(util.NewFullPath(targetDir, newEntry.Name)) +} + func chooseUpdateAction(existing, incoming *filer_pb.Entry) updateAction { - if getEntryMtime(existing) <= getEntryMtime(incoming) { + if getEntryMtimeNs(existing) <= getEntryMtimeNs(incoming) { return updateNormal } if filer.FileSize(existing) < filer.FileSize(incoming) { diff --git a/weed/replication/sink/filersink/filer_sink_test.go b/weed/replication/sink/filersink/filer_sink_test.go index 024f6edb5..f64fcd5e2 100644 --- a/weed/replication/sink/filersink/filer_sink_test.go +++ b/weed/replication/sink/filersink/filer_sink_test.go @@ -1,6 +1,7 @@ package filersink import ( + "errors" "testing" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -12,6 +13,49 @@ func entry(mtime int64, size uint64) *filer_pb.Entry { return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: mtime, FileSize: size}} } +// entryNs adds a sub-second component to check same-second ordering. +func entryNs(mtime int64, ns int32, size uint64) *filer_pb.Entry { + return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: mtime, MtimeNs: ns, FileSize: size}} +} + +// getEntryMtimeNs must order versions written within the same second, which +// plain second-grained mtime cannot. It also must be nil-safe. +func TestGetEntryMtimeNs(t *testing.T) { + if got := getEntryMtimeNs(nil); got != 0 { + t.Fatalf("nil entry: got %d, want 0", got) + } + if got := getEntryMtimeNs(&filer_pb.Entry{}); got != 0 { + t.Fatalf("nil attributes: got %d, want 0", got) + } + + mk := func(sec int64, ns int32) *filer_pb.Entry { + return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: sec, MtimeNs: ns}} + } + if got, want := getEntryMtimeNs(mk(5, 200)), int64(5_000_000_200); got != want { + t.Fatalf("full-ns: got %d, want %d", got, want) + } + // same second, later sub-second component must compare strictly greater + if !(getEntryMtimeNs(mk(5, 200)) > getEntryMtimeNs(mk(5, 100))) { + t.Fatalf("same-second ordering failed: %d not > %d", + getEntryMtimeNs(mk(5, 200)), getEntryMtimeNs(mk(5, 100))) + } +} + +// onCorruptChunk must surface the error, not skip, when it can't confirm +// supersession (no source filer here) — otherwise it could drop a live file. +func TestOnCorruptChunkRefusesWhenSupersessionUnconfirmed(t *testing.T) { + fs := &FilerSink{} // filerSource nil => hasSourceNewerVersion is always false + sentinel := errors.New("corrupt chunk: read 0 bytes, source says 107") + + got := fs.onCorruptChunk("/buckets/x/config", + &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: 5, MtimeNs: 200}}, + sentinel) + + if !errors.Is(got, sentinel) { + t.Fatalf("expected the error to be returned (refuse), got %v", got) + } +} + // chooseUpdateAction decides skip vs repair vs normal. A destination left // truncated by an earlier failed replication can carry a newer mtime (the // source preserved an old mtime while the partial copy was written recently), @@ -30,6 +74,9 @@ func TestChooseUpdateAction(t *testing.T) { {"destination newer and complete", entry(300, 100), entry(200, 100), updateSkip}, {"destination newer and larger", entry(300, 200), entry(200, 100), updateSkip}, {"destination newer but truncated", entry(300, 90), entry(200, 100), updateRepair}, + // same second: sub-second ordering must still pick the winner + {"same second, destination newer and complete", entryNs(5, 200, 100), entryNs(5, 100, 100), updateSkip}, + {"same second, destination older", entryNs(5, 100, 100), entryNs(5, 200, 100), updateNormal}, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -39,3 +86,29 @@ func TestChooseUpdateAction(t *testing.T) { }) } } + +// updatedEntryKey must resolve the incoming entry's new path. For a rename the +// supersession check has to target newParentPath/newEntry.Name, not the old key, +// or the renamed-away old path looks deleted on the source and skips a live event. +func TestUpdatedEntryKey(t *testing.T) { + named := func(name string) *filer_pb.Entry { return &filer_pb.Entry{Name: name} } + tests := []struct { + name string + key string + newParentPath string + entry *filer_pb.Entry + want string + }{ + {"content update, no new parent", "/dst/a.txt", "", named("a.txt"), "/dst/a.txt"}, + {"rename same dir", "/dst/old.txt", "/dst", named("new.txt"), "/dst/new.txt"}, + {"rename to subdir", "/dst/old.txt", "/dst/sub", named("new.txt"), "/dst/sub/new.txt"}, + {"root parent", "/old.txt", "/", named("new.txt"), "/new.txt"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := updatedEntryKey(tc.key, tc.newParentPath, tc.entry); got != tc.want { + t.Fatalf("updatedEntryKey = %q, want %q", got, tc.want) + } + }) + } +}