diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 397bb4193..284e037aa 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -611,6 +611,37 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str // old key is in the watched directory if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) { // new key is also in the watched directory + if filer_pb.IsRename(resp) { + newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + // With deletes enabled a rename relocates the entry. Guard the + // watched root itself, whose old key would resolve to the target + // root and recursively delete the whole sink tree. + if doDeleteFiles && string(sourceOldKey) != sourcePath { + oldKey := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + if mover, ok := dataSink.(sink.EntryMover); ok { + // native atomic move: no re-copy, no descendant gap, no chunk leak. + return mover.MoveEntry(oldKey, newKey, message.NewEntry, message.Signatures) + } + // no native move: create the new entry first, then delete the + // old, so a crash between the two leaves the entry visible. + if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { + return fmt.Errorf("create entry2 : %w", err) + } + if err := dataSink.DeleteEntry(oldKey, message.OldEntry.IsDirectory, false, message.Signatures); err != nil { + return fmt.Errorf("delete old entry %v: %w", oldKey, err) + } + return nil + } + // deletes disabled (backup/incremental) or the watched root moved: + // create the new entry and keep the old. + if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { + return fmt.Errorf("create entry2 : %w", err) + } + return nil + } + + // in-place update (same path): mutate via UpdateEntry; never take the + // rename create-then-delete path, which would delete the key just written. 
if doDeleteFiles { oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) var sinkNewParentPath string @@ -623,19 +654,16 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str if foundExisting { return err } - - // not able to find old entry + // old entry missing on the destination; fall through to create it if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { return fmt.Errorf("delete old entry %v: %w", oldKey, err) } } - // create the new entry newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil { return fmt.Errorf("create entry2 : %w", err) - } else { - return nil } + return nil } else { // new key is outside the watched directory diff --git a/weed/command/filer_sync_process_test.go b/weed/command/filer_sync_process_test.go index 726319c6b..477c223b3 100644 --- a/weed/command/filer_sync_process_test.go +++ b/weed/command/filer_sync_process_test.go @@ -17,6 +17,12 @@ type recordingSyncSink struct { createKeys []string updateKeys []string incremental bool + // updateFoundExisting is what UpdateEntry reports; false exercises the + // delete-then-create fallback for an in-place update missing on the sink. + updateFoundExisting bool + // ordered records the sink method names in call order so tests can assert + // create-before-delete sequencing. 
+ ordered []string } func (s *recordingSyncSink) GetName() string { return "recording" } @@ -25,15 +31,30 @@ func (s *recordingSyncSink) Initialize(util.Configuration, string) error { } func (s *recordingSyncSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { s.deleteKeys = append(s.deleteKeys, key) + s.ordered = append(s.ordered, "delete") return nil } func (s *recordingSyncSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { s.createKeys = append(s.createKeys, key) + s.ordered = append(s.ordered, "create") return nil } func (s *recordingSyncSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (bool, error) { s.updateKeys = append(s.updateKeys, key) - return true, nil + s.ordered = append(s.ordered, "update") + return s.updateFoundExisting, nil +} + +func equalSyncStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true } func (s *recordingSyncSink) GetSinkToDirectory() string { return "/dest" } func (s *recordingSyncSink) SetSourceFiler(*source.FilerSource) {} @@ -59,6 +80,23 @@ func TestDestKeyPreservesColonForNonLocalSink(t *testing.T) { }) } +// movingSyncSink is a recordingSyncSink that also implements sink.EntryMover, +// modeling a sink (like the filer) with a native atomic move. 
+type movingSyncSink struct { + *recordingSyncSink + moveOldKeys []string + moveNewKeys []string +} + +var _ sink.EntryMover = (*movingSyncSink)(nil) + +func (s *movingSyncSink) MoveEntry(oldKey, newKey string, newEntry *filer_pb.Entry, signatures []int32) error { + s.moveOldKeys = append(s.moveOldKeys, oldKey) + s.moveNewKeys = append(s.moveNewKeys, newKey) + s.ordered = append(s.ordered, "move") + return nil +} + func TestPathIsEqualOrUnderUsesDirectoryBoundaries(t *testing.T) { tests := []struct { name string @@ -94,6 +132,169 @@ func TestMatchesExcludePathUsesDirectoryBoundaries(t *testing.T) { } } +// A combined rename event (old and new both under sourcePath, doDeleteFiles=true) +// creates at the new key then deletes the old key — never UpdateEntry. +func TestGenProcessFunctionRenameCreatesThenDeletes(t *testing.T) { + dataSink := &recordingSyncSink{} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn rename: %v", err) + } + + if len(dataSink.updateKeys) != 0 { + t.Fatalf("expected rename to bypass UpdateEntry, got %v", dataSink.updateKeys) + } + if len(dataSink.createKeys) != 1 || dataSink.createKeys[0] != "/dest/dir/new.txt" { + t.Fatalf("create keys = %v, want [/dest/dir/new.txt]", dataSink.createKeys) + } + if len(dataSink.deleteKeys) != 1 || dataSink.deleteKeys[0] != "/dest/dir/old.txt" { + t.Fatalf("delete keys = %v, want [/dest/dir/old.txt]", dataSink.deleteKeys) + } + if got, want := dataSink.ordered, []string{"create", "delete"}; !equalSyncStrings(got, want) { + t.Fatalf("call order = %v, want create before delete %v", got, want) + } +} + +// A sink with a native move relocates a rename via MoveEntry, not 
create-then-delete. +func TestGenProcessFunctionRenameUsesMoveEntryWhenSupported(t *testing.T) { + dataSink := &movingSyncSink{recordingSyncSink: &recordingSyncSink{}} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn rename via mover: %v", err) + } + + if len(dataSink.moveOldKeys) != 1 || dataSink.moveOldKeys[0] != "/dest/dir/old.txt" || dataSink.moveNewKeys[0] != "/dest/dir/new.txt" { + t.Fatalf("move old=%v new=%v, want one move /dest/dir/old.txt => /dest/dir/new.txt", dataSink.moveOldKeys, dataSink.moveNewKeys) + } + if len(dataSink.createKeys) != 0 || len(dataSink.deleteKeys) != 0 || len(dataSink.updateKeys) != 0 { + t.Fatalf("native move must not create/delete/update: creates=%v deletes=%v updates=%v", dataSink.createKeys, dataSink.deleteKeys, dataSink.updateKeys) + } +} + +// With deletes disabled (backup/incremental), even a mover keeps the old entry: +// it creates the new one and does not move or delete the old. 
+func TestGenProcessFunctionRenameMoverKeepsOldWhenDeletesDisabled(t *testing.T) { + dataSink := &movingSyncSink{recordingSyncSink: &recordingSyncSink{}} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, false, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn rename (no delete) via mover: %v", err) + } + + if len(dataSink.createKeys) != 1 || dataSink.createKeys[0] != "/dest/dir/new.txt" { + t.Fatalf("create keys = %v, want [/dest/dir/new.txt]", dataSink.createKeys) + } + if len(dataSink.moveOldKeys) != 0 || len(dataSink.deleteKeys) != 0 { + t.Fatalf("deletes-disabled rename must keep old: moves=%v deletes=%v", dataSink.moveOldKeys, dataSink.deleteKeys) + } +} + +// An in-place update (same dir + same name) must route to UpdateEntry, never the +// rename create-then-delete path — otherwise it would delete the key it just wrote. 
+func TestGenProcessFunctionInPlaceUpdateUsesUpdateEntry(t *testing.T) { + dataSink := &recordingSyncSink{updateFoundExisting: true} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn in-place update: %v", err) + } + + if len(dataSink.updateKeys) != 1 || dataSink.updateKeys[0] != "/dest/dir/file.txt" { + t.Fatalf("update keys = %v, want [/dest/dir/file.txt]", dataSink.updateKeys) + } + if len(dataSink.createKeys) != 0 || len(dataSink.deleteKeys) != 0 { + t.Fatalf("in-place update must not create or delete: creates=%v deletes=%v", dataSink.createKeys, dataSink.deleteKeys) + } +} + +// When an in-place update finds no existing entry on the sink, it falls back to +// delete-then-create on the same key — and must delete before create, or the +// recreated entry would be removed. 
+func TestGenProcessFunctionInPlaceUpdateFallbackDeletesBeforeCreate(t *testing.T) { + dataSink := &recordingSyncSink{updateFoundExisting: false} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn in-place update fallback: %v", err) + } + + if got, want := dataSink.ordered, []string{"update", "delete", "create"}; !equalSyncStrings(got, want) { + t.Fatalf("call order = %v, want %v", got, want) + } + if len(dataSink.deleteKeys) != 1 || dataSink.deleteKeys[0] != "/dest/dir/file.txt" || + len(dataSink.createKeys) != 1 || dataSink.createKeys[0] != "/dest/dir/file.txt" { + t.Fatalf("fallback keys: deletes=%v creates=%v, want both [/dest/dir/file.txt]", dataSink.deleteKeys, dataSink.createKeys) + } +} + +// With deletes disabled (e.g. incremental backup), a rename creates the new +// entry only — the old key is left in place. 
+func TestGenProcessFunctionRenameCreateOnlyWhenDeletesDisabled(t *testing.T) { + dataSink := &recordingSyncSink{} + processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, false, false) + + err := processFn(&filer_pb.SubscribeMetadataResponse{ + Directory: "/foo/dir", + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "old.txt"}, + NewEntry: &filer_pb.Entry{Name: "new.txt"}, + NewParentPath: "/foo/dir", + }, + }) + if err != nil { + t.Fatalf("processFn rename (no delete): %v", err) + } + + if len(dataSink.createKeys) != 1 || dataSink.createKeys[0] != "/dest/dir/new.txt" { + t.Fatalf("create keys = %v, want [/dest/dir/new.txt]", dataSink.createKeys) + } + if len(dataSink.deleteKeys) != 0 || len(dataSink.updateKeys) != 0 { + t.Fatalf("unexpected delete/update calls: deletes=%v updates=%v", dataSink.deleteKeys, dataSink.updateKeys) + } +} + func TestGenProcessFunctionRenameToSiblingPrefixBecomesDelete(t *testing.T) { dataSink := &recordingSyncSink{} processFn := genProcessFunction("/foo", "/dest", nil, nil, nil, nil, dataSink, true, false) diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 10c576552..26915d5c7 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -111,25 +111,32 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } - if oldSinkKey != newSinkKey && r.sink.GetName() != "filer" { - if err := r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err) + if oldSinkKey != newSinkKey { + // A real move: the path changed. UpdateEntry cannot move an entry. 
+ if mover, ok := r.sink.(sink.EntryMover); ok { + glog.V(4).Infof("moving %v => %v", oldSinkKey, newSinkKey) + return mover.MoveEntry(oldSinkKey, newSinkKey, newEntry, message.Signatures) } + // Sinks without a native move: create at the new key first, then delete the + // old, so a crash between the two leaves the entry visible under both names + // rather than gone. glog.V(4).Infof("creating renamed %v", newSinkKey) - return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures) + if err := r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures); err != nil { + return fmt.Errorf("create renamed entry %v: %w", newSinkKey, err) + } + return r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures) } + // oldSinkKey == newSinkKey: pure in-place update (same path, content/attrs changed). foundExisting, err := r.sink.UpdateEntry(oldSinkKey, oldEntry, newSinkParentPath, newEntry, message.DeleteChunks, message.Signatures) if foundExisting { glog.V(4).Infof("updated %v", oldSinkKey) return err } - err = r.sink.DeleteEntry(oldSinkKey, oldEntry.IsDirectory, false, message.Signatures) if err != nil { return fmt.Errorf("delete old entry %v: %w", oldSinkKey, err) } - glog.V(4).Infof("creating missing %v", newSinkKey) return r.sink.CreateEntry(newSinkKey, newEntry, message.Signatures) } diff --git a/weed/replication/replicator_test.go b/weed/replication/replicator_test.go index b2ef2c2c6..6c4947ef5 100644 --- a/weed/replication/replicator_test.go +++ b/weed/replication/replicator_test.go @@ -35,6 +35,10 @@ type recordingSink struct { deleteCalls []deleteCall createCalls []createCall updateCalls []updateCall + + // ordered records the sink method names in call order so tests can assert + // create-before-delete sequencing. 
+ ordered []string } func (s *recordingSink) GetName() string { @@ -47,16 +51,19 @@ func (s *recordingSink) Initialize(util.Configuration, string) error { func (s *recordingSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { s.deleteCalls = append(s.deleteCalls, deleteCall{key: key, isDirectory: isDirectory}) + s.ordered = append(s.ordered, "delete") return nil } func (s *recordingSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { s.createCalls = append(s.createCalls, createCall{key: key}) + s.ordered = append(s.ordered, "create") return nil } func (s *recordingSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (bool, error) { s.updateCalls = append(s.updateCalls, updateCall{key: key, newParentPath: newParentPath}) + s.ordered = append(s.ordered, "update") return s.updateFoundExisting, nil } @@ -70,6 +77,26 @@ func (s *recordingSink) IsIncremental() bool { return s.incremental } +type moveCall struct { + oldKey string + newKey string +} + +// movingSink is a recordingSink that also implements sink.EntryMover, modeling +// a sink (like the filer) with a native atomic move. 
+type movingSink struct { + *recordingSink + moveCalls []moveCall +} + +var _ sink.EntryMover = (*movingSink)(nil) + +func (s *movingSink) MoveEntry(oldKey, newKey string, newEntry *filer_pb.Entry, signatures []int32) error { + s.moveCalls = append(s.moveCalls, moveCall{oldKey: oldKey, newKey: newKey}) + s.ordered = append(s.ordered, "move") + return nil +} + func TestReplicateRenameUsesTargetKeyForNonFilerSink(t *testing.T) { s := &recordingSink{name: "local", sinkToDirectory: "/dest"} r := &Replicator{ @@ -97,19 +124,26 @@ func TestReplicateRenameUsesTargetKeyForNonFilerSink(t *testing.T) { } if len(s.updateCalls) != 0 { - t.Fatalf("expected non-filer rename to bypass UpdateEntry, got %d calls", len(s.updateCalls)) - } - if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { - t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + t.Fatalf("expected rename to bypass UpdateEntry, got %d calls", len(s.updateCalls)) } if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/new/renamed.txt" { t.Fatalf("create calls = %+v, want target sink key", s.createCalls) } + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + } + if got, want := s.ordered, []string{"create", "delete"}; !equalStrings(got, want) { + t.Fatalf("call order = %v, want create before delete %v", got, want) + } } -func TestReplicateRenameUsesUpdateForFilerSink(t *testing.T) { +// A rename never routes through UpdateEntry, which cannot move an entry across +// paths (the filer-sink special-case that did so is gone). A sink with a native +// move uses MoveEntry instead; the sink in this test has none. +// A sink without a native move falls back to create-then-delete for a rename. 
+func TestReplicateRenameWithoutMoverUsesCreateThenDelete(t *testing.T) { s := &recordingSink{ - name: "filer", + name: "s3", sinkToDirectory: "/dest", updateFoundExisting: true, } @@ -137,21 +171,95 @@ func TestReplicateRenameUsesUpdateForFilerSink(t *testing.T) { t.Fatalf("Replicate rename: %v", err) } + if len(s.updateCalls) != 0 { + t.Fatalf("expected rename to bypass UpdateEntry, got %d calls", len(s.updateCalls)) + } + if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/new/renamed.txt" { + t.Fatalf("create calls = %+v, want target sink key", s.createCalls) + } + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { + t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + } + if got, want := s.ordered, []string{"create", "delete"}; !equalStrings(got, want) { + t.Fatalf("call order = %v, want create before delete %v", got, want) + } +} + +// A sink with a native move (the filer) relocates a rename via MoveEntry — never create-then-delete, +// so a failed copy can never leave the old entry deleted with no committed destination. 
+func TestReplicateRenameUsesMoveEntryWhenSupported(t *testing.T) { + s := &movingSink{recordingSink: &recordingSink{name: "filer", sinkToDirectory: "/dest"}} + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/source"}, + } + + err := r.Replicate(context.Background(), "/source/old/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{Name: "file.txt"}, + NewEntry: &filer_pb.Entry{Name: "renamed.txt"}, + NewParentPath: "/source/new", + }) + if err != nil { + t.Fatalf("Replicate rename: %v", err) + } + + if len(s.moveCalls) != 1 || s.moveCalls[0].oldKey != "/dest/old/file.txt" || s.moveCalls[0].newKey != "/dest/new/renamed.txt" { + t.Fatalf("move calls = %+v, want one move /dest/old/file.txt => /dest/new/renamed.txt", s.moveCalls) + } + if len(s.createCalls) != 0 || len(s.deleteCalls) != 0 || len(s.updateCalls) != 0 { + t.Fatalf("native move must not create/delete/update: creates=%v deletes=%v updates=%v", s.createCalls, s.deleteCalls, s.updateCalls) + } +} + +// An in-place update (same parent + same name, so oldSinkKey == newSinkKey) +// still routes to UpdateEntry rather than create-then-delete. 
+func TestReplicateInPlaceUpdateUsesUpdateEntry(t *testing.T) { + s := &recordingSink{ + name: "filer", + sinkToDirectory: "/dest", + updateFoundExisting: true, + } + r := &Replicator{ + sink: s, + source: &source.FilerSource{Dir: "/source"}, + } + + err := r.Replicate(context.Background(), "/source/dir/file.txt", &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 123, + }, + }, + NewEntry: &filer_pb.Entry{ + Name: "file.txt", + Attributes: &filer_pb.FuseAttributes{ + Mtime: 456, + }, + }, + NewParentPath: "/source/dir", + }) + if err != nil { + t.Fatalf("Replicate in-place update: %v", err) + } + if len(s.updateCalls) != 1 { t.Fatalf("update calls = %d, want 1", len(s.updateCalls)) } - if s.updateCalls[0].key != "/dest/old/file.txt" { - t.Fatalf("update key = %q, want /dest/old/file.txt", s.updateCalls[0].key) + if s.updateCalls[0].key != "/dest/dir/file.txt" { + t.Fatalf("update key = %q, want /dest/dir/file.txt", s.updateCalls[0].key) } - if s.updateCalls[0].newParentPath != "/dest/new" { - t.Fatalf("update newParentPath = %q, want /dest/new", s.updateCalls[0].newParentPath) + if s.updateCalls[0].newParentPath != "/dest/dir" { + t.Fatalf("update newParentPath = %q, want /dest/dir", s.updateCalls[0].newParentPath) } if len(s.deleteCalls) != 0 || len(s.createCalls) != 0 { t.Fatalf("unexpected delete/create calls: deletes=%+v creates=%+v", s.deleteCalls, s.createCalls) } } -func TestReplicateRenameFallbackCreatesTargetKey(t *testing.T) { +// When the in-place update finds no existing entry to update, fall back to +// delete-then-create at the same key. 
+func TestReplicateInPlaceUpdateFallbackCreates(t *testing.T) { s := &recordingSink{ name: "filer", sinkToDirectory: "/dest", @@ -162,7 +270,7 @@ func TestReplicateRenameFallbackCreatesTargetKey(t *testing.T) { source: &source.FilerSource{Dir: "/source"}, } - err := r.Replicate(context.Background(), "/source/old/file.txt", &filer_pb.EventNotification{ + err := r.Replicate(context.Background(), "/source/dir/file.txt", &filer_pb.EventNotification{ OldEntry: &filer_pb.Entry{ Name: "file.txt", Attributes: &filer_pb.FuseAttributes{ @@ -170,26 +278,41 @@ func TestReplicateRenameFallbackCreatesTargetKey(t *testing.T) { }, }, NewEntry: &filer_pb.Entry{ - Name: "renamed.txt", + Name: "file.txt", Attributes: &filer_pb.FuseAttributes{ - Mtime: 123, + Mtime: 456, }, }, - NewParentPath: "/source/new", + NewParentPath: "/source/dir", }) if err != nil { - t.Fatalf("Replicate rename fallback: %v", err) + t.Fatalf("Replicate in-place update fallback: %v", err) } if len(s.updateCalls) != 1 { t.Fatalf("update calls = %d, want 1", len(s.updateCalls)) } - if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/old/file.txt" { - t.Fatalf("delete calls = %+v, want old sink key", s.deleteCalls) + if len(s.deleteCalls) != 1 || s.deleteCalls[0].key != "/dest/dir/file.txt" { + t.Fatalf("delete calls = %+v, want same sink key", s.deleteCalls) } - if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/new/renamed.txt" { - t.Fatalf("create calls = %+v, want target sink key", s.createCalls) + if len(s.createCalls) != 1 || s.createCalls[0].key != "/dest/dir/file.txt" { + t.Fatalf("create calls = %+v, want same sink key", s.createCalls) } + if got, want := s.ordered, []string{"update", "delete", "create"}; !equalStrings(got, want) { + t.Fatalf("call order = %v, want %v", got, want) + } +} + +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true } func 
TestPathIsEqualOrUnderUsesDirectoryBoundaries(t *testing.T) { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 340b3d4eb..f65fa852d 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "strings" "sync" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -166,6 +167,70 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo return nil } +var _ sink.EntryMover = (*FilerSink)(nil) + +// MoveEntry relocates oldKey to newKey on the target filer via AtomicRenameEntry: +// a metadata-only move that relocates a whole subtree in one transaction, so a +// directory rename never leaves descendants missing and chunks are neither +// re-copied nor leaked. +// +// When the move fails because the old path is genuinely gone on the sink — a +// descendant the parent rename already relocated, or one never replicated — +// there is nothing to move, so it creates the new path instead (CreateEntry +// short-circuits when the entry is already there, and never deletes). Existence +// is re-checked with a direct lookup rather than inferred from the rename error, +// so a rolled-back move that left the old entry intact propagates for retry +// instead of being mistaken for "gone". 
+func (fs *FilerSink) MoveEntry(oldKey, newKey string, newEntry *filer_pb.Entry, signatures []int32) error { + oldDir, oldName := util.FullPath(oldKey).DirAndName() + newDir, newName := util.FullPath(newKey).DirAndName() + + err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, + OldName: oldName, + NewDirectory: newDir, + NewName: newName, + Signatures: signatures, + }) + return err + }) + if err == nil { + return nil + } + if missing, lookupErr := fs.entryMissing(oldKey); lookupErr == nil && missing { + glog.V(2).Infof("move %s => %s: old path gone, creating %s", oldKey, newKey, newKey) + return fs.CreateEntry(newKey, newEntry, signatures) + } + return fmt.Errorf("move %s => %s: %w", oldKey, newKey, err) +} + +// entryMissing reports whether key has no entry on the target filer. A lookup +// not-found (sentinel or the gRPC string form) means missing; any other lookup +// error is returned so the caller does not treat an unknown state as missing. +func (fs *FilerSink) entryMissing(key string) (bool, error) { + dir, name := util.FullPath(key).DirAndName() + missing := false + err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, lookupErr := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + }) + if lookupErr == nil { + return nil + } + // The string check is a deliberate compatibility fallback: a cross-cluster + // gRPC error often arrives as a plain status string that no longer wraps the + // filer_pb.ErrNotFound sentinel, so errors.Is alone would miss it. 
+ if errors.Is(lookupErr, filer_pb.ErrNotFound) || strings.Contains(lookupErr.Error(), filer_pb.ErrNotFound.Error()) { + missing = true + return nil + } + return lookupErr + }) + return missing, err +} + func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index 29e6bbf8c..80eca0094 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -17,6 +17,15 @@ type ReplicationSink interface { IsIncremental() bool } +// EntryMover is an optional capability for sinks that can relocate an entry +// natively, in one atomic step, instead of create-then-delete. Drivers prefer +// it for a rename so a failed copy can never leave the source deleted with no +// committed destination, a directory move never deletes descendants before they +// are recreated, and the entry's chunks are neither re-copied nor leaked. +type EntryMover interface { + MoveEntry(oldKey, newKey string, newEntry *filer_pb.Entry, signatures []int32) error +} + var ( Sinks []ReplicationSink )