Files
seaweedfs/weed/command/filer_sync.go
T
Chris Lu e12052ee6b fix(filer.sync): replicate a rename as an atomic move, not a no-op update (#9895)
* fix(filer.sync): replicate a rename as create-then-delete, not an in-place update

A rename arrives as a single metadata event carrying both the old and new
entry. The filer sink was routed to UpdateEntry, which looks up the old
path but issues the update against the new parent without changing the
name — and the filer UpdateEntry RPC cannot move an entry. So the rename
was dropped: the old path lingered and the new path never appeared
(same-dir renames rewrote the old name in place).

Route a real move (the sink path changed) through CreateEntry(new) then
DeleteEntry(old) in both the replicator and the filer.sync/backup driver,
the way the other sinks already handle it; reach UpdateEntry only for true
in-place updates. Create before delete so a crash between the two leaves
the entry visible rather than lost.

* fix(filer.sync): derive the rename delete key like the create key, guard the watched root

The rename delete leg rebuilt the old key with a raw util.Join, bypassing the
sink-side key normalization the create leg gets from buildKey — so a rename
could create the new entry and then fail to delete the old one under a
transformed key. Build the old key through buildKey too, and skip the delete
when the moved entry is the watched root itself (where the old key would
resolve to the target root and recursively delete the whole sink tree).

* test(filer.sync): cover the in-place update delete-then-create fallback order

The recording sinks always reported foundExisting, so the fallback that an
in-place update takes when the entry is missing on the sink was never run.
Make it configurable and assert the fallback deletes before it recreates the
same key, in both the replicator and the filer.sync drivers.

* feat(filer.sync): move filer-sink renames natively via AtomicRenameEntry

create-then-delete is unsafe for the filer sink: CreateEntry returns nil
without creating on a transient chunk-copy error, so the paired delete could
remove the only valid destination copy; a directory rename also deleted the
old subtree before descendants were recreated, and left old chunks behind.

Add an optional EntryMover sink capability and implement it on the filer sink
via AtomicRenameEntry — one atomic, metadata-only move that relocates a whole
subtree in a single transaction. Renames prefer it; sinks without a native
move keep create-then-delete. When the old path is already gone (a descendant
the parent rename moved, or one never replicated) MoveEntry creates the new
path instead, re-checking existence with a lookup so a rolled-back move that
left the old entry intact is retried rather than mistaken for gone.

* docs(filer.sync): note entryMissing's gRPC not-found string fallback is deliberate
2026-06-09 12:54:28 -07:00

811 lines
32 KiB
Go

package command
import (
"context"
"errors"
"fmt"
"os"
"regexp"
"strings"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
"google.golang.org/grpc"
)
// SyncOptions holds the command-line flag values and the per-process client
// identity used by filer.sync. Fields prefixed with "a" apply to filer A and
// fields prefixed with "b" apply to filer B; the pointer fields are bound to
// their flags in init().
type SyncOptions struct {
// one-directional follow from A to B when true
isActivePassive *bool
// addresses of the two filers
filerA *string
filerB *string
// directory to sync and comma-separated directories to exclude, per filer
aPath *string
aExcludePaths *string
bPath *string
bExcludePaths *string
// replication, collection, TTL (seconds) and disk type, per filer
aReplication *string
bReplication *string
aCollection *string
bCollection *string
aTtlSec *int
bTtlSec *int
aDiskType *string
bDiskType *string
// print the files received by each filer
aDebug *bool
bDebug *bool
// optional start timestamps in milliseconds (see initOffsetFromTsMs)
aFromTsMs *int64
bFromTsMs *int64
// read and write chunks through the filer instead of the volume servers
aProxyByFiler *bool
bProxyByFiler *bool
// listen address of the metrics server
metricsHttpIp *string
metricsHttpPort *int
// maximum number of files synced concurrently, and of chunks per file
concurrency *int
chunkConcurrency *int
// whether deletes and updates are applied on each filer
aDoDeleteFiles *bool
bDoDeleteFiles *bool
// per-filer security.toml, for clusters that use different certificates
aSecurity *string
bSecurity *string
// clientId is a random id assigned once in init(); clientEpoch is
// incremented before every subscribe attempt.
clientId int32
clientEpoch atomic.Int32
// pprof debug server switch and port
debug *bool
debugPort *int
}
const (
// SyncKeyPrefix starts every key prefix returned by getSignaturePrefixByPath,
// which getOffset/setOffset use to build the key of a stored sync offset.
SyncKeyPrefix = "sync."
// DefaultConcurrencyLimit is the default of the -concurrency flag and the
// value used when the supplied concurrency is out of range.
DefaultConcurrencyLimit = 32
)
// syncState tracks the current sync state for graceful shutdown checkpoint saving.
// It bundles everything the interrupt hook needs to call setOffset for one
// sync direction.
type syncState struct {
// processor supplies the processed-timestamp watermark to persist
processor *MetadataProcessor
// grpcDialOption and targetFiler identify where the checkpoint is written
grpcDialOption grpc.DialOption
targetFiler pb.ServerAddress
// sourcePath and sourceFilerSignature select the checkpoint key
sourcePath string
sourceFilerSignature int32
}
var (
// syncOptions holds the parsed filer.sync flags
syncOptions SyncOptions
// output files for the CPU and memory profiles
syncCpuProfile *string
syncMemProfile *string
// atomic pointers to current sync states for graceful shutdown;
// one per direction, stored by doSubscribeFilerMetaChanges
syncStateA2B atomic.Pointer[syncState]
syncStateB2A atomic.Pointer[syncState]
)
// init wires the filer.sync command: it sets the command's Run function,
// binds every command-line flag to its SyncOptions field (or profile
// variable), and assigns a random client id to this process.
func init() {
cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
syncOptions.chunkConcurrency = cmdFilerSynchronize.Flag.Int("chunkConcurrency", 32, "The maximum number of chunks that will be replicated concurrently per file.")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
syncOptions.aSecurity = cmdFilerSynchronize.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
syncOptions.bSecurity = cmdFilerSynchronize.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
syncOptions.debug = cmdFilerSynchronize.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
syncOptions.debugPort = cmdFilerSynchronize.Flag.Int("debug.port", 6060, "http port for debugging")
// random id for this process, passed to every subscription as the client id
syncOptions.clientId = util.RandomInt32()
}
// cmdFilerSynchronize describes the "filer.sync" command: its usage line and
// its short and long help text. Its Run function and flags are attached in
// init().
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
* filer.sync only works between two filers.
* filer.sync does not need any special message queue setup.
* filer.sync supports both active-active and active-passive modes.
If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs.
`,
}
// runFilerSynchronize is the entry point of the filer.sync command.
//
// It loads the gRPC TLS and HTTPS client configuration (optionally per
// filer), reads both filer signatures, registers an interrupt hook that
// persists the latest processed offsets, and then starts one subscribe loop
// per direction: A->B always, B->A unless -isActivePassive is set. Each loop
// sleeps briefly and retries when doSubscribeFilerMetaChanges returns an
// error. After a successful start the function blocks forever; it returns
// true only when reading a filer signature fails.
func runFilerSynchronize(cmd *Command, args []string) bool {
if *syncOptions.debug {
grace.StartDebugServer(*syncOptions.debugPort)
}
// resolve user-supplied file paths before they are used
*syncCpuProfile = util.ResolvePath(*syncCpuProfile)
*syncMemProfile = util.ResolvePath(*syncMemProfile)
*syncOptions.aSecurity = util.ResolvePath(*syncOptions.aSecurity)
*syncOptions.bSecurity = util.ResolvePath(*syncOptions.bSecurity)
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
// per-filer TLS when clusters use different certificates;
// both sides default to the shared dial option
grpcDialOptionA := grpcDialOption
grpcDialOptionB := grpcDialOption
if *syncOptions.aSecurity != "" {
var err error
if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncOptions.aSecurity, "grpc.client"); err != nil {
glog.Fatalf("load security config for filer A: %v", err)
}
}
if *syncOptions.bSecurity != "" {
var err error
if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncOptions.bSecurity, "grpc.client"); err != nil {
glog.Fatalf("load security config for filer B: %v", err)
}
}
// per-cluster HTTPS clients for volume server connections;
// left nil when no per-filer security file is given
var httpClientA, httpClientB *util_http_client.HTTPClient
if *syncOptions.aSecurity != "" {
var err error
if httpClientA, err = security.LoadHTTPClientFromFile(*syncOptions.aSecurity); err != nil {
glog.Fatalf("load HTTPS client config for filer A: %v", err)
}
}
if *syncOptions.bSecurity != "" {
var err error
if httpClientB, err = security.LoadHTTPClientFromFile(*syncOptions.bSecurity); err != nil {
glog.Fatalf("load HTTPS client config for filer B: %v", err)
}
}
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
filerA := pb.ServerAddress(*syncOptions.filerA)
filerB := pb.ServerAddress(*syncOptions.filerB)
// start filer.sync metrics server
go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
// read a filer signature
aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOptionA, filerA)
if aFilerErr != nil {
glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
return true
}
// read b filer signature
bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOptionB, filerB)
if bFilerErr != nil {
glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
return true
}
// register graceful shutdown hook to save checkpoints
grace.OnInterrupt(func() {
// saveCheckpoint persists one direction's processed watermark; it is a
// no-op when that direction has no state yet or nothing was processed.
saveCheckpoint := func(name string, state *syncState) {
if state == nil || state.processor == nil {
return
}
offsetTsNs := state.processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return
}
if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil {
glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err)
} else {
glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs))
}
}
saveCheckpoint("A->B", syncStateA2B.Load())
saveCheckpoint("B->A", syncStateB2A.Load())
})
go func() {
// a->b
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOptionB, filerB, aFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
os.Exit(2)
}
// subscribe forever; every attempt uses a new client epoch
for {
syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
syncOptions.clientEpoch.Load(),
grpcDialOptionA,
filerA,
*syncOptions.aPath,
util.StringSplit(*syncOptions.aExcludePaths, ","),
*syncOptions.aProxyByFiler,
grpcDialOptionB,
filerB,
*syncOptions.bPath,
*syncOptions.bReplication,
*syncOptions.bCollection,
*syncOptions.bTtlSec,
*syncOptions.bProxyByFiler,
*syncOptions.bDiskType,
*syncOptions.bDebug,
*syncOptions.concurrency,
*syncOptions.chunkConcurrency,
*syncOptions.bDoDeleteFiles,
aFilerSignature,
bFilerSignature,
&syncStateA2B,
httpClientA,
httpClientB)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
}
}
}()
if !*syncOptions.isActivePassive {
// b->a
// set synchronization start timestamp to offset
// (unlike the a->b case this runs on the calling goroutine, before the loop starts)
initOffsetError := initOffsetFromTsMs(grpcDialOptionA, filerA, bFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
os.Exit(2)
}
go func() {
// mirror of the a->b loop with source and target swapped
for {
syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
syncOptions.clientEpoch.Load(),
grpcDialOptionB,
filerB,
*syncOptions.bPath,
util.StringSplit(*syncOptions.bExcludePaths, ","),
*syncOptions.bProxyByFiler,
grpcDialOptionA,
filerA,
*syncOptions.aPath,
*syncOptions.aReplication,
*syncOptions.aCollection,
*syncOptions.aTtlSec,
*syncOptions.aProxyByFiler,
*syncOptions.aDiskType,
*syncOptions.aDebug,
*syncOptions.concurrency,
*syncOptions.chunkConcurrency,
*syncOptions.aDoDeleteFiles,
bFilerSignature,
aFilerSignature,
&syncStateB2A,
httpClientB,
httpClientA)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
}
}
}()
}
// block forever; the sync loops run in their own goroutines
select {}
}
// initOffsetFromTsMs seeds the stored sync checkpoint from a user-supplied
// start timestamp.
//
// fromTsMs is in milliseconds; a value <= 0 means "no override" and leaves the
// stored checkpoint untouched. Otherwise the timestamp is converted to
// nanoseconds and written to targetFiler through setOffset, under the key
// derived from signaturePrefix and sourceFilerSignature. A setOffset error is
// returned unchanged; the caller decides whether to abort.
func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
	if fromTsMs <= 0 {
		return nil
	}
	// checkpoints are stored in nanoseconds
	fromTsNs := fromTsMs * 1000_000
	if err := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs); err != nil {
		return err
	}
	// This function serves both sync directions, so report the filer the
	// checkpoint was actually written to rather than a fixed "A to B".
	glog.Infof("setOffset from timestamp ms success! start offset: %d stored on filer %s for source signature %d", fromTsNs, targetFiler, sourceFilerSignature)
	return nil
}
// doSubscribeFilerMetaChanges follows metadata changes under sourcePath on
// sourceFiler and applies them to targetPath on targetFiler through a filer
// sink.
//
// It resumes from the offset stored on the target filer (see getOffset),
// skips events that already carry a non-zero targetFilerSignature, and hands
// the remaining events to a MetadataProcessor. The second callback given to
// pb.AddOffsetFunc persists the processor's watermark back to the target
// filer and updates the sync-offset gauge (presumably invoked about every
// 3 seconds — confirm against pb.AddOffsetFunc). When statePtr is non-nil the
// current processor and checkpoint coordinates are stored there for the
// shutdown hook. The call returns whatever pb.FollowMetadata returns, or the
// getOffset error.
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetGrpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState],
sourceHttpClient *util_http_client.HTTPClient, sinkHttpClient *util_http_client.HTTPClient) error {
// resume from the offset previously stored on the target filer;
// getOffset yields 0 when no 8-byte value has been stored yet
sourceFilerOffsetTsNs, err := getOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
if err != nil {
return err
}
glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
// create the filer source, then the filer sink that reads from it
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
filerSource.SetGrpcDialOption(sourceGrpcDialOption)
if sourceHttpClient != nil {
filerSource.SetHttpClient(sourceHttpClient)
}
filerSink := &filersink.FilerSink{}
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, targetGrpcDialOption, sinkWriteChunkByFiler)
filerSink.SetChunkConcurrency(chunkConcurrency)
if sinkHttpClient != nil {
filerSink.SetUploader(operation.NewUploaderWithHttpClient(sinkHttpClient))
}
filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, nil, nil, filerSink, doDeleteFiles, debug)
// processEventFn drops events already signed by the target filer, so a
// change is not applied back to the filer it carries the signature of.
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
for _, sig := range message.Signatures {
if sig == targetFilerSignature && targetFilerSignature != 0 {
fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
return nil
}
}
return persistEventFn(resp)
}
// values outside [0, 1024] fall back to the default
if concurrency < 0 || concurrency > 1024 {
glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
concurrency = DefaultConcurrencyLimit
}
processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
// update sync state for graceful shutdown checkpoint saving
if statePtr != nil {
statePtr.Store(&syncState{
processor: processor,
grpcDialOption: targetGrpcDialOption,
targetFiler: targetFiler,
sourcePath: sourcePath,
sourceFilerSignature: sourceFilerSignature,
})
}
// lastLogTsNs feeds the events/sec figure in the progress log;
// lastProgressedTsNs detects a watermark that has not moved since the last report
var lastLogTsNs = time.Now().UnixNano()
var lastProgressedTsNs int64
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
// queue the event on the processor; this callback itself never fails
processor.AddSyncJob(resp)
return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
offsetTsNs := processor.processedTsWatermark.Load()
if offsetTsNs == 0 {
return nil
}
// use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
// no progress since the previous report: list the sink's active transfers
if offsetTsNs == lastProgressedTsNs {
for _, t := range filerSink.ActiveTransfers() {
if t.LastErr != "" {
glog.V(0).Infof(" %s %s: %d bytes received, %s, last error: %s",
t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr)
} else {
glog.V(0).Infof(" %s %s: %d bytes received, %s",
t.ChunkFileId, t.Path, t.BytesReceived, t.Status)
}
}
}
lastProgressedTsNs = offsetTsNs
// collect synchronous offset
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
})
// the subscription path prefix always ends with "/"
prefix := sourcePath
if !strings.HasSuffix(prefix, "/") {
prefix = prefix + "/"
}
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: clientName,
ClientId: clientId,
ClientEpoch: clientEpoch,
SelfSignature: targetFilerSignature,
PathPrefix: prefix,
AdditionalPathPrefixes: nil,
DirectoriesToWatch: nil,
StartTsNs: sourceFilerOffsetTsNs,
StopTsNs: 0,
EventErrorType: pb.RetryForeverOnError,
// While the source has only read activity it emits no metadata events, so
// the watermark above never advances and sync_offset would look stuck.
// The idle heartbeat moves the gauge to the source's current time once we
// are caught up, so now-sync_offset reflects real lag and stays alertable.
OnIdleHeartbeat: func(tsNs int64) {
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(tsNs))
},
}
return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
// getSignaturePrefixByPath returns the key prefix under which the sync offset
// for the given source path is stored, so that syncs of different paths keep
// separate offsets. The root path "/" maps to the bare SyncKeyPrefix, which
// stays compatible with keys written by historical versions.
func getSignaturePrefixByPath(path string) string {
	prefix := SyncKeyPrefix
	if path != "/" {
		prefix += path
	}
	return prefix
}
// getOffset reads the sync offset (nanoseconds) stored on filer for the given
// signaturePrefix and signature.
//
// The lookup key is signaturePrefix followed by four bytes that hold the
// signature. lastOffsetTsNs stays 0 when the stored value is shorter than
// eight bytes. A transport error or a non-empty error field in the KvGet
// response is returned as readErr.
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
	fetch := func(client filer_pb.SeaweedFilerClient) error {
		// reserve four placeholder bytes after the prefix, then overwrite
		// them with the signature
		prefixLen := len(signaturePrefix)
		key := []byte(signaturePrefix + "____")
		util.Uint32toBytes(key[prefixLen:prefixLen+4], uint32(signature))

		resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: key})
		switch {
		case err != nil:
			return err
		case len(resp.Error) != 0:
			return errors.New(resp.Error)
		case len(resp.Value) >= 8:
			lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
		}
		return nil
	}
	readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, fetch)
	return lastOffsetTsNs, readErr
}
// setOffset stores offsetTsNs (nanoseconds) on filer as the sync offset for
// the given signaturePrefix and signature.
//
// The key is signaturePrefix followed by four bytes that hold the signature;
// the value is the offset encoded into eight bytes. A transport error or a
// non-empty error field in the KvPut response is returned.
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
	store := func(client filer_pb.SeaweedFilerClient) error {
		// reserve four placeholder bytes after the prefix, then overwrite
		// them with the signature
		prefixLen := len(signaturePrefix)
		key := []byte(signaturePrefix + "____")
		util.Uint32toBytes(key[prefixLen:prefixLen+4], uint32(signature))

		value := make([]byte, 8)
		util.Uint64toBytes(value, uint64(offsetTsNs))

		request := &filer_pb.KvPutRequest{
			Key:   key,
			Value: value,
		}
		resp, err := client.KvPut(context.Background(), request)
		if err != nil {
			return err
		}
		if resp.Error != "" {
			return errors.New(resp.Error)
		}
		return nil
	}
	return pb.WithFilerClient(false, signature, filer, grpcDialOption, store)
}
// genProcessFunction builds the per-event handler that applies one metadata
// event from the source to dataSink.
//
// sourcePath is the watched directory on the source and targetPath the
// corresponding root on the sink. excludePaths, reExcludeFileName,
// excludeFileNames and excludePathPatterns filter events; the old and the new
// side of an event are filtered independently. doDeleteFiles controls whether
// deletes and in-place updates are propagated and is forced off for
// incremental sinks. debug logs every received event.
//
// The returned function maps an event to sink calls:
//   - delete: DeleteEntry, only when doDeleteFiles
//   - create: CreateEntry
//   - rename inside the watched tree: MoveEntry when the sink implements
//     sink.EntryMover, otherwise CreateEntry(new) then DeleteEntry(old);
//     only CreateEntry(new) when deletes are disabled or the moved entry is
//     the watched root itself
//   - in-place update: UpdateEntry when doDeleteFiles; if the sink reports
//     the entry missing, DeleteEntry(old) then CreateEntry(new); only
//     CreateEntry(new) when deletes are disabled
//   - old key inside, new key outside the watched tree: DeleteEntry(old),
//     only when doDeleteFiles
//   - old key outside, new key inside the watched tree: CreateEntry(new)
//
// Events that are out of scope, excluded on both sides, or located under a
// multipart-upload directory are ignored and yield a nil error.
func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
// Derive the target (new-side) directory once. MetadataEventTargetDirectory
// returns NewParentPath when set, falling back to resp.Directory for
// delete events or legacy events with an empty NewParentPath.
targetDir := filer_pb.MetadataEventTargetDirectory(resp)
// full source-side paths of the old and new entry; left empty when that side is absent
var sourceOldKey, sourceNewKey util.FullPath
if message.OldEntry != nil {
sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
}
if message.NewEntry != nil {
sourceNewKey = util.FullPath(targetDir).Child(message.NewEntry.Name)
}
if debug {
glog.V(0).Infof("received %v", resp)
}
// skip anything under a multipart-upload directory
if isMultipartUploadDir(resp.Directory + "/") {
return nil
}
// For rename events the key/directory is the old (source) path.
// Check both old and new directories so cross-boundary renames
// are not silently dropped. The downstream old/new key handling
// (lines below) already converts these to create or delete.
oldDirExcluded := matchesExcludePath(resp.Directory, excludePaths)
newDirExcluded := matchesExcludePath(targetDir, excludePaths)
oldDirInScope := util.IsEqualOrUnder(resp.Directory, sourcePath) && !oldDirExcluded
newDirInScope := message.NewEntry != nil &&
util.IsEqualOrUnder(targetDir, sourcePath) &&
!newDirExcluded
if !oldDirInScope && !newDirInScope {
return nil
}
// Compute per-side exclusion so that rename events crossing an
// exclude boundary are handled as delete + create rather than
// being entirely skipped.
oldExcluded := oldDirExcluded || isEntryExcluded(resp.Directory, message.OldEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)
newExcluded := newDirExcluded || isEntryExcluded(targetDir, message.NewEntry, reExcludeFileName, excludeFileNames, excludePathPatterns)
if oldExcluded && newExcluded {
return nil
}
if oldExcluded {
// Old side is excluded — treat as pure create of new entry.
message.OldEntry = nil
}
if newExcluded {
// New side is excluded — treat as pure delete of old entry.
message.NewEntry = nil
sourceNewKey = ""
}
// incremental sinks never delete; this overrides the captured flag for all later events too
if dataSink.IsIncremental() {
doDeleteFiles = false
}
// handle deletions
if filer_pb.IsDelete(resp) {
if !doDeleteFiles {
return nil
}
if !util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
// handle new entries
if filer_pb.IsCreate(resp) {
if !util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
return fmt.Errorf("create entry1 : %w", err)
} else {
return nil
}
}
// this is something special?
// (presumably an event with neither an old nor a new entry, which the
// exclusion handling above can also produce — confirm against filer_pb.IsEmpty)
if filer_pb.IsEmpty(resp) {
return nil
}
// handle updates
if util.IsEqualOrUnder(string(sourceOldKey), sourcePath) {
// old key is in the watched directory
if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
// new key is also in the watched directory
if filer_pb.IsRename(resp) {
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
// With deletes enabled a rename relocates the entry. Guard the
// watched root itself, whose old key would resolve to the target
// root and recursively delete the whole sink tree.
if doDeleteFiles && string(sourceOldKey) != sourcePath {
oldKey := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
if mover, ok := dataSink.(sink.EntryMover); ok {
// native atomic move: no re-copy, no descendant gap, no chunk leak.
return mover.MoveEntry(oldKey, newKey, message.NewEntry, message.Signatures)
}
// no native move: create the new entry first, then delete the
// old, so a crash between the two leaves the entry visible.
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
return fmt.Errorf("create entry2 : %w", err)
}
if err := dataSink.DeleteEntry(oldKey, message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
return fmt.Errorf("delete old entry %v: %w", oldKey, err)
}
return nil
}
// deletes disabled (backup/incremental) or the watched root moved:
// create the new entry and keep the old.
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
return fmt.Errorf("create entry2 : %w", err)
}
return nil
}
// in-place update (same path): mutate via UpdateEntry; never
// delete-then-recreate the same key.
if doDeleteFiles {
// NOTE(review): unlike every other branch, this key and the parent path
// below are derived by slicing at len(sourcePath) instead of going through
// buildKey/destKey — confirm both agree when sourcePath ends with "/"
// and that the slice bounds cannot exceed the string length.
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
var sinkNewParentPath string
if strings.HasSuffix(sourcePath, "/") {
sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath)-1:])
} else {
sinkNewParentPath = util.Join(targetPath, targetDir[len(sourcePath):])
}
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, sinkNewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
return err
}
// old entry missing on the destination; fall through to create it
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
return fmt.Errorf("delete old entry %v: %w", oldKey, err)
}
}
// reached when deletes are disabled or UpdateEntry found nothing to update
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
return fmt.Errorf("create entry2 : %w", err)
}
return nil
} else {
// new key is outside the watched directory
if doDeleteFiles {
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
}
} else {
// old key is outside the watched directory
if util.IsEqualOrUnder(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
return fmt.Errorf("create entry3 : %w", err)
} else {
return nil
}
} else {
// new key is also outside the watched directory
// skip
}
}
return nil
}
return processEventFn
}
// buildKey derives the sink-side key for sourceKey from an event message.
// The modification time passed on to destKey comes from the new entry's
// attributes when present, otherwise from the old entry's attributes, and is
// zero when neither side carries attributes.
func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
	var mTime int64
	switch {
	case message.NewEntry != nil && message.NewEntry.Attributes != nil:
		mTime = message.NewEntry.Attributes.Mtime
	case message.OldEntry != nil && message.OldEntry.Attributes != nil:
		mTime = message.OldEntry.Attributes.Mtime
	}
	return destKey(dataSink, targetPath, sourcePath, sourceKey, mTime)
}
// destKey derives the sink-side key for a source entry. It is shared by the
// event-log path (buildKey) and the initialSnapshot walk, so that a
// walk-seeded file and an event-replayed file resolve to the same destination
// key.
//
// The part of sourceKey below sourcePath is computed against a
// trailing-slash-normalized sourcePath, which avoids indexing past the end of
// sourceKey when callers differ on trailing-slash conventions or when
// sourceKey equals sourcePath exactly. A sourceKey outside sourcePath keeps
// its full path minus the leading "/". For incremental sinks a date directory
// (2006-01-02 layout, from mTime in seconds) is inserted between targetPath
// and the relative part.
func destKey(dataSink sink.ReplicationSink, targetPath, sourcePath string, sourceKey util.FullPath, mTime int64) string {
	root := strings.TrimSuffix(sourcePath, "/")
	full := string(sourceKey)

	// default: the key is not under sourcePath at all
	relative := strings.TrimPrefix(full, "/")
	if strings.HasPrefix(full, root+"/") {
		relative = full[len(root)+1:]
	} else if full == root {
		relative = ""
	}

	if dataSink.IsIncremental() {
		dateKey := time.Unix(mTime, 0).Format("2006-01-02")
		return util.Join(targetPath, dateKey, relative)
	}
	return util.Join(targetPath, relative)
}
// isEntryExcluded reports whether one side (old or new) of an event is
// filtered out. A nil entry is never excluded. Otherwise the entry is excluded
// when its name matches the deprecated filename regexp or any wildcard
// file-name matcher, or when any wildcard path pattern matches either a
// component of dir or the entry name.
func isEntryExcluded(dir string, entry *filer_pb.Entry, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher) bool {
	if entry == nil {
		return false
	}
	name := entry.Name
	switch {
	case reExcludeFileName != nil && reExcludeFileName.MatchString(name):
		// deprecated regexp-based filename exclusion
		return true
	case len(excludeFileNames) > 0 && matchesAnyWildcard(excludeFileNames, name):
		// wildcard-based filename exclusion
		return true
	case len(excludePathPatterns) == 0:
		return false
	}
	// wildcard-based path-pattern exclusion: directory components first,
	// then the entry name itself
	return pathContainsWildcardMatch(dir, excludePathPatterns) ||
		matchesAnyWildcard(excludePathPatterns, name)
}
// matchesExcludePath reports whether dir is equal to, or located under, any
// of the given excludePaths. An empty list never matches.
func matchesExcludePath(dir string, excludePaths []string) bool {
	for i := range excludePaths {
		if util.IsEqualOrUnder(dir, excludePaths[i]) {
			return true
		}
	}
	return false
}
// compileExcludePattern compiles a regexp pattern string.
//
// An empty pattern means "no filter" and yields (nil, nil). On a compile
// failure the returned error names the pattern and the label (which describes
// where the pattern came from) and wraps the underlying regexp error, so
// callers can still inspect it with errors.Is / errors.As.
func compileExcludePattern(pattern string, label string) (*regexp.Regexp, error) {
	if pattern == "" {
		return nil, nil
	}
	re, err := regexp.Compile(pattern)
	if err != nil {
		// %w renders the same text as the previous %+v but keeps the cause
		// reachable through the error chain.
		return nil, fmt.Errorf("error compile regexp %v for %s: %w", pattern, label, err)
	}
	return re, nil
}
// matchesAnyWildcard reports whether at least one non-nil matcher matches
// value. With no matchers the result is false (unlike
// wildcard.MatchesAnyWildcard, which returns true for empty matchers).
func matchesAnyWildcard(matchers []*wildcard.WildcardMatcher, value string) bool {
	for i := 0; i < len(matchers); i++ {
		matcher := matchers[i]
		if matcher == nil {
			continue
		}
		if matcher.Match(value) {
			return true
		}
	}
	return false
}
// pathContainsWildcardMatch reports whether any "/"-separated component of
// path matches any of the wildcard matchers. Empty components (leading,
// trailing or doubled slashes) are skipped. The path is consumed piece by
// piece, so no slice of components is allocated.
func pathContainsWildcardMatch(path string, matchers []*wildcard.WildcardMatcher) bool {
	remaining := path
	for remaining != "" {
		var component string
		// without a further "/", Cut returns the whole remainder and an
		// empty tail, which ends the loop after this component
		component, remaining, _ = strings.Cut(remaining, "/")
		if component == "" {
			continue
		}
		if matchesAnyWildcard(matchers, component) {
			return true
		}
	}
	return false
}