mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
8cc10460b4
* fix(remote): reject short reads when caching remote objects A short read from the remote (stale listing size, truncated or flaky response) was silently zero-padded: the S3 and Azure clients pre-size the buffer and discard the downloaded byte count, and the chunk is recorded with the requested size. The cached file then matched the expected size but its tail was NULL, and the entry was marked cached so it never re-fetched. Check the byte count against the requested size in both clients, and add a backend-agnostic guard in FetchAndWriteNeedle. The cache now fails loudly and the entry stays remote-only for a later retry. * fix(remote): match S3 default modes when syncing remote metadata Remote object listings carry no POSIX mode, so synced entries were created with a hardcoded 0644. Against a SeaweedFS remote, whose S3 layer writes objects as 0660 and auto-creates directories as 0771 (0660|0111), the mounted copy ended up 0644/0755 and the permissions visibly diverged from the source. Default to the S3 modes instead (files 0660, directories 0771). The filer derives parent-dir modes from the child as fileMode|0111, so fixing the file default also brings the directories into line. Directory mtimes still reflect sync time: S3 listings don't enumerate directories, so the remote's directory timestamps aren't available.
428 lines
15 KiB
Go
428 lines
15 KiB
Go
package shell
|
|
|
|
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"math"
	"sort"
	"strings"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandRemoteCache{})
|
|
}
|
|
|
|
// commandRemoteCache implements the "remote.cache" shell command. It holds no
// state; every option is parsed per invocation in Do.
type commandRemoteCache struct{}
|
|
|
|
func (c *commandRemoteCache) Name() string {
|
|
return "remote.cache"
|
|
}
|
|
|
|
// Help returns the usage text for remote.cache: example invocations for each
// flag and a summary of the three phases (metadata sync, content caching and
// removal of local entries that no longer exist on the remote).
func (c *commandRemoteCache) Help() string {
	return `comprehensive synchronization and caching between local and remote storage

# assume a remote storage is configured to name "cloud1"
remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
# mount and pull one bucket
remote.mount -dir=/xxx -remote=cloud1/bucket

# comprehensive sync and cache: update metadata, cache content, and remove deleted files
remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
remote.cache -dir=/xxx -concurrent=32 # with custom file-level concurrency
remote.cache -dir=/xxx -chunkConcurrency=16 # parallel chunk downloads per file (0 = server default 8)
remote.cache -dir=/xxx -downloadConcurrency=10 # S3 multipart download concurrency per chunk (0 = server default 5)
remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes

This command will:
1. Synchronize metadata from remote storage
2. Cache file content from remote by default
3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)

This is designed to run regularly. So you can add it to some cronjob.

`
}
|
|
|
|
func (c *commandRemoteCache) HasTag(CommandTag) bool {
|
|
return false
|
|
}
|
|
|
|
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
|
|
remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
|
|
dir := remoteCacheCommand.String("dir", "", "a directory in filer")
|
|
cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
|
|
deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
|
|
concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
|
|
chunkConcurrency := remoteCacheCommand.Int("chunkConcurrency", 0, "parallel chunk downloads per file (0 = server default 8)")
|
|
downloadConcurrency := remoteCacheCommand.Int("downloadConcurrency", 0, "S3 multipart download concurrency per chunk (0 = server default 5)")
|
|
dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
|
|
fileFiler := newFileFilter(remoteCacheCommand)
|
|
|
|
if err = remoteCacheCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
if *dir == "" {
|
|
return fmt.Errorf("need to specify -dir option")
|
|
}
|
|
if *chunkConcurrency < 0 || *chunkConcurrency > math.MaxInt32 {
|
|
return fmt.Errorf("chunkConcurrency must be between 0 and %d", math.MaxInt32)
|
|
}
|
|
if *downloadConcurrency < 0 || *downloadConcurrency > math.MaxInt32 {
|
|
return fmt.Errorf("downloadConcurrency must be between 0 and %d", math.MaxInt32)
|
|
}
|
|
|
|
mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
|
|
if detectErr != nil {
|
|
jsonPrintln(writer, mappings)
|
|
return detectErr
|
|
}
|
|
|
|
// perform comprehensive sync
|
|
return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, int32(*chunkConcurrency), int32(*downloadConcurrency), *dryRun, fileFiler)
|
|
}
|
|
|
|
func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, chunkConcurrency int32, downloadConcurrency int32, dryRun bool, fileFilter *FileFilter) error {
|
|
|
|
// visit remote storage
|
|
remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
|
|
|
|
// Step 1: Collect all remote files
|
|
remoteFiles := make(map[string]*filer_pb.RemoteEntry)
|
|
err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
|
|
localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
|
|
fullPath := string(localDir.Child(name))
|
|
remoteFiles[fullPath] = remoteEntry
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to traverse remote storage: %w", err)
|
|
}
|
|
|
|
fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
|
|
|
|
// Step 2: Collect all local files (only if we need to delete local extra files)
|
|
localFiles := make(map[string]*filer_pb.Entry)
|
|
if deleteLocalExtra {
|
|
err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
|
|
if entry.RemoteEntry != nil { // only consider files that are part of remote mount
|
|
fullPath := string(dir.Child(entry.Name))
|
|
localFiles[fullPath] = entry
|
|
}
|
|
return true
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to traverse local directory: %w", err)
|
|
}
|
|
fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
|
|
} else {
|
|
fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
|
|
}
|
|
|
|
// Step 3: Determine actions needed
|
|
var filesToDelete []string
|
|
var filesToUpdate []string
|
|
var filesToCache []string
|
|
|
|
// Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
|
|
if deleteLocalExtra {
|
|
for localPath := range localFiles {
|
|
if _, exists := remoteFiles[localPath]; !exists {
|
|
filesToDelete = append(filesToDelete, localPath)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Find files to update/cache (exist remotely)
|
|
for remotePath, remoteEntry := range remoteFiles {
|
|
if deleteLocalExtra {
|
|
// When deleteLocalExtra is enabled, we have localFiles to compare with
|
|
if localEntry, exists := localFiles[remotePath]; exists {
|
|
// File exists locally, check if it needs updating
|
|
if localEntry.RemoteEntry == nil ||
|
|
localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
|
|
localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
|
|
filesToUpdate = append(filesToUpdate, remotePath)
|
|
}
|
|
// Check if it needs caching
|
|
if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
|
|
filesToCache = append(filesToCache, remotePath)
|
|
}
|
|
} else {
|
|
// File doesn't exist locally, needs to be created
|
|
filesToUpdate = append(filesToUpdate, remotePath)
|
|
}
|
|
} else {
|
|
// When deleteLocalExtra is disabled, we check each file individually
|
|
// All remote files are candidates for update/creation
|
|
filesToUpdate = append(filesToUpdate, remotePath)
|
|
|
|
// For caching, we need to check if the local file exists and needs caching
|
|
if shouldCache {
|
|
// We need to look up the local file to check if it needs caching
|
|
localDir, name := util.FullPath(remotePath).DirAndName()
|
|
err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: localDir,
|
|
Name: name,
|
|
})
|
|
if lookupErr == nil {
|
|
localEntry := lookupResp.Entry
|
|
if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
|
|
filesToCache = append(filesToCache, remotePath)
|
|
}
|
|
}
|
|
return nil // Don't propagate lookup errors here
|
|
})
|
|
if err != nil {
|
|
// Log error but continue
|
|
fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
|
|
len(filesToDelete), len(filesToUpdate), len(filesToCache))
|
|
|
|
if dryRun {
|
|
fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
|
|
for _, path := range filesToDelete {
|
|
fmt.Fprintf(writer, "DELETE: %s\n", path)
|
|
}
|
|
for _, path := range filesToUpdate {
|
|
fmt.Fprintf(writer, "UPDATE: %s\n", path)
|
|
}
|
|
for _, path := range filesToCache {
|
|
fmt.Fprintf(writer, "CACHE: %s\n", path)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Step 4: Execute actions
|
|
return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
ctx := context.Background()
|
|
|
|
// Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
|
|
if deleteLocalExtra {
|
|
for _, pathToDelete := range filesToDelete {
|
|
fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)
|
|
|
|
dir, name := util.FullPath(pathToDelete).DirAndName()
|
|
_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
|
|
Directory: dir,
|
|
Name: name,
|
|
IgnoreRecursiveError: false,
|
|
IsDeleteData: true,
|
|
IsRecursive: false,
|
|
IsFromOtherCluster: false,
|
|
})
|
|
if err != nil {
|
|
fmt.Fprintf(writer, "failed: %v\n", err)
|
|
return err
|
|
}
|
|
fmt.Fprintf(writer, "done\n")
|
|
}
|
|
}
|
|
|
|
// Update metadata for files that exist on remote
|
|
for _, pathToUpdate := range filesToUpdate {
|
|
remoteEntry := remoteFiles[pathToUpdate]
|
|
localDir, name := util.FullPath(pathToUpdate).DirAndName()
|
|
|
|
fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)
|
|
|
|
// Check if file exists locally
|
|
lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: string(localDir),
|
|
Name: name,
|
|
})
|
|
|
|
if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
|
|
fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
|
|
continue
|
|
}
|
|
|
|
isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
|
|
if lookupErr == filer_pb.ErrNotFound {
|
|
// Create new entry
|
|
_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
|
|
Directory: string(localDir),
|
|
Entry: &filer_pb.Entry{
|
|
Name: name,
|
|
IsDirectory: isDirectory,
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: uint64(remoteEntry.RemoteSize),
|
|
Mtime: remoteEntry.RemoteMtime,
|
|
FileMode: remoteEntryFileMode(isDirectory),
|
|
},
|
|
RemoteEntry: remoteEntry,
|
|
},
|
|
})
|
|
if createErr != nil {
|
|
fmt.Fprintf(writer, "failed to create: %v\n", createErr)
|
|
continue
|
|
}
|
|
} else {
|
|
// Update existing entry
|
|
existingEntry := lookupResp.Entry
|
|
if existingEntry.RemoteEntry == nil {
|
|
// This is a local file, skip to avoid overwriting
|
|
fmt.Fprintf(writer, "skipped (local file)\n")
|
|
continue
|
|
}
|
|
|
|
existingEntry.RemoteEntry = remoteEntry
|
|
existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
|
|
existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
|
|
existingEntry.Attributes.Md5 = nil
|
|
existingEntry.Chunks = nil
|
|
existingEntry.Content = nil
|
|
|
|
_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
|
|
Directory: string(localDir),
|
|
Entry: existingEntry,
|
|
})
|
|
if updateErr != nil {
|
|
fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
|
|
continue
|
|
}
|
|
}
|
|
fmt.Fprintf(writer, "done\n")
|
|
}
|
|
|
|
// Cache file content if requested
|
|
if shouldCache && len(filesToCache) > 0 {
|
|
fmt.Fprintf(writer, "Caching file content...\n")
|
|
|
|
var wg sync.WaitGroup
|
|
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
|
|
var executionErr error
|
|
var execErrMu sync.Mutex
|
|
|
|
for _, pathToCache := range filesToCache {
|
|
wg.Add(1)
|
|
pathToCacheCopy := pathToCache // Capture for closure
|
|
limitedConcurrentExecutor.Execute(func() {
|
|
defer wg.Done()
|
|
|
|
// Get local entry (either from localFiles map or by lookup)
|
|
var localEntry *filer_pb.Entry
|
|
if deleteLocalExtra {
|
|
localEntry = localFiles[pathToCacheCopy]
|
|
if localEntry == nil {
|
|
fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
|
|
return
|
|
}
|
|
} else {
|
|
// Look up the local entry since we don't have it in localFiles
|
|
localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
|
|
lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: localDir,
|
|
Name: name,
|
|
})
|
|
if err == nil {
|
|
localEntry = lookupResp.Entry
|
|
}
|
|
return err
|
|
})
|
|
if lookupErr != nil {
|
|
fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
|
|
return
|
|
}
|
|
}
|
|
|
|
dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
|
|
|
|
fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
|
|
|
|
if _, err := filer.CacheRemoteObjectToLocalCluster(commandEnv, util.FullPath(dir), localEntry, chunkConcurrency, downloadConcurrency); err != nil {
|
|
fmt.Fprintf(writer, "failed: %v\n", err)
|
|
execErrMu.Lock()
|
|
if executionErr == nil {
|
|
executionErr = err
|
|
}
|
|
execErrMu.Unlock()
|
|
return
|
|
}
|
|
fmt.Fprintf(writer, "done\n")
|
|
})
|
|
}
|
|
|
|
wg.Wait()
|
|
if executionErr != nil {
|
|
return executionErr
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
|
|
|
|
err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
if entry.IsDirectory {
|
|
if !visitEntry(dirPath, entry) {
|
|
return nil
|
|
}
|
|
subDir := dirPath.Child(entry.Name)
|
|
if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if !visitEntry(dirPath, entry) {
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return
|
|
}
|
|
|
|
func shouldCacheToLocal(entry *filer_pb.Entry) bool {
|
|
if entry.IsDirectory {
|
|
return false
|
|
}
|
|
if entry.RemoteEntry == nil {
|
|
return false
|
|
}
|
|
if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
|
|
if entry.IsDirectory {
|
|
return false
|
|
}
|
|
if entry.RemoteEntry == nil {
|
|
return false // should not uncache an entry that is not in remote
|
|
}
|
|
if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|