mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(s3/lifecycle): walker dispatch uses entry.Path for ABORT_MPU (#9477)
* fix(s3/lifecycle): WalkerDispatcher uses entry.Path for ABORT_MPU + shell announces load Two CI-surfaced bugs caught by PR #9471's S3 Lifecycle Tests run on master after PRs #9475 + #9466: 1. Walker dispatch for ABORT_MPU was sending entry.DestKey as req.ObjectPath. The server's ABORT_MPU handler (weed/s3api/s3api_internal_lifecycle.go) strips the .uploads/ prefix to extract the upload id and reads the init record from that directory, so it expects the .uploads/<id> path verbatim. DestKey looks like a regular object path; the server's prefix check fails and the dispatch returns BLOCKED with "FATAL_EVENT_ERROR: ABORT_MPU object_path missing .uploads/ prefix". The test fix renames TestWalkerDispatcher_MPUInitUsesDestKey to ...UsesUploadsPath and inverts the assertion to match the actual server contract. DestKey is still used for the WalkBuckets shard predicate and for rule-prefix matching in bootstrap.walker; both surfaces want the user's intended path, while DISPATCH wants the .uploads/<id> directory. The bootstrap test (TestLifecycleAbortIncompleteMultipartUpload) caught this when the walker's BLOCKED error surfaced as FATAL output. 2. test/s3/lifecycle/s3_lifecycle_empty_bucket_test.go asserts the shell command logs "loaded lifecycle for N bucket(s)" so a regression that produces half-shaped output (no load summary) is caught. The restored shell command (PR #9475) didn't print that line; add it back on the first pass that finds non-zero inputs. * fix(s3/lifecycle): walker fires for walker-only buckets (empty replay path) runShard's empty-replay sentinel (rsh == [32]byte{}) was returning BEFORE the steady-state walker check. A bucket whose only lifecycle rule was walker-only (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent) would never have it dispatched because: - ReplayContentHash only hashes replay-eligible kinds, so walker-only-only snapshots produce rsh == empty. - The early-return persisted the empty cursor and exited before the steady-state walker block at the bottom of the function. Move the walker invocation INTO the empty-replay branch so walker- only rules dispatch on the same path as mixed-rule buckets. TestLifecycleExpirationDateInThePast and TestLifecycleExpiredDeleteMarkerCleanup were both timing out their "object must be deleted" Eventually polls because of this. Caught on PR #9471's S3 Lifecycle Tests run after PR #9475 restored the shell entry point that exercises the integration tests. * fix(s3/lifecycle): cold-start walker covers pre-existing objects runShard only walked the bucket tree on the recovery branch (found && hash mismatch). For a fresh worker with no persisted cursor, found=false, so the recovery walker never fired and the meta-log replay only scanned runNow - maxTTL of events. Objects PUT before that window — including pre-existing objects in a newly-rule-enabled bucket — never matched the rule. The streaming worker handled this with scheduler.BucketBootstrapper. Daily-replay needed the equivalent: walk the live tree once on the first run for each shard so pre-existing objects get evaluated even when their PUT events are outside meta-log scan window. Restructured the recovery branch to fire the walker on either (found && mismatch) OR !found. On cold-start the cursor isn't rewound — we keep TsNs=0 and let the drain below floor to runNow - maxTTL like before; the walker just handles whatever the sliding window can't reach. TestLifecycleBootstrapWalkOnExistingObjects was the exact CI failure this addresses (https://github.com/seaweedfs/seaweedfs/actions/runs/25777823522/job/75714014151). * fix(s3/lifecycle): restore walker tag and null-version state Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * fix(s3/lifecycle): parallelize shell shard sweeps Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * fix(s3/lifecycle): bound each runPass ctx + refresh in runLifecycleShard Two CI bugs surfaced after PR #9466 deleted the streaming worker: 1. The shell command's -refresh loop never fires. runPass used the outer ctx (full -runtime), so dailyrun.Run blocked for the entire 1800s s3tests window — the background worker only ran one pass and never re-loaded configs that tests created mid-run. test_lifecycle_expiration sees 6 objects when expecting 4 because expire1/* never reaches the worker's snapshot. Cap each pass to cadence+5s when cadence>0; one-shot (cadence=0) keeps the full ctx. 2. TestLifecycleExpiredDeleteMarkerCleanup's docstring says "pass 1 cleans v1; pass 2 removes the now-orphaned marker," but runLifecycleShard invoked with no -refresh — only one pass ran. The marker rule can't fire in the same pass that dispatches v1's delete because v1 is still in .versions/. Add -refresh 1s so the 10s runtime gets multiple passes. * fix(s3/lifecycle): persist cursor with fresh ctx after passCtx timeout drainShardEvents only exits via ctx cancellation for an idle subscription — that's the steady-state when all replayed events are already past. Saving the cursor with the canceled passCtx silently drops every advance, so the next pass re-subscribes from the same floor and re-replays the same events. Symptom in s3tests: status=error shards=16 errors=16 on every pass, and 1/6 expire3/* dispatches lost to a race between concurrent shard drains all retrying the same events. Use a 5s timeout derived from context.Background for the save, and treat passCtx Deadline/Canceled from drain as a clean end-of-pass — not a shard-level error to log. * fix(s3/lifecycle): trust persisted cursor; never bump past pending events The drain freezes cursorAdvanceTo at the last pre-skip event so pending matches (DueTime > runNow) re-enter the subscription next pass. Combined with the new cursor persistence, the floor bump (runNow - maxTTL) then orphans the very events the drain stopped at. Concrete: a rule with TTL == maxTTL fires at runNow == PUT_TIME + maxTTL, so floor (= runNow - maxTTL) lands exactly on PUT_TIME. If the last advance saved a cursor right before the not-yet-due PUT (e.g., keep2/* between expire1/* and expire3/* on the same shard), the floor bump on pass 9 skips past the expire3 event itself — the worker never re-reads it. Test symptom: expire3/* never expires when worker shards include other earlier no-match events. Cold start (found=false) still subscribes from runNow - maxTTL. Steady state honors the cursor verbatim. --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -86,8 +86,12 @@ func backdateVersionedMtime(t *testing.T, fc filer_pb.SeaweedFilerClient, bucket
|
||||
|
||||
func runLifecycleShard(t *testing.T) string {
|
||||
t.Helper()
|
||||
// -refresh 1s drives an inter-pass loop inside the 10s runtime so
|
||||
// rules that need two passes to settle (e.g.
|
||||
// TestLifecycleExpiredDeleteMarkerCleanup: pass 1 removes v1, pass 2
|
||||
// removes the now-orphaned marker) get the second pass.
|
||||
out := runShellCommand(t, fmt.Sprintf(
|
||||
"s3.lifecycle.run-shard -shards 0-15 -s3 %s -events 0 -dispatch 200ms -checkpoint 5s -runtime 10s",
|
||||
"s3.lifecycle.run-shard -shards 0-15 -s3 %s -events 0 -refresh 1s -runtime 10s",
|
||||
envOr("S3_GRPC_ENDPOINT", defaultS3GrpcEndpoint),
|
||||
))
|
||||
require.NotContains(t, out, "FATAL", "shell output:\n%s", out)
|
||||
|
||||
@@ -104,6 +104,7 @@ func walkBucketTree(ctx context.Context, client filer_pb.SeaweedFilerClient, dir
|
||||
ModTime: time.Unix(e.Attributes.Mtime, int64(e.Attributes.MtimeNs)),
|
||||
Size: int64(e.Attributes.FileSize),
|
||||
IsLatest: true, // Non-versioned default.
|
||||
Tags: extractTags(e.Extended),
|
||||
}
|
||||
return cb(entry)
|
||||
})
|
||||
@@ -183,7 +184,7 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient,
|
||||
// Resolve latest:
|
||||
// 1. Pointer names a real id -> that wins.
|
||||
// 2. Pointer absent (or stale: set but no sibling carries it)
|
||||
// + items[0] is an EXPLICIT null -> null is latest.
|
||||
// + any EXPLICIT null bare exists -> null is latest.
|
||||
// 3. Otherwise -> newest sibling (latestPos = 0 by default).
|
||||
//
|
||||
// A stale pointer falls through to the no-pointer fallback rather
|
||||
@@ -203,8 +204,13 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient,
|
||||
}
|
||||
}
|
||||
}
|
||||
if !pointerResolved && len(items) > 0 && items[0].versionID == "null" && items[0].isExplicitNull {
|
||||
latestPos = 0
|
||||
if !pointerResolved {
|
||||
for i, it := range items {
|
||||
if it.versionID == "null" && it.isExplicitNull {
|
||||
latestPos = i
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if start != "" && logical <= start {
|
||||
@@ -228,6 +234,7 @@ func expandVersionsDir(ctx context.Context, client filer_pb.SeaweedFilerClient,
|
||||
IsDeleteMarker: string(it.entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true",
|
||||
NumVersions: len(items),
|
||||
SuccessorModTime: successor,
|
||||
Tags: extractTags(it.entry.Extended),
|
||||
}
|
||||
if !isLatest {
|
||||
rank := i
|
||||
@@ -318,3 +325,21 @@ func isMPUInitDir(key string, entry *filer_pb.Entry) bool {
|
||||
v, ok := entry.Extended[s3_constants.ExtMultipartObjectKey]
|
||||
return ok && len(v) > 0
|
||||
}
|
||||
|
||||
func extractTags(ext map[string][]byte) map[string]string {
|
||||
if len(ext) == 0 {
|
||||
return nil
|
||||
}
|
||||
prefix := s3_constants.AmzObjectTagging + "-"
|
||||
var tags map[string]string
|
||||
for k, v := range ext {
|
||||
if !strings.HasPrefix(k, prefix) {
|
||||
continue
|
||||
}
|
||||
if tags == nil {
|
||||
tags = make(map[string]string)
|
||||
}
|
||||
tags[k[len(prefix):]] = string(v)
|
||||
}
|
||||
return tags
|
||||
}
|
||||
|
||||
@@ -126,6 +126,25 @@ func TestFilerListFunc_EmitsFlatFiles(t *testing.T) {
|
||||
assert.Equal(t, []string{"a.txt", "b.txt"}, got)
|
||||
}
|
||||
|
||||
func TestFilerListFunc_PropagatesTagsOnFlatFiles(t *testing.T) {
|
||||
mtime := time.Now().Add(-7 * 24 * time.Hour)
|
||||
tagged := fileWithExt("tagged.txt", mtime, 10, map[string][]byte{
|
||||
s3_constants.AmzObjectTagging + "-env": []byte("temp"),
|
||||
s3_constants.AmzObjectTagging + "-tier": []byte("cold"),
|
||||
})
|
||||
client := &fakeFiler{tree: map[string][]*filer_pb.Entry{
|
||||
"/buckets/bkt": {tagged},
|
||||
}}
|
||||
listFn := FilerListFunc(client, "/buckets")
|
||||
var got *bootstrap.Entry
|
||||
require.NoError(t, listFn(context.Background(), "bkt", "", func(e *bootstrap.Entry) error {
|
||||
got = e
|
||||
return nil
|
||||
}))
|
||||
require.NotNil(t, got)
|
||||
assert.Equal(t, map[string]string{"env": "temp", "tier": "cold"}, got.Tags)
|
||||
}
|
||||
|
||||
func TestFilerListFunc_RecursesIntoSubdirs(t *testing.T) {
|
||||
mtime := time.Now()
|
||||
client := &fakeFiler{tree: map[string][]*filer_pb.Entry{
|
||||
@@ -360,6 +379,40 @@ func TestFilerListFunc_VersionedExpansionExplicitNullIsLatestWhenPointerMissing(
|
||||
assert.Equal(t, 2, count)
|
||||
}
|
||||
|
||||
func TestFilerListFunc_VersionedExpansionExplicitNullBeatsNewerSiblingWhenPointerMissing(t *testing.T) {
|
||||
// Suspended-versioning current-ness is carried by the explicit bare
|
||||
// null, not by mtime ordering. Backdating the null to make it due
|
||||
// must not demote it behind a newer noncurrent sibling when the
|
||||
// .versions pointer is absent.
|
||||
tNull := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) // older
|
||||
tV1 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) // newer
|
||||
bareNull := fileWithExt("foo", tNull, 1, map[string][]byte{s3_constants.ExtVersionIdKey: []byte("null")})
|
||||
client := &fakeFiler{tree: map[string][]*filer_pb.Entry{
|
||||
"/buckets/bkt": {
|
||||
bareNull,
|
||||
versionsDir("foo"+s3_constants.VersionsFolder, ""),
|
||||
},
|
||||
"/buckets/bkt/foo" + s3_constants.VersionsFolder: {
|
||||
fileWithExt("v1", tV1, 1, map[string][]byte{s3_constants.ExtVersionIdKey: []byte("v1")}),
|
||||
},
|
||||
}}
|
||||
listFn := FilerListFunc(client, "/buckets")
|
||||
var got []*bootstrap.Entry
|
||||
require.NoError(t, listFn(context.Background(), "bkt", "", func(e *bootstrap.Entry) error {
|
||||
got = append(got, e)
|
||||
return nil
|
||||
}))
|
||||
require.Len(t, got, 2)
|
||||
byID := map[string]*bootstrap.Entry{}
|
||||
for _, e := range got {
|
||||
byID[e.VersionID] = e
|
||||
}
|
||||
require.NotNil(t, byID["null"])
|
||||
require.NotNil(t, byID["v1"])
|
||||
assert.True(t, byID["null"].IsLatest, "explicit null must stay latest even when older than noncurrent siblings")
|
||||
assert.False(t, byID["v1"].IsLatest)
|
||||
}
|
||||
|
||||
func TestFilerListFunc_VersionsDirWithoutMarkersRecursesAsRegular(t *testing.T) {
|
||||
// A `.versions`-named folder whose children have no
|
||||
// ExtVersionIdKey is a coincidence (user folder). Recurse into
|
||||
|
||||
@@ -204,6 +204,20 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
|
||||
promoted := engine.PromotedHash(snap, retentionWindow)
|
||||
|
||||
if rsh == [32]byte{} {
|
||||
// No replay-eligible rules. Walker-only rules
|
||||
// (ExpirationDate / ExpiredDeleteMarker / NewerNoncurrent)
|
||||
// or scan_only-promoted rules might still need a walk; run
|
||||
// the steady-state walker before persisting the empty
|
||||
// cursor and returning. Without this, a bucket whose only
|
||||
// rule is walker-bound would never have it dispatched —
|
||||
// the bug TestLifecycleExpirationDateInThePast caught.
|
||||
if cfg.Walker != nil {
|
||||
if _, walkView := snap.RulesForShard(shardID, retentionWindow); walkView != nil && len(walkView.AllActions()) > 0 {
|
||||
if werr := cfg.Walker(ctx, walkView, shardID); werr != nil {
|
||||
return fmt.Errorf("shard=%d: steady walk (empty replay): %w", shardID, werr)
|
||||
}
|
||||
}
|
||||
}
|
||||
return cfg.Persister.Save(ctx, shardID, Cursor{
|
||||
TsNs: 0,
|
||||
RuleSetHash: rsh,
|
||||
@@ -211,22 +225,36 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
|
||||
})
|
||||
}
|
||||
|
||||
// Recovery: rule-content edit (RuleSetHash mismatch) or partition
|
||||
// flip (PromotedHash mismatch). Walk the rewritten rule set so
|
||||
// already-due objects fire before the cursor rewinds; then rewind
|
||||
// and let the sliding meta-log replay catch up steady state.
|
||||
if found && (persisted.RuleSetHash != rsh || persisted.PromotedHash != promoted) {
|
||||
// Recovery / cold-start walker:
|
||||
// - found && hashes mismatch: rule edit or partition flip — walk
|
||||
// the rewritten rule set so already-due objects fire before the
|
||||
// cursor rewinds, then rewind for meta-log replay.
|
||||
// - !found: first run for this shard. Pre-existing objects PUT
|
||||
// before the rule was added live OUTSIDE the meta-log scan
|
||||
// window (TsNs > runNow - maxTTL) and would never replay; the
|
||||
// walker has to discover them. The streaming worker did this
|
||||
// via BucketBootstrapper; daily-replay needs the same.
|
||||
mustWalkRecovery := found && (persisted.RuleSetHash != rsh || persisted.PromotedHash != promoted)
|
||||
mustWalkColdStart := !found
|
||||
if mustWalkRecovery || mustWalkColdStart {
|
||||
if cfg.Walker != nil {
|
||||
if werr := cfg.Walker(ctx, engine.RecoveryView(snap), shardID); werr != nil {
|
||||
return fmt.Errorf("shard=%d: recovery walk: %w", shardID, werr)
|
||||
}
|
||||
}
|
||||
next := Cursor{
|
||||
TsNs: runNow.Add(-maxTTL).UnixNano(),
|
||||
RuleSetHash: rsh,
|
||||
PromotedHash: promoted,
|
||||
if mustWalkRecovery {
|
||||
// Rule changed: rewind cursor so the sliding replay re-scans
|
||||
// the new max-TTL window and the persisted hashes match the
|
||||
// new rule set.
|
||||
next := Cursor{
|
||||
TsNs: runNow.Add(-maxTTL).UnixNano(),
|
||||
RuleSetHash: rsh,
|
||||
PromotedHash: promoted,
|
||||
}
|
||||
return cfg.Persister.Save(ctx, shardID, next)
|
||||
}
|
||||
return cfg.Persister.Save(ctx, shardID, next)
|
||||
// Cold start: keep TsNs=0 so the drain below floors to
|
||||
// runNow - maxTTL and the cursor is saved fresh after the run.
|
||||
}
|
||||
|
||||
// Steady-state walker for walker-bound and scan_only-promoted rules.
|
||||
@@ -244,20 +272,39 @@ func runShard(ctx context.Context, cfg Config, snap *engine.Snapshot, runNow tim
|
||||
}
|
||||
|
||||
// Cold start: scan from now-maxTTL so already-due objects within
|
||||
// meta-log retention still expire.
|
||||
// meta-log retention still expire. In steady state honor the
|
||||
// cursor as-is: the drain freezes the cursor at the last pre-skip
|
||||
// event so pending matches with DueTime == TsNs+maxTTL stay in
|
||||
// scope across passes. Bumping forward to runNow-maxTTL would
|
||||
// orphan exactly those events (the test_lifecyclev2_expiration
|
||||
// regression: cursor saved at the no-match event right before
|
||||
// the not-yet-due expire3 PUT, then floor at runNow=PUT+maxTTL
|
||||
// equals PUT — bumping past the expire3 event itself).
|
||||
startTsNs := persisted.TsNs
|
||||
floor := runNow.Add(-maxTTL).UnixNano()
|
||||
if startTsNs < floor {
|
||||
startTsNs = floor
|
||||
if !found {
|
||||
startTsNs = runNow.Add(-maxTTL).UnixNano()
|
||||
}
|
||||
|
||||
lastOK, _, drainErr := drainShardEvents(ctx, cfg, runNow, shardID, snap, startTsNs)
|
||||
// Cursor save uses a fresh ctx because the steady-state drain exits
|
||||
// via passCtx cancellation (the only signal the filer subscription
|
||||
// gets when no new events arrive). Saving with the canceled passCtx
|
||||
// would silently drop the cursor and the next pass would re-replay
|
||||
// from the same floor — defeating advancement entirely.
|
||||
saveCtx, saveCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer saveCancel()
|
||||
if drainErr != nil {
|
||||
_ = cfg.Persister.Save(ctx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted})
|
||||
_ = cfg.Persister.Save(saveCtx, shardID, Cursor{TsNs: lastOK, RuleSetHash: rsh, PromotedHash: promoted})
|
||||
// passCtx timeout is the expected end-of-pass for an idle
|
||||
// subscription; not a real error. Other drain errors still
|
||||
// propagate.
|
||||
if errors.Is(drainErr, context.DeadlineExceeded) || errors.Is(drainErr, context.Canceled) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("shard=%d: drain: %w", shardID, drainErr)
|
||||
}
|
||||
|
||||
return cfg.Persister.Save(ctx, shardID, Cursor{
|
||||
return cfg.Persister.Save(saveCtx, shardID, Cursor{
|
||||
TsNs: lastOK,
|
||||
RuleSetHash: rsh,
|
||||
PromotedHash: promoted,
|
||||
|
||||
@@ -35,16 +35,18 @@ func (d *WalkerDispatcher) Delete(ctx context.Context, action *engine.CompiledAc
|
||||
}
|
||||
objectPath := entry.Path
|
||||
if entry.IsMPUInit {
|
||||
// Rule-prefix matching used DestKey; the server takes the
|
||||
// canonical object path for the LifecycleDelete RPC, which
|
||||
// is also DestKey. The walker hits the .uploads/<id>
|
||||
// directory itself only when ActionKind=ABORT_MPU, and the
|
||||
// server resolves the upload from (bucket, object_path) +
|
||||
// the init record's metadata.
|
||||
// Rule-prefix matching uses DestKey (the user's intended
|
||||
// object key); dispatch uses entry.Path (.uploads/<id>),
|
||||
// which is what the server's ABORT_MPU handler expects in
|
||||
// req.ObjectPath — it strips the .uploads/ prefix to get
|
||||
// the upload id and reads the init record from that
|
||||
// directory. DestKey is the dispatch ANTI-pattern here: it
|
||||
// looks like a regular object path, the server's check for
|
||||
// the .uploads/ prefix fails, and the dispatch comes back
|
||||
// as BLOCKED FATAL_EVENT_ERROR.
|
||||
if entry.DestKey == "" {
|
||||
return fmt.Errorf("walker dispatch: MPU init entry with empty DestKey: %s", entry.Path)
|
||||
}
|
||||
objectPath = entry.DestKey
|
||||
}
|
||||
rh := action.Key.RuleHash
|
||||
req := &s3_lifecycle_pb.LifecycleDeleteRequest{
|
||||
|
||||
@@ -72,9 +72,11 @@ func TestWalkerDispatcher_VersionedPassesVersionID(t *testing.T) {
|
||||
assert.Equal(t, "v-abc", c.lastReq.VersionId)
|
||||
}
|
||||
|
||||
func TestWalkerDispatcher_MPUInitUsesDestKey(t *testing.T) {
|
||||
// Rule-prefix matching used DestKey; the RPC ObjectPath must match
|
||||
// so the server resolves the upload from the user's intended key.
|
||||
func TestWalkerDispatcher_MPUInitUsesUploadsPath(t *testing.T) {
|
||||
// Rule-prefix matching uses DestKey to decide IF this MPU init
|
||||
// matches; dispatch uses Path (.uploads/<id>) because the server's
|
||||
// ABORT_MPU handler strips the .uploads/ prefix to get the upload
|
||||
// id. Sending DestKey here would BLOCK with FATAL_EVENT_ERROR.
|
||||
c := &walkerStubClient{outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_DONE}
|
||||
d := &WalkerDispatcher{Client: c}
|
||||
a := sampleAction(t, s3lifecycle.ActionKindAbortMPU)
|
||||
@@ -84,7 +86,7 @@ func TestWalkerDispatcher_MPUInitUsesDestKey(t *testing.T) {
|
||||
IsMPUInit: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "user/path/object", c.lastReq.ObjectPath)
|
||||
assert.Equal(t, ".uploads/abc123", c.lastReq.ObjectPath)
|
||||
}
|
||||
|
||||
func TestWalkerDispatcher_MPUInitEmptyDestKeyErrors(t *testing.T) {
|
||||
|
||||
@@ -122,9 +122,24 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i
|
||||
fmt.Fprintf(writer, "running shards %s (event budget=%d, runtime=%s, refresh=%s)…\n",
|
||||
formatShardLabel(shards), *eventBudget, *runtime, *cadence)
|
||||
|
||||
announcedLoad := false
|
||||
runPass := func() error {
|
||||
// When -refresh is set we MUST cap each pass; otherwise
|
||||
// drainShardEvents blocks until the outer runtime ctx
|
||||
// expires and the loop's ticker never fires. With cadence=0
|
||||
// (one-shot) the pass uses the full ctx and returns when
|
||||
// the runtime cap hits.
|
||||
passCtx := ctx
|
||||
if *cadence > 0 {
|
||||
var passCancel context.CancelFunc
|
||||
// Pass budget = cadence + grace. Grace gives the
|
||||
// drain time to actually process events that arrived
|
||||
// during the cadence window.
|
||||
passCtx, passCancel = context.WithTimeout(ctx, *cadence+5*time.Second)
|
||||
defer passCancel()
|
||||
}
|
||||
eng := engine.New()
|
||||
inputs, parseErrors, err := scheduler.LoadCompileInputs(ctx, filerClient, bucketsPath)
|
||||
inputs, parseErrors, err := scheduler.LoadCompileInputs(passCtx, filerClient, bucketsPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load lifecycle configs: %w", err)
|
||||
}
|
||||
@@ -140,6 +155,10 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i
|
||||
if len(inputs) == 0 {
|
||||
return nil
|
||||
}
|
||||
if !announcedLoad {
|
||||
fmt.Fprintf(writer, "loaded lifecycle for %d bucket(s)\n", len(inputs))
|
||||
announcedLoad = true
|
||||
}
|
||||
buckets := make([]string, 0, len(inputs))
|
||||
for _, in := range inputs {
|
||||
if in.Bucket != "" {
|
||||
@@ -149,7 +168,7 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i
|
||||
walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error {
|
||||
return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, listFn, walkerDispatch)
|
||||
})
|
||||
return dailyrun.Run(ctx, dailyrun.Config{
|
||||
return dailyrun.Run(passCtx, dailyrun.Config{
|
||||
Shards: shards,
|
||||
BucketsPath: bucketsPath,
|
||||
Engine: eng,
|
||||
@@ -157,6 +176,11 @@ func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer i
|
||||
Client: client,
|
||||
Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)},
|
||||
Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath),
|
||||
// The shell command is used for bounded one-shot sweeps in
|
||||
// integration tests and CI. Fan out across the selected shards
|
||||
// so recovery walks do not serialize 16 shard scans into a 10s
|
||||
// timeout budget.
|
||||
Workers: len(shards),
|
||||
Walker: walker,
|
||||
EventBudget: *eventBudget,
|
||||
ClientName: fmt.Sprintf("shell-lifecycle-%s", formatShardLabel(shards)),
|
||||
|
||||
Reference in New Issue
Block a user