diff --git a/test/s3/lifecycle/s3_lifecycle_versioning_test.go b/test/s3/lifecycle/s3_lifecycle_versioning_test.go index 39ea89e01..c21a37e04 100644 --- a/test/s3/lifecycle/s3_lifecycle_versioning_test.go +++ b/test/s3/lifecycle/s3_lifecycle_versioning_test.go @@ -86,8 +86,12 @@ func backdateVersionedMtime(t *testing.T, fc filer_pb.SeaweedFilerClient, bucket func runLifecycleShard(t *testing.T) string { t.Helper() + // -refresh 1s drives an inter-pass loop inside the 10s runtime so + // rules that need two passes to settle (e.g. + // TestLifecycleExpiredDeleteMarkerCleanup: pass 1 removes v1, pass 2 + // removes the now-orphaned marker) get the second pass. out := runShellCommand(t, fmt.Sprintf( - "s3.lifecycle.run-shard -shards 0-15 -s3 %s -events 0 -dispatch 200ms -checkpoint 5s -runtime 10s", + "s3.lifecycle.run-shard -shards 0-15 -s3 %s -events 0 -refresh 1s -runtime 10s", envOr("S3_GRPC_ENDPOINT", defaultS3GrpcEndpoint), )) require.NotContains(t, out, "FATAL", "shell output:\n%s", out) diff --git a/weed/s3api/s3lifecycle/dailyrun/filer_list_func.go b/weed/s3api/s3lifecycle/dailyrun/filer_list_func.go index 39c5192c2..aa251f809 100644 --- a/weed/s3api/s3lifecycle/dailyrun/filer_list_func.go +++ b/weed/s3api/s3lifecycle/dailyrun/filer_list_func.go @@ -104,6 +104,7 @@ func walkBucketTree(ctx context.Context, client filer_pb.SeaweedFilerClient, dir ModTime: time.Unix(e.Attributes.Mtime, int64(e.Attributes.MtimeNs)), Size: int64(e.Attributes.FileSize), IsLatest: true, // Non-versioned default. + Tags: extractTags(e.Extended), } return cb(entry) }) @@ -183,7 +184,7 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient, // Resolve latest: // 1. Pointer names a real id -> that wins. // 2. Pointer absent (or stale: set but no sibling carries it) - // + items[0] is an EXPLICIT null -> null is latest. + // + any EXPLICIT null bare exists -> null is latest. // 3. Otherwise -> newest sibling (latestPos = 0 by default). // // A stale pointer falls through to the no-pointer fallback rather @@ -203,8 +204,13 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient, } } } - if !pointerResolved && len(items) > 0 && items[0].versionID == "null" && items[0].isExplicitNull { - latestPos = 0 + if !pointerResolved { + for i, it := range items { + if it.versionID == "null" && it.isExplicitNull { + latestPos = i + break + } + } } if start != "" && logical <= start { @@ -228,6 +234,7 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient, IsDeleteMarker: string(it.entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true", NumVersions: len(items), SuccessorModTime: successor, + Tags: extractTags(it.entry.Extended), } if !isLatest { rank := i @@ -318,3 +325,21 @@ func isMPUInitDir(key string, entry *filer_pb.Entry) bool { v, ok := entry.Extended[s3_constants.ExtMultipartObjectKey] return ok && len(v) > 0 } + +func extractTags(ext map[string][]byte) map[string]string { + if len(ext) == 0 { + return nil + } + prefix := s3_constants.AmzObjectTagging + "-" + var tags map[string]string + for k, v := range ext { + if !strings.HasPrefix(k, prefix) { + continue + } + if tags == nil { + tags = make(map[string]string) + } + tags[k[len(prefix):]] = string(v) + } + return tags +} diff --git a/weed/s3api/s3lifecycle/dailyrun/filer_list_func_test.go b/weed/s3api/s3lifecycle/dailyrun/filer_list_func_test.go index 47ec0514a..7796a5da6 100644 --- a/weed/s3api/s3lifecycle/dailyrun/filer_list_func_test.go +++ b/weed/s3api/s3lifecycle/dailyrun/filer_list_func_test.go @@ -126,6 +126,25 @@ func TestFilerListFunc_EmitsFlatFiles(t *testing.T) { assert.Equal(t, []string{"a.txt", "b.txt"}, got) } +func TestFilerListFunc_PropagatesTagsOnFlatFiles(t *testing.T) { + mtime := time.Now().Add(-7 * 24 * time.Hour) + tagged := fileWithExt("tagged.txt", mtime, 10, map[string][]byte{ + s3_constants.AmzObjectTagging + "-env": []byte("temp"), + s3_constants.AmzObjectTagging + "-tier": []byte("cold"), + }) + client := &fakeFiler{tree: map[string][]*filer_pb.Entry{ + "/buckets/bkt": {tagged}, + }} + listFn := FilerListFunc(client, "/buckets") + var got *bootstrap.Entry + require.NoError(t, listFn(context.Background(), "bkt", "", func(e *bootstrap.Entry) error { + got = e + return nil + })) + require.NotNil(t, got) + assert.Equal(t, map[string]string{"env": "temp", "tier": "cold"}, got.Tags) +} + func TestFilerListFunc_RecursesIntoSubdirs(t *testing.T) { mtime := time.Now() client := &fakeFiler{tree: map[string][]*filer_pb.Entry{ @@ -360,6 +379,40 @@ func TestFilerListFunc_VersionedExpansionExplicitNullIsLatestWhenPointerMissing( assert.Equal(t, 2, count) } +func TestFilerListFunc_VersionedExpansionExplicitNullBeatsNewerSiblingWhenPointerMissing(t *testing.T) { + // Suspended-versioning current-ness is carried by the explicit bare + // null, not by mtime ordering. Backdating the null to make it due + // must not demote it behind a newer noncurrent sibling when the + // .versions pointer is absent. + tNull := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) // older + tV1 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) // newer + bareNull := fileWithExt("foo", tNull, 1, map[string][]byte{s3_constants.ExtVersionIdKey: []byte("null")}) + client := &fakeFiler{tree: map[string][]*filer_pb.Entry{ + "/buckets/bkt": { + bareNull, + versionsDir("foo"+s3_constants.VersionsFolder, ""), + }, + "/buckets/bkt/foo" + s3_constants.VersionsFolder: { + fileWithExt("v1", tV1, 1, map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")}), + }, + }} + listFn := FilerListFunc(client, "/buckets") + var got []*bootstrap.Entry + require.NoError(t, listFn(context.Background(), "bkt", "", func(e *bootstrap.Entry) error { + got = append(got, e) + return nil + })) + require.Len(t, got, 2) + byID := map[string]*bootstrap.Entry{} + for _, e := range got { + byID[e.VersionID] = e + } + require.NotNil(t, byID["null"]) + require.NotNil(t, byID["v1"]) + assert.True(t, byID["null"].IsLatest, "explicit null must stay latest even when older than noncurrent siblings") + assert.False(t, byID["v1"].IsLatest) +} + func TestFilerListFunc_VersionsDirWithoutMarkersRecursesAsRegular(t *testing.T) { // A `.versions`-named folder whose children have no // ExtVersionIdKey is a coincidence (user folder). Recurse into diff --git a/weed/s3api/s3lifecycle/dailyrun/run.go b/weed/s3api/s3lifecycle/dailyrun/run.go index 2ac02ecb6..82fed0624 100644 --- a/weed/s3api/s3lifecycle/dailyrun/run.go +++ b/weed/s3api/s3lifecycle/dailyrun/run.go @@ -204,6 +204,20 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim promoted := engine.PromotedHash(snap, retentionWindow) if rsh == [32]byte{} { + // No replay-eligible rules. Walker-only rules + // (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) + // or scan_only-promoted rules might still need a walk; run + // the steady-state walker before persisting the empty + // cursor and returning. Without this, a bucket whose only + // rule is walker-bound would never have it dispatched — + // the bug TestLifecycleExpirationDateInThePast caught. + if cfg.Walker != nil { + if _, walkView := snap.RulesForShard(shardID, retentionWindow); walkView != nil && len(walkView.AllActions()) > 0 { + if werr := cfg.Walker(ctx, walkView, shardID); werr != nil { + return fmt.Errorf("shard=%d: steady walk (empty replay): %w", shardID, werr) + } + } + } return cfg.Persister.Save(ctx, shardID, Cursor{ TsNs: 0, RuleSetHash: rsh, @@ -211,22 +225,36 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim }) } - // Recovery: rule-content edit (RuleSetHash mismatch) or partition - // flip (PromotedHash mismatch). Walk the rewritten rule set so - // already-due objects fire before the cursor rewinds; then rewind - // and let the sliding meta-log replay catch up steady state. - if found && (persisted.RuleSetHash != rsh || persisted.PromotedHash != promoted) { + // Recovery / cold-start walker: + // - found && hashes mismatch: rule edit or partition flip — walk + // the rewritten rule set so already-due objects fire before the + // cursor rewinds, then rewind for meta-log replay. + // - !found: first run for this shard. Pre-existing objects PUT + // before the rule was added live OUTSIDE the meta-log scan + // window (TsNs > runNow - maxTTL) and would never replay; the + // walker has to discover them. The streaming worker did this + // via BucketBootstrapper; daily-replay needs the same. + mustWalkRecovery := found && (persisted.RuleSetHash != rsh || persisted.PromotedHash != promoted) + mustWalkColdStart := !found + if mustWalkRecovery || mustWalkColdStart { if cfg.Walker != nil { if werr := cfg.Walker(ctx, engine.RecoveryView(snap), shardID); werr != nil { return fmt.Errorf("shard=%d: recovery walk: %w", shardID, werr) } } - next := Cursor{ - TsNs: runNow.Add(-maxTTL).UnixNano(), - RuleSetHash: rsh, - PromotedHash: promoted, + if mustWalkRecovery { + // Rule changed: rewind cursor so the sliding replay re-scans + // the new max-TTL window and the persisted hashes match the + // new rule set. + next := Cursor{ + TsNs: runNow.Add(-maxTTL).UnixNano(), + RuleSetHash: rsh, + PromotedHash: promoted, + } + return cfg.Persister.Save(ctx, shardID, next) } - return cfg.Persister.Save(ctx, shardID, next) + // Cold start: keep TsNs=0 so the drain below floors to + // runNow - maxTTL and the cursor is saved fresh after the run. } // Steady-state walker for walker-bound and scan_only-promoted rules. @@ -244,20 +272,39 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim } // Cold start: scan from now-maxTTL so already-due objects within - // meta-log retention still expire. + // meta-log retention still expire. In steady state honor the + // cursor as-is: the drain freezes the cursor at the last pre-skip + // event so pending matches with DueTime == TsNs+maxTTL stay in + // scope across passes. Bumping forward to runNow-maxTTL would + // orphan exactly those events (the test_lifecyclev2_expiration + // regression: cursor saved at the no-match event right before + // the not-yet-due expire3 PUT, then floor at runNow=PUT+maxTTL + // equals PUT — bumping past the expire3 event itself). startTsNs := persisted.TsNs - floor := runNow.Add(-maxTTL).UnixNano() - if startTsNs < floor { - startTsNs = floor + if !found { + startTsNs = runNow.Add(-maxTTL).UnixNano() } lastOK, _, drainErr := drainShardEvents(ctx, cfg, runNow, shardID, snap, startTsNs) + // Cursor save uses a fresh ctx because the steady-state drain exits + // via passCtx cancellation (the only signal the filer subscription + // gets when no new events arrive). Saving with the canceled passCtx + // would silently drop the cursor and the next pass would re-replay + // from the same floor — defeating advancement entirely. + saveCtx, saveCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer saveCancel() if drainErr != nil { - _ = cfg.Persister.Save(ctx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted}) + _ = cfg.Persister.Save(saveCtx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted}) + // passCtx timeout is the expected end-of-pass for an idle + // subscription; not a real error. Other drain errors still + // propagate. + if errors.Is(drainErr, context.DeadlineExceeded) || errors.Is(drainErr, context.Canceled) { + return nil + } return fmt.Errorf("shard=%d: drain: %w", shardID, drainErr) } - return cfg.Persister.Save(ctx, shardID, Cursor{ + return cfg.Persister.Save(saveCtx, shardID, Cursor{ TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted, diff --git a/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher.go b/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher.go index 65fdd03ce..d54e7bd81 100644 --- a/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher.go +++ b/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher.go @@ -35,16 +35,18 @@ func (d *WalkerDispatcher) Delete(ctx context.Context, action *engine.CompiledAc } objectPath := entry.Path if entry.IsMPUInit { - // Rule-prefix matching used DestKey; the server takes the - // canonical object path for the LifecycleDelete RPC, which - // is also DestKey. The walker hits the .uploads/ - // directory itself only when ActionKind=ABORT_MPU, and the - // server resolves the upload from (bucket, object_path) + - // the init record's metadata. + // Rule-prefix matching uses DestKey (the user's intended + // object key); dispatch uses entry.Path (.uploads/), + // which is what the server's ABORT_MPU handler expects in + // req.ObjectPath — it strips the .uploads/ prefix to get + // the upload id and reads the init record from that + // directory. DestKey is the dispatch ANTI-pattern here: it + // looks like a regular object path, the server's check for + // the .uploads/ prefix fails, and the dispatch comes back + // as BLOCKED FATAL_EVENT_ERROR. if entry.DestKey == "" { return fmt.Errorf("walker dispatch: MPU init entry with empty DestKey: %s", entry.Path) } - objectPath = entry.DestKey } rh := action.Key.RuleHash req := &s3_lifecycle_pb.LifecycleDeleteRequest{ diff --git a/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher_test.go b/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher_test.go index a826ee51b..68c067cd6 100644 --- a/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher_test.go +++ b/weed/s3api/s3lifecycle/dailyrun/walker_dispatcher_test.go @@ -72,9 +72,11 @@ func TestWalkerDispatcher_VersionedPassesVersionID(t *testing.T) { assert.Equal(t, "v-abc", c.lastReq.VersionId) } -func TestWalkerDispatcher_MPUInitUsesDestKey(t *testing.T) { - // Rule-prefix matching used DestKey; the RPC ObjectPath must match - // so the server resolves the upload from the user's intended key. +func TestWalkerDispatcher_MPUInitUsesUploadsPath(t *testing.T) { + // Rule-prefix matching uses DestKey to decide IF this MPU init + // matches; dispatch uses Path (.uploads/) because the server's + // ABORT_MPU handler strips the .uploads/ prefix to get the upload + // id. Sending DestKey here would BLOCK with FATAL_EVENT_ERROR. c := &walkerStubClient{outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_DONE} d := &WalkerDispatcher{Client: c} a := sampleAction(t, s3lifecycle.ActionKindAbortMPU) @@ -84,7 +86,7 @@ func TestWalkerDispatcher_MPUInitUsesDestKey(t *testing.T) { IsMPUInit: true, }) require.NoError(t, err) - assert.Equal(t, "user/path/object", c.lastReq.ObjectPath) + assert.Equal(t, ".uploads/abc123", c.lastReq.ObjectPath) } func TestWalkerDispatcher_MPUInitEmptyDestKeyErrors(t *testing.T) { diff --git a/weed/shell/command_s3_lifecycle_run_shard.go b/weed/shell/command_s3_lifecycle_run_shard.go index 51c383a18..e53b56c53 100644 --- a/weed/shell/command_s3_lifecycle_run_shard.go +++ b/weed/shell/command_s3_lifecycle_run_shard.go @@ -122,9 +122,24 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i fmt.Fprintf(writer, "running shards %s (event budget=%d, runtime=%s, refresh=%s)…\n", formatShardLabel(shards), *eventBudget, *runtime, *cadence) + announcedLoad := false runPass := func() error { + // When -refresh is set we MUST cap each pass; otherwise + // drainShardEvents blocks until the outer runtime ctx + // expires and the loop's ticker never fires. With cadence=0 + // (one-shot) the pass uses the full ctx and returns when + // the runtime cap hits. + passCtx := ctx + if *cadence > 0 { + var passCancel context.CancelFunc + // Pass budget = cadence + grace. Grace gives the + // drain time to actually process events that arrived + // during the cadence window. + passCtx, passCancel = context.WithTimeout(ctx, *cadence+5*time.Second) + defer passCancel() + } eng := engine.New() - inputs, parseErrors, err := scheduler.LoadCompileInputs(ctx, filerClient, bucketsPath) + inputs, parseErrors, err := scheduler.LoadCompileInputs(passCtx, filerClient, bucketsPath) if err != nil { return fmt.Errorf("load lifecycle configs: %w", err) } @@ -140,6 +155,10 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i if len(inputs) == 0 { return nil } + if !announcedLoad { + fmt.Fprintf(writer, "loaded lifecycle for %d bucket(s)\n", len(inputs)) + announcedLoad = true + } buckets := make([]string, 0, len(inputs)) for _, in := range inputs { if in.Bucket != "" { @@ -149,7 +168,7 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error { return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, listFn, walkerDispatch) }) - return dailyrun.Run(ctx, dailyrun.Config{ + return dailyrun.Run(passCtx, dailyrun.Config{ Shards: shards, BucketsPath: bucketsPath, Engine: eng, @@ -157,6 +176,11 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i Client: client, Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)}, Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath), + // The shell command is used for bounded one-shot sweeps in + // integration tests and CI. Fan out across the selected shards + // so recovery walks do not serialize 16 shard scans into a 10s + // timeout budget. + Workers: len(shards), Walker: walker, EventBudget: *eventBudget, ClientName: fmt.Sprintf("shell-lifecycle-%s", formatShardLabel(shards)),