Files
seaweedfs/weed/shell/command_remote_cache.go
T
Chris Lu 8cc10460b4 fix(remote): correct content and permissions when syncing/caching remote objects (#9879)
* fix(remote): reject short reads when caching remote objects

A short read from the remote (stale listing size, truncated or flaky
response) was silently zero-padded: the S3 and Azure clients pre-size
the buffer and discard the downloaded byte count, and the chunk is
recorded with the requested size. The cached file then matched the
expected size but its tail was NULL, and the entry was marked cached
so it never re-fetched.

Check the byte count against the requested size in both clients, and
add a backend-agnostic guard in FetchAndWriteNeedle. The cache now
fails loudly and the entry stays remote-only for a later retry.

* fix(remote): match S3 default modes when syncing remote metadata

Remote object listings carry no POSIX mode, so synced entries were
created with a hardcoded 0644. Against a SeaweedFS remote, whose S3
layer writes objects as 0660 and auto-creates directories as 0771
(0660|0111), the mounted copy ended up 0644/0755 and the permissions
visibly diverged from the source.

Default to the S3 modes instead (files 0660, directories 0771). The
filer derives parent-dir modes from the child as fileMode|0111, so
fixing the file default also brings the directories into line.

Directory mtimes still reflect sync time: S3 listings don't enumerate
directories, so the remote's directory timestamps aren't available.
2026-06-08 13:55:53 -07:00

428 lines
15 KiB
Go

package shell
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"math"
	"strings"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
Commands = append(Commands, &commandRemoteCache{})
}
// commandRemoteCache implements the "remote.cache" shell command. It carries
// no state; all inputs arrive through the arguments passed to Do.
type commandRemoteCache struct{}
// Name returns the name under which the command is invoked in the shell.
func (c *commandRemoteCache) Name() string {
	const commandName = "remote.cache"
	return commandName
}
// Help returns the usage text of the command: example invocations for each
// flag followed by a summary of the three things a run does.
func (c *commandRemoteCache) Help() string {
	const usage = `comprehensive synchronization and caching between local and remote storage
# assume a remote storage is configured to name "cloud1"
remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
# mount and pull one bucket
remote.mount -dir=/xxx -remote=cloud1/bucket
# comprehensive sync and cache: update metadata, cache content, and remove deleted files
remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
remote.cache -dir=/xxx -concurrent=32 # with custom file-level concurrency
remote.cache -dir=/xxx -chunkConcurrency=16 # parallel chunk downloads per file (0 = server default 8)
remote.cache -dir=/xxx -downloadConcurrency=10 # S3 multipart download concurrency per chunk (0 = server default 5)
remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes
This command will:
1. Synchronize metadata from remote storage
2. Cache file content from remote by default
3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)
This is designed to run regularly. So you can add it to some cronjob.
`
	return usage
}
// HasTag reports whether the command carries the given tag. remote.cache
// declares no tags, so the result is false for every input.
func (c *commandRemoteCache) HasTag(_ CommandTag) bool {
	const tagged = false
	return tagged
}
// Do parses the command-line flags, validates them, resolves the remote mount
// that contains -dir and then runs the comprehensive sync.
//
// args are the raw flag arguments, commandEnv gives access to the filer and
// writer receives all progress output. An error is returned when -dir is
// missing, when a concurrency flag is out of range, when the mount cannot be
// detected, or when the sync itself fails.
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

	dir := remoteCacheCommand.String("dir", "", "a directory in filer")
	cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
	deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
	concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
	chunkConcurrency := remoteCacheCommand.Int("chunkConcurrency", 0, "parallel chunk downloads per file (0 = server default 8)")
	downloadConcurrency := remoteCacheCommand.Int("downloadConcurrency", 0, "S3 multipart download concurrency per chunk (0 = server default 5)")
	dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
	fileFiler := newFileFilter(remoteCacheCommand)

	// NOTE(review): a parse failure is deliberately not propagated here;
	// presumably the flag package has already reported it — confirm.
	if err = remoteCacheCommand.Parse(args); err != nil {
		return nil
	}

	if *dir == "" {
		return fmt.Errorf("need to specify -dir option")
	}
	// The value is used as the limit of the executor that runs the per-file
	// caching jobs; a limit below one cannot schedule any job, so reject it
	// here instead of failing in the middle of a sync.
	if *concurrency <= 0 {
		return fmt.Errorf("concurrent must be greater than 0")
	}
	// Both values are narrowed to int32 below, so they must fit.
	if *chunkConcurrency < 0 || *chunkConcurrency > math.MaxInt32 {
		return fmt.Errorf("chunkConcurrency must be between 0 and %d", math.MaxInt32)
	}
	if *downloadConcurrency < 0 || *downloadConcurrency > math.MaxInt32 {
		return fmt.Errorf("downloadConcurrency must be between 0 and %d", math.MaxInt32)
	}

	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
	if detectErr != nil {
		// Show the known mappings so the user can see which directories are mounted.
		jsonPrintln(writer, mappings)
		return detectErr
	}

	// perform comprehensive sync
	return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, int32(*chunkConcurrency), int32(*downloadConcurrency), *dryRun, fileFiler)
}
// remoteCacheEntryNotFound reports whether err, as returned by a filer entry
// lookup, means that the entry does not exist.
//
// Comparing with == only matches when the very same error value comes back,
// so the message is matched as well to cover an error that was re-created
// with the same text.
// NOTE(review): presumably this is what happens to errors that crossed the
// gRPC boundary — confirm against the filer server.
func remoteCacheEntryNotFound(err error) bool {
	if err == nil {
		return false
	}
	return errors.Is(err, filer_pb.ErrNotFound) || strings.Contains(err.Error(), filer_pb.ErrNotFound.Error())
}

// doComprehensiveSync brings dirToSync in line with the remote storage it is
// mounted from.
//
// It lists the remote location, optionally lists the local directory, works
// out which entries have to be deleted, created/updated or cached, and then
// either prints that plan (dryRun) or executes it:
//   - local remote-backed entries missing on the remote are deleted
//     (only when deleteLocalExtra is set),
//   - entries are created for new remote objects and the metadata of changed
//     ones is refreshed, which drops their previously cached content,
//   - content is cached with up to `concurrency` files in flight
//     (only when shouldCache is set).
//
// Progress is written to writer. The first listing, delete or caching error is
// returned; per-entry metadata failures are reported and skipped.
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, chunkConcurrency int32, downloadConcurrency int32, dryRun bool, fileFilter *FileFilter) error {

	// visit remote storage
	remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return err
	}

	remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)

	// Step 1: Collect all remote files
	remoteFiles := make(map[string]*filer_pb.RemoteEntry)
	err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
		localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
		fullPath := string(localDir.Child(name))
		remoteFiles[fullPath] = remoteEntry
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to traverse remote storage: %w", err)
	}

	fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))

	// Step 2: Collect all local files (only if we need to delete local extra files)
	localFiles := make(map[string]*filer_pb.Entry)
	if deleteLocalExtra {
		err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
			if entry.RemoteEntry != nil { // only consider files that are part of remote mount
				fullPath := string(dir.Child(entry.Name))
				localFiles[fullPath] = entry
			}
			return true
		})
		if err != nil {
			return fmt.Errorf("failed to traverse local directory: %w", err)
		}
		fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
	} else {
		fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
	}

	// Step 3: Determine actions needed
	var filesToDelete []string
	var filesToUpdate []string
	var filesToCache []string

	// Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
	if deleteLocalExtra {
		for localPath := range localFiles {
			if _, exists := remoteFiles[localPath]; !exists {
				filesToDelete = append(filesToDelete, localPath)
			}
		}
	}

	// Find files to update/cache (exist remotely)
	for remotePath, remoteEntry := range remoteFiles {
		if deleteLocalExtra {
			// When deleteLocalExtra is enabled, we have localFiles to compare with
			if localEntry, exists := localFiles[remotePath]; exists {
				// File exists locally, check if it needs updating
				if localEntry.RemoteEntry == nil ||
					localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
					localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
					filesToUpdate = append(filesToUpdate, remotePath)
				}

				// Check if it needs caching
				if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
					filesToCache = append(filesToCache, remotePath)
				}
			} else {
				// File doesn't exist locally, needs to be created
				filesToUpdate = append(filesToUpdate, remotePath)
			}
		} else {
			// When deleteLocalExtra is disabled, we check each file individually.
			// All remote files are candidates for update/creation; entries that
			// turn out to be unchanged are skipped in step 4.
			filesToUpdate = append(filesToUpdate, remotePath)

			// For caching, we need to check if the local file exists and needs caching
			if shouldCache {
				// We need to look up the local file to check if it needs caching
				localDir, name := util.FullPath(remotePath).DirAndName()
				err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
					lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
						Directory: localDir,
						Name:      name,
					})
					// A response without an entry is treated like a failed lookup.
					if lookupErr == nil && lookupResp != nil && lookupResp.Entry != nil {
						localEntry := lookupResp.Entry
						if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
							filesToCache = append(filesToCache, remotePath)
						}
					}
					return nil // Don't propagate lookup errors here
				})
				if err != nil {
					// Log error but continue
					fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
				}
			}
		}
	}

	fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
		len(filesToDelete), len(filesToUpdate), len(filesToCache))

	if dryRun {
		fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
		for _, path := range filesToDelete {
			fmt.Fprintf(writer, "DELETE: %s\n", path)
		}
		for _, path := range filesToUpdate {
			fmt.Fprintf(writer, "UPDATE: %s\n", path)
		}
		for _, path := range filesToCache {
			fmt.Fprintf(writer, "CACHE: %s\n", path)
		}
		return nil
	}

	// Step 4: Execute actions
	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		ctx := context.Background()

		// Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
		if deleteLocalExtra {
			for _, pathToDelete := range filesToDelete {
				fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)

				dir, name := util.FullPath(pathToDelete).DirAndName()
				_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
					Directory:            dir,
					Name:                 name,
					IgnoreRecursiveError: false,
					IsDeleteData:         true,
					IsRecursive:          false,
					IsFromOtherCluster:   false,
				})
				if err != nil {
					fmt.Fprintf(writer, "failed: %v\n", err)
					return err
				}
				fmt.Fprintf(writer, "done\n")
			}
		}

		// Update metadata for files that exist on remote
		for _, pathToUpdate := range filesToUpdate {
			remoteEntry := remoteFiles[pathToUpdate]
			localDir, name := util.FullPath(pathToUpdate).DirAndName()

			fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)

			// Check if file exists locally
			lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
				Directory: string(localDir),
				Name:      name,
			})
			notFound := remoteCacheEntryNotFound(lookupErr) ||
				(lookupErr == nil && (lookupResp == nil || lookupResp.Entry == nil))
			if lookupErr != nil && !notFound {
				fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
				continue
			}

			// The listing carries no directory flag in remoteFiles, so an entry
			// with neither size nor mtime is taken to be a directory.
			isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
			if notFound {
				// Create new entry
				_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
					Directory: string(localDir),
					Entry: &filer_pb.Entry{
						Name:        name,
						IsDirectory: isDirectory,
						Attributes: &filer_pb.FuseAttributes{
							FileSize: uint64(remoteEntry.RemoteSize),
							Mtime:    remoteEntry.RemoteMtime,
							FileMode: remoteEntryFileMode(isDirectory),
						},
						RemoteEntry: remoteEntry,
					},
				})
				if createErr != nil {
					fmt.Fprintf(writer, "failed to create: %v\n", createErr)
					continue
				}
			} else {
				// Update existing entry
				existingEntry := lookupResp.Entry
				if existingEntry.RemoteEntry == nil {
					// This is a local file, skip to avoid overwriting
					fmt.Fprintf(writer, "skipped (local file)\n")
					continue
				}
				// Refreshing the metadata drops the cached chunks and the local
				// sync marker, so it must only happen when the remote object
				// really changed; otherwise every run with
				// deleteLocalExtra=false would un-cache unchanged files.
				if existingEntry.RemoteEntry.RemoteETag == remoteEntry.RemoteETag &&
					existingEntry.RemoteEntry.RemoteMtime >= remoteEntry.RemoteMtime {
					fmt.Fprintf(writer, "skipped (unchanged)\n")
					continue
				}

				if existingEntry.Attributes == nil {
					existingEntry.Attributes = &filer_pb.FuseAttributes{}
				}
				existingEntry.RemoteEntry = remoteEntry
				existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
				existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
				existingEntry.Attributes.Md5 = nil
				existingEntry.Chunks = nil
				existingEntry.Content = nil

				_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
					Directory: string(localDir),
					Entry:     existingEntry,
				})
				if updateErr != nil {
					fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
					continue
				}
			}
			fmt.Fprintf(writer, "done\n")
		}

		// Cache file content if requested
		if shouldCache && len(filesToCache) > 0 {
			fmt.Fprintf(writer, "Caching file content...\n")

			var wg sync.WaitGroup
			limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
			var executionErr error
			var execErrMu sync.Mutex

			// The jobs run concurrently and share writer, so every message is
			// written as one complete line while holding a lock.
			var outputMu sync.Mutex
			report := func(format string, args ...interface{}) {
				outputMu.Lock()
				defer outputMu.Unlock()
				fmt.Fprintf(writer, format, args...)
			}

			for _, pathToCache := range filesToCache {
				wg.Add(1)
				pathToCacheCopy := pathToCache // Capture for closure
				limitedConcurrentExecutor.Execute(func() {
					defer wg.Done()

					// Get local entry (either from localFiles map or by lookup)
					var localEntry *filer_pb.Entry
					if deleteLocalExtra {
						localEntry = localFiles[pathToCacheCopy]
					} else {
						// Look up the local entry since we don't have it in localFiles
						localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
						lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
							lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
								Directory: localDir,
								Name:      name,
							})
							if err == nil && lookupResp != nil {
								localEntry = lookupResp.Entry
							}
							return err
						})
						if lookupErr != nil {
							report("Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
							return
						}
					}
					if localEntry == nil {
						report("Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
						return
					}

					dir, _ := util.FullPath(pathToCacheCopy).DirAndName()

					if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, util.FullPath(dir), localEntry, chunkConcurrency, downloadConcurrency); err != nil {
						report("Caching %s... failed: %v\n", pathToCacheCopy, err)
						// Remember only the first failure; the remaining jobs still run.
						execErrMu.Lock()
						if executionErr == nil {
							executionErr = err
						}
						execErrMu.Unlock()
						return
					}
					report("Caching %s... done\n", pathToCacheCopy)
				})
			}

			wg.Wait()
			if executionErr != nil {
				return executionErr
			}
		}

		return nil
	})
}
// recursivelyTraverseDirectory walks dirPath depth-first and calls visitEntry
// once for every entry it lists, passing the directory that contains it.
//
// A sub-directory is descended into only when visitEntry returned true for it;
// for plain files the return value has no effect. Listing errors, including
// those from nested directories, abort the walk and are returned.
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
	visitOne := func(entry *filer_pb.Entry, _ bool) error {
		// Read the flag before the callback runs, as the walk decides on the
		// entry as it was listed.
		isDir := entry.IsDirectory
		if descend := visitEntry(dirPath, entry); !descend || !isDir {
			return nil
		}
		return recursivelyTraverseDirectory(filerClient, dirPath.Child(entry.Name), visitEntry)
	}
	return filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", visitOne)
}
// shouldCacheToLocal reports whether the content of entry still has to be
// fetched from the remote storage.
//
// That is the case for a non-directory entry that has remote metadata, a
// zero LastLocalSyncTsNs and a remote size greater than zero. A nil entry,
// e.g. from a lookup response that carried none, is never cached.
func shouldCacheToLocal(entry *filer_pb.Entry) bool {
	if entry == nil || entry.IsDirectory || entry.RemoteEntry == nil {
		return false
	}
	remote := entry.RemoteEntry
	return remote.LastLocalSyncTsNs == 0 && remote.RemoteSize > 0
}
// mayHaveCachedToLocal reports whether entry may hold locally cached content:
// a non-directory entry with remote metadata whose LastLocalSyncTsNs is set.
//
// Entries without remote metadata are excluded because an entry that is not
// in remote should not be uncached. A nil entry yields false.
func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
	if entry == nil || entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false // should not uncache an entry that is not in remote
	}
	return entry.RemoteEntry.LastLocalSyncTsNs > 0
}