fix(filer.backup): skip replay events whose source chunk was superseded or deleted (#9886)

* fix(filer.backup): skip replay events whose chunk no longer exists on the source

"Source" is the filer we replicate FROM (e.g. green in a green->blue backup).

Replaying the metadata log from a checkpoint can hit an event whose chunk was
since overwritten/deleted and garbage-collected on the source volume. Fetching
it returns 0 bytes (a permanent size mismatch), which the sink propagated to the
subscription — so the same offset retried forever and replication stalled.

Skip the event only when proven stale; otherwise keep refusing so genuine loss
of a live file still halts loudly:

- onCorruptChunk centralizes the three errChunkSizeMismatch sites.
- getEntryMtimeNs compares mtime at nanosecond precision so same-second rewrites
  (git's config.lock dance) are ordered correctly.
- sourceSupersedes re-reads the entry's current state on the source: gone
  (ErrNotFound) or a strictly-newer mtime than the replayed version -> skip;
  any other lookup error keeps the entry.

Skipping is lossless: events are full-entry snapshots, so a later event
re-carries the current chunks and a delete event reconciles a removed file.

* test(filer.backup): cover the superseded-chunk skip decision

- TestSourceSupersedes: not-found (sentinel / wrapped / gRPC string) and nil
  entry -> skip; network error -> keep; source newer -> skip; same/older -> keep.
- TestGetEntryMtimeNs: nanosecond precision, same-second ordering, nil safety.
- TestOnCorruptChunkRefusesWhenSupersessionUnconfirmed: never skip silently when
  supersession cannot be confirmed.

* fix(filer.backup): don't infer supersession for incremental sinks

In incremental mode the sink key carries a date prefix
(sinkDir/YYYY-MM-DD/relPath) that cannot be reversed to a real source path, so a
source lookup would always be ErrNotFound and wrongly classify a live entry as
deleted — skipping it. Make targetPathToSourcePath report "unmappable" in
incremental mode; hasSourceNewerVersion already declines to skip when the source
path cannot be mapped.

Found in code review. Non-incremental sinks (filer.backup green->blue) are
unaffected.

* refactor(filer.backup): name the mtime param sourceMtimeNs; note ns overflow bound

- Rename the threaded sourceMtime parameter to sourceMtimeNs across the internal
  replicate/fetch helpers so the unit is explicit (it only feeds
  hasSourceNewerVersion, which compares in nanoseconds).
- Document that getEntryMtimeNs's int64 ns arithmetic is safe until ~year 2262.

No behavior change.

* fix(filer.backup): order same-second versions in the CreateEntry skip and update gates

The CreateEntry already-replicated short-circuit and chooseUpdateAction
still compared second-grained mtime, so a newer version written within
the same second could be skipped as already-replicated or overwritten by
an older same-second replay. Route both through getEntryMtimeNs, matching
the precision the chunk-replication path already uses.

* test(filer.backup): cover same-second update-action ordering

* docs(filer.backup): trim verbose comments to terse why

* fix(filer.backup): check supersession against the rename's new path

For a rename the filer sink updates in place (the delete+create branch is
skipped for sink name "filer"), so the corrupt-chunk supersession check
queried the pre-rename key. Its source-side ErrNotFound was read as
"superseded", silently advancing the checkpoint without applying the rename.
Map the incoming entry's new path (newParentPath/newEntry.Name) for both
update branches.

* fix(filer.backup): detect a deleted source even when the replayed mtime is epoch

hasSourceNewerVersion returned early when sourceMtimeNs <= 0, skipping the
source lookup, so a deleted entry with mtime 0 (a valid epoch timestamp) never
got the gone verdict and wedged on permanent retries. Always look up; gate only
the newer-mtime comparison on a valid replayed mtime.

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
Jaehoon Kim
2026-06-10 00:53:29 +09:00
committed by GitHub
parent 1cf92f6c2e
commit 202517c02a
4 changed files with 245 additions and 66 deletions
+53 -29
View File
@@ -29,7 +29,7 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtimeNs int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 {
return
}
@@ -76,7 +76,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_
}
if sourceChunk.IsChunkManifest {
replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime)
replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtimeNs)
if replicateErr != nil {
setError(replicateErr)
break
@@ -94,7 +94,7 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_
defer wg.Done()
var replicatedChunk *filer_pb.FileChunk
retryErr := util.Retry("replicate chunks", func() error {
chunk, e := fs.replicateOneChunk(source, path, sourceMtime)
chunk, e := fs.replicateOneChunk(source, path, sourceMtimeNs)
if e != nil {
return e
}
@@ -117,9 +117,9 @@ func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_
return
}
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (*filer_pb.FileChunk, error) {
fileId, err := fs.fetchAndWrite(sourceChunk, path, sourceMtime)
fileId, err := fs.fetchAndWrite(sourceChunk, path, sourceMtimeNs)
if err != nil {
return nil, fmt.Errorf("copy %s: %w", sourceChunk.GetFileIdString(), err)
}
@@ -138,13 +138,13 @@ func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path str
}, nil
}
func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (*filer_pb.FileChunk, error) {
resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk)
if err != nil {
return nil, fmt.Errorf("resolve manifest %s: %w", sourceChunk.GetFileIdString(), err)
}
replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtime)
replicatedResolvedChunks, err := fs.replicateChunks(ctx, resolvedChunks, path, sourceMtimeNs)
if err != nil {
return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceChunk.GetFileIdString(), err)
}
@@ -162,7 +162,7 @@ func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk
return nil, fmt.Errorf("marshal manifest %s: %w", sourceChunk.GetFileIdString(), err)
}
manifestFileId, err := fs.uploadManifestChunk(path, sourceMtime, sourceChunk.GetFileIdString(), manifestData)
manifestFileId, err := fs.uploadManifestChunk(path, sourceMtimeNs, sourceChunk.GetFileIdString(), manifestData)
if err != nil {
return nil, err
}
@@ -178,7 +178,7 @@ func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk
}, nil
}
func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceFileId string, manifestData []byte) (fileId string, err error) {
func (fs *FilerSink) uploadManifestChunk(path string, sourceMtimeNs int64, sourceFileId string, manifestData []byte) (fileId string, err error) {
uploader, err := fs.getUploader()
if err != nil {
glog.V(0).Infof("upload manifest data %v: %v", sourceFileId, err)
@@ -221,7 +221,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF
fileId = currentFileId
return nil
}, func(uploadErr error) (shouldContinue bool) {
if fs.hasSourceNewerVersion(path, sourceMtime) {
if fs.hasSourceNewerVersion(path, sourceMtimeNs) {
glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr)
return false
}
@@ -235,7 +235,7 @@ func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceF
return fileId, nil
}
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) {
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtimeNs int64) (fileId string, err error) {
uploader, err := fs.getUploader()
if err != nil {
glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
@@ -348,7 +348,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
transferStatus.mu.Unlock()
return false
}
if fs.hasSourceNewerVersion(path, sourceMtime) {
if fs.hasSourceNewerVersion(path, sourceMtimeNs) {
glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
return false
}
@@ -381,10 +381,8 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
const maxTransientBackoff = 2 * time.Minute
// nextTransientBackoff returns the next backoff duration for a transient
// network failure. It starts at 10s, doubles each time, and caps at 2 minutes
// so an overloaded destination can recover instead of being hammered every
// second by the surrounding RetryUntil loop.
// nextTransientBackoff escalates 10s -> doubling -> 2m cap so an overloaded
// destination can recover instead of being hammered by the RetryUntil loop.
func nextTransientBackoff(current time.Duration) time.Duration {
if current < 10*time.Second {
return 10 * time.Second
@@ -403,12 +401,10 @@ func isEofError(err error) bool {
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
// isRetryableNetworkError reports whether err is a transient network failure
// worth a backoff-and-retry: an interrupted read (EOF), a timeout (e.g. the
// destination volume server hitting its idle deadline while reading a large
// upload body under load), or a reset/broken connection. The volume server
// returns the timeout as a JSON error string, so match on text in addition to
// the net.Error interface.
// isRetryableNetworkError reports whether err is a transient network failure worth
// a backoff-and-retry: EOF, timeout (e.g. the destination's idle deadline under
// load), or a reset/broken connection. The volume server returns the timeout as a
// JSON string, so match on text as well as the net.Error interface.
func isRetryableNetworkError(err error) bool {
if err == nil {
return false
@@ -420,8 +416,7 @@ func isRetryableNetworkError(err error) bool {
if errors.As(err, &netErr) && netErr.Timeout() {
return true
}
// lower-case so we also catch capitalized variants from other OSes,
// libraries, or custom error wrappers
// lower-cased to also catch capitalized variants
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "i/o timeout") ||
strings.Contains(msg, "connection reset") ||
@@ -447,8 +442,12 @@ func (fs *FilerSink) buildUploadUrl(host, fileId string) string {
return fmt.Sprintf("http://%s/%s", host, fileId)
}
func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool {
if sourceMtime <= 0 || fs.filerSource == nil {
// hasSourceNewerVersion reports whether the source's current entry for targetPath
// has moved past sourceMtimeNs — gone, or a strictly-newer mtime — meaning the
// version being replayed is stale. The lookup runs regardless of sourceMtimeNs so
// a deleted source is detected even when the replayed mtime is epoch/unset.
func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtimeNs int64) bool {
if fs.filerSource == nil {
return false
}
@@ -458,8 +457,21 @@ func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64)
}
sourceEntry, err := filer_pb.GetEntry(context.Background(), fs.filerSource, sourcePath)
if err != nil {
glog.V(1).Infof("lookup source entry %s: %v", sourcePath, err)
return sourceSupersedes(sourcePath, sourceEntry, err, sourceMtimeNs)
}
// sourceSupersedes reports whether the replayed version (sourceMtimeNs) is stale:
// true if the source entry is gone or strictly newer, false on a same/older version
// or any non-"not found" lookup error (so a transient failure never skips a live
// file). GetEntry signals missing as ErrNotFound — possibly wrapped or a plain gRPC
// string, occasionally a nil entry — all treated as gone.
func sourceSupersedes(sourcePath util.FullPath, sourceEntry *filer_pb.Entry, lookupErr error, sourceMtimeNs int64) bool {
if lookupErr != nil {
if errors.Is(lookupErr, filer_pb.ErrNotFound) || strings.Contains(lookupErr.Error(), filer_pb.ErrNotFound.Error()) {
glog.V(1).Infof("source entry %s no longer exists: %v", sourcePath, lookupErr)
return true
}
glog.V(1).Infof("lookup source entry %s: %v", sourcePath, lookupErr)
return false
}
if sourceEntry == nil {
@@ -467,7 +479,12 @@ func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64)
return true
}
return sourceEntry.Attributes != nil && sourceEntry.Attributes.Mtime > sourceMtime
// source still holds the entry; only a strictly-newer mtime proves staleness,
// and only when there is a valid replayed mtime to compare against.
if sourceMtimeNs <= 0 {
return false
}
return getEntryMtimeNs(sourceEntry) > sourceMtimeNs
}
func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, bool) {
@@ -475,6 +492,13 @@ func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, b
return "", false
}
// Incremental sink keys carry a date prefix (sinkDir/YYYY-MM-DD/relPath) that
// can't be reversed to a source path; report unmappable rather than build a path
// under a nonexistent dated dir, which would read as ErrNotFound and skip a live entry.
if fs.isIncremental {
return "", false
}
normalizePath := func(p string) string {
p = strings.TrimSuffix(p, "/")
if p == "" {
@@ -26,12 +26,13 @@ func TestMain(m *testing.M) {
func TestTargetPathToSourcePath(t *testing.T) {
tests := []struct {
name string
targetRoot string
sourceRoot string
targetPath string
wantPath util.FullPath
wantOK bool
name string
targetRoot string
sourceRoot string
targetPath string
incremental bool
wantPath util.FullPath
wantOK bool
}{
{
name: "basic mapping",
@@ -41,6 +42,16 @@ func TestTargetPathToSourcePath(t *testing.T) {
wantPath: "/source/path/file.txt",
wantOK: true,
},
{
// incremental keys carry a date prefix that can't be reversed; unmappable
name: "incremental sink is unmappable",
targetRoot: "/target",
sourceRoot: "/source",
targetPath: "/target/2026-06-09/path/file.txt",
incremental: true,
wantPath: "",
wantOK: false,
},
{
name: "trailing slash roots",
targetRoot: "/target/",
@@ -78,7 +89,8 @@ func TestTargetPathToSourcePath(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fs := &FilerSink{
dir: tc.targetRoot,
dir: tc.targetRoot,
isIncremental: tc.incremental,
filerSource: &source.FilerSource{
Dir: tc.sourceRoot,
},
@@ -321,3 +333,54 @@ func TestReplicateChunksPreservesSizeMismatchSentinel(t *testing.T) {
t.Fatalf("error chain broken: errors.Is(err, errChunkSizeMismatch) = false; got %v", err)
}
}
// sourceSupersedes decides whether to skip a stale replayed event. The replayed
// mtime is fixed; the table varies what the source lookup returned.
func TestSourceSupersedes(t *testing.T) {
const eventNs int64 = 5_000_000_500 // the version being replayed (sec 5, ns 500)
withMtime := func(sec int64, ns int32) *filer_pb.Entry {
return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: sec, MtimeNs: ns}}
}
tests := []struct {
name string
entry *filer_pb.Entry
lookupErr error
want bool
}{
// deleted on source: ErrNotFound in several shapes, all read as gone -> skip
{"not-found sentinel", nil, filer_pb.ErrNotFound, true},
{"not-found wrapped", nil, fmt.Errorf("lookup /x: %w", filer_pb.ErrNotFound), true},
{"not-found as string (gRPC)", nil, errors.New("rpc error: " + filer_pb.ErrNotFound.Error()), true},
{"nil entry, nil error", nil, nil, true},
// transient lookup failure must NOT skip a possibly-live file
{"network error", nil, errors.New("dial tcp: i/o timeout"), false},
// live entry: compare full-ns mtime against the replayed version
{"source strictly newer", withMtime(5, 600), nil, true},
{"source same version", withMtime(5, 500), nil, false},
{"source older (out-of-order replay)", withMtime(5, 400), nil, false},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := sourceSupersedes("/source/x/config", tc.entry, tc.lookupErr, eventNs)
if got != tc.want {
t.Fatalf("sourceSupersedes = %v, want %v", got, tc.want)
}
})
}
}
// An epoch/unset replayed mtime (0) must not block "gone" detection: a deleted
// source still reports superseded so the event is skipped instead of wedging on
// permanent retries. A live source stays not-superseded — no valid mtime to compare.
func TestSourceSupersedesEpochMtime(t *testing.T) {
live := &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: 5, MtimeNs: 600}}
if !sourceSupersedes("/source/x", nil, filer_pb.ErrNotFound, 0) {
t.Fatal("epoch-mtime deleted source must be reported gone")
}
if sourceSupersedes("/source/x", live, nil, 0) {
t.Fatal("epoch-mtime live source must not be reported superseded")
}
}
+49 -30
View File
@@ -183,26 +183,22 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
glog.V(3).Infof("already replicated %s", key)
return nil
}
if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime {
if getEntryMtimeNs(resp.Entry) >= getEntryMtimeNs(entry) {
if filer.FileSize(resp.Entry) >= filer.FileSize(entry) {
glog.V(3).Infof("skip overwriting %s", key)
return nil
}
// destination is shorter despite a newer mtime: a truncated copy
// an earlier failed replication left behind; overwrite from source.
glog.Warningf("repair truncated %s: destination %d bytes < source %d bytes but has a newer mtime; overwriting from source",
key, filer.FileSize(resp.Entry), filer.FileSize(entry))
}
}
replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtime(entry))
replicatedChunks, err := fs.replicateChunks(context.Background(), entry.GetChunks(), key, getEntryMtimeNs(entry))
if err != nil {
// Don't swallow size-mismatch: source bytes disagree with source
// metadata, so committing would propagate corruption silently.
// don't swallow: committing mismatched bytes would propagate corruption
if errors.Is(err, errChunkSizeMismatch) {
glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err)
return err
return fs.onCorruptChunk(key, entry, err)
}
glog.Warningf("replicate entry chunks %s: %v", key, err)
return nil
@@ -266,20 +262,22 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
// the supersession checks below must look up where the incoming entry lives
// now, which for a rename is newParentPath/newEntry.Name, not the old key.
targetKey := updatedEntryKey(key, newParentPath, newEntry)
switch chooseUpdateAction(existingEntry, newEntry) {
case updateSkip:
// a newer, complete version already landed; usually out-of-order messages.
// leave the destination untouched — no point rewriting the same entry.
// a newer, complete version already landed (out-of-order); leave it
glog.V(2).Infof("late updates %s", key)
return true, nil
case updateRepair:
glog.Warningf("repair truncated %s: destination %d bytes < source %d bytes but has a newer mtime; re-replicating full source content",
key, filer.FileSize(existingEntry), filer.FileSize(newEntry))
replicatedChunks, err := fs.replicateChunks(context.Background(), newEntry.GetChunks(), key, getEntryMtime(newEntry))
replicatedChunks, err := fs.replicateChunks(context.Background(), newEntry.GetChunks(), targetKey, getEntryMtimeNs(newEntry))
if err != nil {
if errors.Is(err, errChunkSizeMismatch) {
glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err)
return true, err
return true, fs.onCorruptChunk(targetKey, newEntry, err)
}
glog.Warningf("replicate entry chunks %s: %v", key, err)
return true, nil
@@ -305,11 +303,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
// replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, key, getEntryMtime(newEntry))
replicatedChunks, err := fs.replicateChunks(context.Background(), newChunks, targetKey, getEntryMtimeNs(newEntry))
if err != nil {
if errors.Is(err, errChunkSizeMismatch) {
glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err)
return true, err
return true, fs.onCorruptChunk(targetKey, newEntry, err)
}
glog.Warningf("replicate entry chunks %s: %v", key, err)
return true, nil
@@ -360,34 +357,56 @@ func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunc
return
}
func getEntryMtime(entry *filer_pb.Entry) int64 {
// getEntryMtimeNs returns the mtime at full nanosecond precision so versions
// rewritten within the same second can be ordered. int64 ns is safe until ~2262.
func getEntryMtimeNs(entry *filer_pb.Entry) int64 {
if entry == nil || entry.Attributes == nil {
return 0
}
return entry.Attributes.Mtime
return entry.Attributes.Mtime*int64(1e9) + int64(entry.Attributes.MtimeNs)
}
// updateAction decides how UpdateEntry should reconcile an incoming source
// version with the destination's current entry.
// onCorruptChunk handles a permanent chunk size-mismatch (fetched source bytes
// disagree with source metadata). If the source already holds a newer version the
// event is a stale replay whose chunk was overwritten and GC'd — skip it (lossless:
// meta events are full snapshots, so a later event re-carries the current chunks).
// Otherwise the live version is corrupt, so return the error and halt loudly.
func (fs *FilerSink) onCorruptChunk(key string, entry *filer_pb.Entry, err error) error {
if fs.hasSourceNewerVersion(key, getEntryMtimeNs(entry)) {
glog.Warningf("skip stale entry %s with superseded corrupt chunk: %v", key, err)
return nil
}
glog.Errorf("refuse to replicate entry with corrupt chunk %s: %v", key, err)
return err
}
// updateAction decides how UpdateEntry reconciles an incoming source version with
// the destination's current entry.
type updateAction int
const (
// updateNormal applies the incremental source diff: the destination is at
// or behind the source version.
// updateNormal applies the source diff: destination at or behind the source.
updateNormal updateAction = iota
// updateSkip leaves the destination untouched because a newer, complete
// version already landed out of order (last-writer-wins).
// updateSkip leaves the destination untouched: a newer complete version already landed.
updateSkip
// updateRepair re-replicates the full source content over a destination
// that is strictly shorter than the source despite a newer mtime — a
// truncated copy an earlier failed replication left behind. Its mtime can
// look "newer" (the source preserved an old mtime while the partial copy
// was written recently), which would otherwise strand the corruption.
// updateRepair re-replicates full source over a destination strictly shorter than
// the source despite a newer mtime — a truncated copy from a failed replication.
updateRepair
)
// updatedEntryKey returns the sink path the incoming entry lands at: for a rename
// newParentPath/newEntry.Name, else key's parent + name. Supersession must use this,
// not the pre-rename key, which would look deleted on the source and skip a live event.
func updatedEntryKey(key, newParentPath string, newEntry *filer_pb.Entry) string {
targetDir := newParentPath
if targetDir == "" {
targetDir, _ = util.FullPath(key).DirAndName()
}
return string(util.NewFullPath(targetDir, newEntry.Name))
}
func chooseUpdateAction(existing, incoming *filer_pb.Entry) updateAction {
if getEntryMtime(existing) <= getEntryMtime(incoming) {
if getEntryMtimeNs(existing) <= getEntryMtimeNs(incoming) {
return updateNormal
}
if filer.FileSize(existing) < filer.FileSize(incoming) {
@@ -1,6 +1,7 @@
package filersink
import (
"errors"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -12,6 +13,49 @@ func entry(mtime int64, size uint64) *filer_pb.Entry {
return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: mtime, FileSize: size}}
}
// entryNs adds a sub-second component to check same-second ordering.
func entryNs(mtime int64, ns int32, size uint64) *filer_pb.Entry {
return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: mtime, MtimeNs: ns, FileSize: size}}
}
// getEntryMtimeNs must order versions written within the same second, which
// plain second-grained mtime cannot. It also must be nil-safe.
func TestGetEntryMtimeNs(t *testing.T) {
if got := getEntryMtimeNs(nil); got != 0 {
t.Fatalf("nil entry: got %d, want 0", got)
}
if got := getEntryMtimeNs(&filer_pb.Entry{}); got != 0 {
t.Fatalf("nil attributes: got %d, want 0", got)
}
mk := func(sec int64, ns int32) *filer_pb.Entry {
return &filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: sec, MtimeNs: ns}}
}
if got, want := getEntryMtimeNs(mk(5, 200)), int64(5_000_000_200); got != want {
t.Fatalf("full-ns: got %d, want %d", got, want)
}
// same second, later sub-second component must compare strictly greater
if !(getEntryMtimeNs(mk(5, 200)) > getEntryMtimeNs(mk(5, 100))) {
t.Fatalf("same-second ordering failed: %d not > %d",
getEntryMtimeNs(mk(5, 200)), getEntryMtimeNs(mk(5, 100)))
}
}
// onCorruptChunk must surface the error, not skip, when it can't confirm
// supersession (no source filer here) — otherwise it could drop a live file.
func TestOnCorruptChunkRefusesWhenSupersessionUnconfirmed(t *testing.T) {
fs := &FilerSink{} // filerSource nil => hasSourceNewerVersion is always false
sentinel := errors.New("corrupt chunk: read 0 bytes, source says 107")
got := fs.onCorruptChunk("/buckets/x/config",
&filer_pb.Entry{Attributes: &filer_pb.FuseAttributes{Mtime: 5, MtimeNs: 200}},
sentinel)
if !errors.Is(got, sentinel) {
t.Fatalf("expected the error to be returned (refuse), got %v", got)
}
}
// chooseUpdateAction decides skip vs repair vs normal. A destination left
// truncated by an earlier failed replication can carry a newer mtime (the
// source preserved an old mtime while the partial copy was written recently),
@@ -30,6 +74,9 @@ func TestChooseUpdateAction(t *testing.T) {
{"destination newer and complete", entry(300, 100), entry(200, 100), updateSkip},
{"destination newer and larger", entry(300, 200), entry(200, 100), updateSkip},
{"destination newer but truncated", entry(300, 90), entry(200, 100), updateRepair},
// same second: sub-second ordering must still pick the winner
{"same second, destination newer and complete", entryNs(5, 200, 100), entryNs(5, 100, 100), updateSkip},
{"same second, destination older", entryNs(5, 100, 100), entryNs(5, 200, 100), updateNormal},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
@@ -39,3 +86,29 @@ func TestChooseUpdateAction(t *testing.T) {
})
}
}
// updatedEntryKey must resolve the incoming entry's new path. For a rename the
// supersession check has to target newParentPath/newEntry.Name, not the old key,
// or the renamed-away old path looks deleted on the source and skips a live event.
func TestUpdatedEntryKey(t *testing.T) {
named := func(name string) *filer_pb.Entry { return &filer_pb.Entry{Name: name} }
tests := []struct {
name string
key string
newParentPath string
entry *filer_pb.Entry
want string
}{
{"content update, no new parent", "/dst/a.txt", "", named("a.txt"), "/dst/a.txt"},
{"rename same dir", "/dst/old.txt", "/dst", named("new.txt"), "/dst/new.txt"},
{"rename to subdir", "/dst/old.txt", "/dst/sub", named("new.txt"), "/dst/sub/new.txt"},
{"root parent", "/old.txt", "/", named("new.txt"), "/new.txt"},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if got := updatedEntryKey(tc.key, tc.newParentPath, tc.entry); got != tc.want {
t.Fatalf("updatedEntryKey = %q, want %q", got, tc.want)
}
})
}
}