mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(filer.sync): scope filesystem key sanitization to the local sink (#9894)
* fix(filer.sync): scope filesystem key sanitization to the local sink destKey ran every sink key through escapeKey, whose Windows build strips colons. Colons are illegal in NTFS filenames so the local sink needs that, but s3/filer/azure/gcs/b2 accept them as ordinary key bytes — stripping them silently diverged the destination key (a source a:b replicated as ab). Move the sanitization into the local sink behind a Windows build tag, applied at every entry point so the previously-unescaped in-place-update paths stay consistent. Non-local sinks now keep the raw key; non-Windows builds are unchanged; a leading drive-letter colon is preserved. * test(filer.sync): cover incremental destKey and localsink update/delete sanitization Lock the colon-preserving behavior for the incremental destKey branch, and extend the Windows local-sink test to assert UpdateEntry and DeleteEntry also sanitize the key, not just CreateEntry.
This commit is contained in:
@@ -694,10 +694,10 @@ func destKey(dataSink sink.ReplicationSink, targetPath, sourcePath string, sourc
|
||||
relative = strings.TrimPrefix(sk, "/")
|
||||
}
|
||||
if !dataSink.IsIncremental() {
|
||||
return escapeKey(util.Join(targetPath, relative))
|
||||
return util.Join(targetPath, relative)
|
||||
}
|
||||
dateKey := time.Unix(mTime, 0).Format("2006-01-02")
|
||||
return escapeKey(util.Join(targetPath, dateKey, relative))
|
||||
return util.Join(targetPath, dateKey, relative)
|
||||
}
|
||||
|
||||
// isEntryExcluded checks whether a single side (old or new) of an event is excluded
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -12,9 +13,10 @@ import (
|
||||
var _ sink.ReplicationSink = (*recordingSyncSink)(nil)
|
||||
|
||||
type recordingSyncSink struct {
|
||||
deleteKeys []string
|
||||
createKeys []string
|
||||
updateKeys []string
|
||||
deleteKeys []string
|
||||
createKeys []string
|
||||
updateKeys []string
|
||||
incremental bool
|
||||
}
|
||||
|
||||
func (s *recordingSyncSink) GetName() string { return "recording" }
|
||||
@@ -35,7 +37,27 @@ func (s *recordingSyncSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, ne
|
||||
}
|
||||
func (s *recordingSyncSink) GetSinkToDirectory() string { return "/dest" }
|
||||
func (s *recordingSyncSink) SetSourceFiler(*source.FilerSource) {}
|
||||
func (s *recordingSyncSink) IsIncremental() bool { return false }
|
||||
func (s *recordingSyncSink) IsIncremental() bool { return s.incremental }
|
||||
|
||||
// TestDestKeyPreservesColonForNonLocalSink guards against the command layer
|
||||
// stripping colons from keys destined for non-local sinks. Colon
|
||||
// sanitization belongs to the local filesystem sink only.
|
||||
func TestDestKeyPreservesColonForNonLocalSink(t *testing.T) {
|
||||
t.Run("non-incremental", func(t *testing.T) {
|
||||
got := destKey(&recordingSyncSink{}, "/backup", "/src", util.FullPath("/src/2024:01:02/file:name.txt"), 0)
|
||||
want := "/backup/2024:01:02/file:name.txt"
|
||||
if got != want {
|
||||
t.Errorf("destKey() = %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
t.Run("incremental", func(t *testing.T) {
|
||||
got := destKey(&recordingSyncSink{incremental: true}, "/backup", "/src", util.FullPath("/src/file:name.txt"), 0)
|
||||
// the date partition varies by timezone; assert the colon-bearing name survives.
|
||||
if !strings.HasPrefix(got, "/backup/") || !strings.HasSuffix(got, "/file:name.txt") {
|
||||
t.Errorf("destKey() = %q, want /backup/<date>/file:name.txt with colon preserved", got)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPathIsEqualOrUnderUsesDirectoryBoundaries(t *testing.T) {
|
||||
tests := []struct {
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
//go:build !windows
|
||||
|
||||
package command
|
||||
|
||||
func escapeKey(key string) string {
|
||||
return key
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
func escapeKey(key string) string {
|
||||
if strings.Contains(key, ":") {
|
||||
return strings.ReplaceAll(key, ":", "")
|
||||
}
|
||||
return key
|
||||
}
|
||||
@@ -61,6 +61,7 @@ func (localsink *LocalSink) IsIncremental() bool {
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
|
||||
key = sanitizeFsKey(key)
|
||||
if localsink.isMultiPartEntry(key) {
|
||||
return nil
|
||||
}
|
||||
@@ -72,6 +73,7 @@ func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeCh
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
|
||||
key = sanitizeFsKey(key)
|
||||
if entry.IsDirectory || localsink.isMultiPartEntry(key) {
|
||||
return nil
|
||||
}
|
||||
@@ -151,6 +153,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
|
||||
}
|
||||
|
||||
func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
|
||||
key = sanitizeFsKey(key)
|
||||
if localsink.isMultiPartEntry(key) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
//go:build !windows
|
||||
|
||||
package localsink
|
||||
|
||||
// sanitizeFsKey is a no-op on non-Windows filesystems. The Windows build
|
||||
// strips characters that are illegal in NTFS paths.
|
||||
func sanitizeFsKey(key string) string {
|
||||
return key
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
//go:build !windows
|
||||
|
||||
package localsink
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestSanitizeFsKeyKeepsColons(t *testing.T) {
|
||||
key := "/backup/a:b/c:d.txt"
|
||||
if got := sanitizeFsKey(key); got != key {
|
||||
t.Errorf("sanitizeFsKey(%q) = %q, want identity", key, got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
//go:build windows
|
||||
|
||||
package localsink
|
||||
|
||||
import "strings"
|
||||
|
||||
// sanitizeFsKey strips characters illegal in NTFS paths (notably ':') so the
|
||||
// local filesystem sink can write the key. A leading drive-letter colon (e.g.
|
||||
// "C:\...") is preserved so absolute target paths stay valid. Only the local
|
||||
// sink needs this; other sinks accept the raw key.
|
||||
func sanitizeFsKey(key string) string {
|
||||
prefix, rest := "", key
|
||||
if len(key) >= 2 && key[1] == ':' && isDriveLetter(key[0]) {
|
||||
prefix, rest = key[:2], key[2:]
|
||||
}
|
||||
return prefix + strings.ReplaceAll(rest, ":", "")
|
||||
}
|
||||
|
||||
func isDriveLetter(b byte) bool {
|
||||
return (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
//go:build windows
|
||||
|
||||
package localsink
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
||||
)
|
||||
|
||||
func TestSanitizeFsKeyStripsColons(t *testing.T) {
|
||||
if got := sanitizeFsKey("/backup/a:b/c:d.txt"); got != "/backup/ab/cd.txt" {
|
||||
t.Errorf("sanitizeFsKey() = %q, want %q", got, "/backup/ab/cd.txt")
|
||||
}
|
||||
if got := sanitizeFsKey(`C:\a:b`); got != `C:\ab` {
|
||||
t.Errorf("sanitizeFsKey() = %q, want drive letter preserved %q", got, `C:\ab`)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCreateEntryWritesSanitizedPath confirms CreateEntry strips colons from
|
||||
// the destination path before writing, so keys land at NTFS-legal paths.
|
||||
func TestCreateEntryWritesSanitizedPath(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
sink := &LocalSink{}
|
||||
sink.initialize(tmpDir, false)
|
||||
sink.SetSourceFiler(&source.FilerSource{})
|
||||
|
||||
key := filepath.Join(tmpDir, "20:24", "fi:le.txt")
|
||||
entry := &filer_pb.Entry{
|
||||
Attributes: &filer_pb.FuseAttributes{FileMode: uint32(0644)},
|
||||
Content: []byte("data"),
|
||||
}
|
||||
if err := sink.CreateEntry(key, entry, nil); err != nil {
|
||||
t.Fatalf("CreateEntry failed: %v", err)
|
||||
}
|
||||
|
||||
want := sanitizeFsKey(key)
|
||||
if _, err := os.Stat(want); err != nil {
|
||||
t.Errorf("expected file at sanitized path %q: %v", want, err)
|
||||
}
|
||||
if _, err := os.Stat(key); err == nil {
|
||||
t.Errorf("unexpected file at unsanitized path %q", key)
|
||||
}
|
||||
|
||||
updated := &filer_pb.Entry{
|
||||
Attributes: &filer_pb.FuseAttributes{FileMode: uint32(0644)},
|
||||
Content: []byte("data-v2"),
|
||||
}
|
||||
if _, err := sink.UpdateEntry(key, entry, filepath.Dir(key), updated, false, nil); err != nil {
|
||||
t.Fatalf("UpdateEntry failed: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(want); err != nil {
|
||||
t.Errorf("expected updated file at sanitized path %q: %v", want, err)
|
||||
}
|
||||
|
||||
if err := sink.DeleteEntry(key, false, false, nil); err != nil {
|
||||
t.Fatalf("DeleteEntry failed: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(want); !os.IsNotExist(err) {
|
||||
t.Errorf("expected sanitized path %q removed, stat err=%v", want, err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user