seaweedfs/weed/shell/command_s3_lifecycle_run_shard.go
Chris Lu b1d59b04a8 fix(s3/lifecycle): walker dispatch uses entry.Path for ABORT_MPU (#9477)
* fix(s3/lifecycle): WalkerDispatcher uses entry.Path for ABORT_MPU + shell announces load

Two CI-surfaced bugs caught by PR #9471's S3 Lifecycle Tests run on
master after PRs #9475 + #9466:

1. Walker dispatch for ABORT_MPU was sending entry.DestKey as
   req.ObjectPath. The server's ABORT_MPU handler
   (weed/s3api/s3api_internal_lifecycle.go) strips the .uploads/
   prefix to extract the upload id and reads the init record from
   that directory, so it expects the .uploads/<id> path verbatim.
   DestKey looks like a regular object path; the server's prefix
   check fails and the dispatch returns BLOCKED with
   "FATAL_EVENT_ERROR: ABORT_MPU object_path missing .uploads/
   prefix". The test fix renames TestWalkerDispatcher_MPUInitUsesDestKey
   to ...UsesUploadsPath and inverts the assertion to match the
   actual server contract.

   DestKey is still used for the WalkBuckets shard predicate and
   for rule-prefix matching in bootstrap.walker; both surfaces want
   the user's intended path, while DISPATCH wants the .uploads/<id>
   directory. The bootstrap test
   (TestLifecycleAbortIncompleteMultipartUpload) caught this when
   the walker's BLOCKED error surfaced as FATAL output.

2. test/s3/lifecycle/s3_lifecycle_empty_bucket_test.go asserts the
   shell command logs "loaded lifecycle for N bucket(s)" so a
   regression that produces half-shaped output (no load summary)
   is caught. The restored shell command (PR #9475) didn't print
   that line; add it back on the first pass that finds non-zero
   inputs.
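
The path contract described in item 1 can be illustrated with a minimal sketch. The helper name uploadIDFromObjectPath and the sample paths are hypothetical; the real handler lives in weed/s3api/s3api_internal_lifecycle.go and may differ in detail. The sketch only shows why a DestKey-style path is rejected while a .uploads/<id> path is accepted.

```go
package main

import (
	"fmt"
	"strings"
)

// uploadsPrefix mirrors the ".uploads/" directory named in the commit message.
const uploadsPrefix = ".uploads/"

// uploadIDFromObjectPath is a hypothetical stand-in for the server-side check:
// it accepts only paths under .uploads/ and returns the upload id.
func uploadIDFromObjectPath(objectPath string) (string, error) {
	if !strings.HasPrefix(objectPath, uploadsPrefix) {
		return "", fmt.Errorf("ABORT_MPU object_path missing %s prefix", uploadsPrefix)
	}
	rest := strings.TrimPrefix(objectPath, uploadsPrefix)
	// Keep only the first path segment after the prefix as the upload id.
	id, _, _ := strings.Cut(rest, "/")
	if id == "" {
		return "", fmt.Errorf("ABORT_MPU object_path has empty upload id")
	}
	return id, nil
}

func main() {
	// The first path resembles entry.Path, the second resembles entry.DestKey.
	for _, p := range []string{".uploads/abc123", "photos/cat.jpg"} {
		id, err := uploadIDFromObjectPath(p)
		fmt.Printf("%q -> id=%q err=%v\n", p, id, err)
	}
}
```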

* fix(s3/lifecycle): walker fires for walker-only buckets (empty replay path)

runShard's empty-replay sentinel (rsh == [32]byte{}) was returning
BEFORE the steady-state walker check. A bucket whose only lifecycle
rule was walker-only (ExpirationDate / ExpiredDeleteMarker /
NewerNoncurrent) would never have it dispatched because:

  - ReplayContentHash only hashes replay-eligible kinds, so
    snapshots that contain only walker-only rules produce rsh == empty.
  - The early-return persisted the empty cursor and exited before
    the steady-state walker block at the bottom of the function.

Move the walker invocation INTO the empty-replay branch so walker-
only rules dispatch on the same path as mixed-rule buckets.

TestLifecycleExpirationDateInThePast and
TestLifecycleExpiredDeleteMarkerCleanup were both timing out their
"object must be deleted" Eventually polls because of this. Caught
on PR #9471's S3 Lifecycle Tests run after PR #9475 restored the
shell entry point that exercises the integration tests.
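
The reordered control flow can be modeled roughly as follows. This is a simplified sketch, not the runShard code: planShard, its step names, and the ordering of walk relative to persist are assumptions made for illustration. The only point it carries over from the description above is that the walker runs inside the empty-replay branch before the early return.

```go
package main

import "fmt"

// planShard lists the steps one shard pass would take (hypothetical model).
// replayHash stands in for rsh; the zero array is the empty-replay sentinel.
func planShard(replayHash [32]byte, hasWalkerRules bool) []string {
	var steps []string
	if replayHash == ([32]byte{}) {
		// Nothing to replay, but walker-only rules must still be
		// dispatched before the early return.
		if hasWalkerRules {
			steps = append(steps, "walk")
		}
		return append(steps, "persist-empty-cursor")
	}
	steps = append(steps, "replay")
	if hasWalkerRules {
		steps = append(steps, "walk")
	}
	return append(steps, "persist-cursor")
}

func main() {
	fmt.Println(planShard([32]byte{}, true))  // walker-only bucket
	fmt.Println(planShard([32]byte{1}, true)) // mixed-rule bucket
	fmt.Println(planShard([32]byte{}, false)) // nothing to run
}
```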

* fix(s3/lifecycle): cold-start walker covers pre-existing objects

runShard only walked the bucket tree on the recovery branch (found
&& hash mismatch). For a fresh worker with no persisted cursor,
found=false, so the recovery walker never fired and the meta-log
replay only scanned events newer than runNow - maxTTL. Objects PUT
before that window, including pre-existing objects in a bucket whose
rule was just enabled, never matched the rule.

The streaming worker handled this with scheduler.BucketBootstrapper.
Daily-replay needed the equivalent: walk the live tree once on the
first run for each shard so pre-existing objects get evaluated even
when their PUT events are outside the meta-log scan window.

Restructured the recovery branch to fire the walker on either
(found && mismatch) OR !found. On cold-start the cursor isn't
rewound — we keep TsNs=0 and let the drain below floor to
runNow - maxTTL like before; the walker just handles whatever the
sliding window can't reach.
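
The walker trigger condition stated above reduces to a small predicate. The function name shouldWalk and its parameter names are hypothetical; only the boolean condition is taken from the text.

```go
package main

import "fmt"

// shouldWalk reports whether the live-tree walker should fire for a shard.
// found: a persisted cursor exists for the shard.
// hashMismatch: the persisted hash differs from the current snapshot's hash.
func shouldWalk(found, hashMismatch bool) bool {
	return (found && hashMismatch) || !found
}

func main() {
	cases := []struct{ found, mismatch bool }{
		{false, false}, // cold start
		{true, true},   // recovery after a rule change
		{true, false},  // steady state
	}
	for _, tc := range cases {
		fmt.Printf("found=%v mismatch=%v walk=%v\n",
			tc.found, tc.mismatch, shouldWalk(tc.found, tc.mismatch))
	}
}
```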

TestLifecycleBootstrapWalkOnExistingObjects was the exact CI failure
this addresses (https://github.com/seaweedfs/seaweedfs/actions/runs/25777823522/job/75714014151).

* fix(s3/lifecycle): restore walker tag and null-version state

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(s3/lifecycle): parallelize shell shard sweeps

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(s3/lifecycle): bound each runPass ctx + refresh in runLifecycleShard

Two CI bugs surfaced after PR #9466 deleted the streaming worker:

1. The shell command's -refresh loop never fires. runPass used the
   outer ctx (full -runtime), so dailyrun.Run blocked for the entire
   1800s s3tests window — the background worker only ran one pass
   and never re-loaded configs that tests created mid-run.
   test_lifecycle_expiration sees 6 objects when expecting 4 because
   expire1/* never reaches the worker's snapshot. Cap each pass to
   cadence+5s when cadence>0; one-shot (cadence=0) keeps the full ctx.

2. TestLifecycleExpiredDeleteMarkerCleanup's docstring says
   "pass 1 cleans v1; pass 2 removes the now-orphaned marker," but
   runLifecycleShard was invoked with no -refresh, so only one pass ran.
   The marker rule can't fire in the same pass that dispatches v1's
   delete because v1 is still in .versions/. Add -refresh 1s so the
   10s runtime gets multiple passes.
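
The per-pass cap from item 1 can be sketched with the standard context package. The helper names passBudget, passContext, and hasDeadline are hypothetical; the cadence+5s budget and the uncapped one-shot case follow the description above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// grace is the extra time added on top of the cadence for each pass.
const grace = 5 * time.Second

// passBudget returns the per-pass timeout and whether a cap applies.
// cadence <= 0 means one-shot: the pass keeps the parent's full lifetime.
func passBudget(cadence time.Duration) (time.Duration, bool) {
	if cadence <= 0 {
		return 0, false
	}
	return cadence + grace, true
}

// passContext derives the context for a single pass from the run-wide parent.
func passContext(parent context.Context, cadence time.Duration) (context.Context, context.CancelFunc) {
	budget, capped := passBudget(cadence)
	if !capped {
		return parent, func() {} // nothing to release
	}
	return context.WithTimeout(parent, budget)
}

// hasDeadline reports whether a pass started from a deadline-free parent is bounded.
func hasDeadline(cadence time.Duration) bool {
	ctx, cancel := passContext(context.Background(), cadence)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

func main() {
	for _, cadence := range []time.Duration{0, time.Second} {
		fmt.Printf("cadence=%s bounded=%v\n", cadence, hasDeadline(cadence))
	}
}
```

Because the pass context is derived from the run-wide parent, an expiring -runtime still cancels a pass that is in flight.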

* fix(s3/lifecycle): persist cursor with fresh ctx after passCtx timeout

For an idle subscription, drainShardEvents exits only via ctx
cancellation; that is the steady state once all replayed events are
already past.
Saving the cursor with the canceled passCtx silently drops every
advance, so the next pass re-subscribes from the same floor and
re-replays the same events. Symptom in s3tests: status=error shards=16
errors=16 on every pass, and 1/6 expire3/* dispatches lost to a race
between concurrent shard drains all retrying the same events.

Use a 5s timeout derived from context.Background for the save, and
treat passCtx Deadline/Canceled from drain as a clean end-of-pass —
not a shard-level error to log.
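
A minimal sketch of the two changes, using only the standard library. saveCursor, drainIdle, saveWithFreshContext, isCleanEndOfPass, and demo are hypothetical names; the real persister and drain are more involved. The sketch assumes the persister fails on a context that is already done, as RPC-backed writes typically do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// saveCursor is a hypothetical persister that refuses to run on a done context.
func saveCursor(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("save cursor: %w", err)
	}
	return nil
}

// drainIdle models an idle subscription: it returns only when ctx ends.
func drainIdle(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// saveWithFreshContext runs save under a short timeout that is independent
// of the (possibly expired) pass context.
func saveWithFreshContext(save func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return save(ctx)
}

// isCleanEndOfPass treats pass-context expiry as a normal end of pass.
func isCleanEndOfPass(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// demo drains until the pass context expires, then saves both ways.
func demo() (stale, fresh error, clean bool) {
	passCtx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	drainErr := drainIdle(passCtx)
	return saveCursor(passCtx), saveWithFreshContext(saveCursor), isCleanEndOfPass(drainErr)
}

func main() {
	stale, fresh, clean := demo()
	fmt.Println("save with passCtx:", stale)
	fmt.Println("save with fresh ctx:", fresh)
	fmt.Println("drain error is clean end-of-pass:", clean)
}
```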

* fix(s3/lifecycle): trust persisted cursor; never bump past pending events

The drain freezes cursorAdvanceTo at the last pre-skip event so pending
matches (DueTime > runNow) re-enter the subscription next pass. Combined
with the new cursor persistence, the floor bump (runNow - maxTTL) then
orphans the very events the drain stopped at.

Concrete: a rule with TTL == maxTTL fires at runNow == PUT_TIME +
maxTTL, so floor (= runNow - maxTTL) lands exactly on PUT_TIME. If the
last advance saved a cursor right before the not-yet-due PUT (e.g.,
keep2/* between expire1/* and expire3/* on the same shard), the floor
bump on pass 9 skips past the expire3 event itself — the worker never
re-reads it. Test symptom: expire3/* never expires when worker shards
include other earlier no-match events.

Cold start (found=false) still subscribes from runNow - maxTTL. Steady
state honors the cursor verbatim.
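
The resulting choice of subscription start can be sketched as below. subscribeFromNs is a hypothetical helper working on nanosecond timestamps; the dates in main are made up for the demonstration. It shows that a persisted cursor older than the floor is still honored, so a pending event just after it is read again.

```go
package main

import (
	"fmt"
	"time"
)

// subscribeFromNs picks the meta-log subscription start in nanoseconds.
// Cold start (found=false) floors to runNow - maxTTL; otherwise the
// persisted cursor is used verbatim, even when it is older than the floor.
func subscribeFromNs(found bool, cursorTsNs, runNowNs, maxTTLNs int64) int64 {
	if !found {
		return runNowNs - maxTTLNs
	}
	return cursorTsNs
}

func main() {
	runNow := time.Date(2026, 5, 13, 0, 0, 0, 0, time.UTC)
	maxTTL := 24 * time.Hour
	// A cursor saved one minute before the floor, just ahead of a pending PUT.
	cursor := runNow.Add(-maxTTL - time.Minute)

	cold := subscribeFromNs(false, 0, runNow.UnixNano(), maxTTL.Nanoseconds())
	steady := subscribeFromNs(true, cursor.UnixNano(), runNow.UnixNano(), maxTTL.Nanoseconds())
	fmt.Println("cold start from:  ", time.Unix(0, cold).UTC())
	fmt.Println("steady state from:", time.Unix(0, steady).UTC())
}
```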

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-13 00:19:05 -07:00


package shell

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/dailyrun"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/dispatcher"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/engine"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/scheduler"
)

func init() {
	Commands = append(Commands, &commandS3LifecycleRunShard{})
}

type commandS3LifecycleRunShard struct{}

func (c *commandS3LifecycleRunShard) Name() string {
	return "s3.lifecycle.run-shard"
}

func (c *commandS3LifecycleRunShard) Help() string {
	return `manually run one daily-replay pass for the given shards
Drives dailyrun.Run once against the live filer + S3 server: builds
the engine snapshot from filer-backed bucket configs, opens the
meta-log subscription per shard, dispatches due actions via
LifecycleDelete, and walks the live tree for any walker-bound rules.
Persists each shard's cursor to /etc/s3/lifecycle/daily-cursors/
so subsequent runs resume.
Used by the s3-tests CI workflow and the test/s3/lifecycle/
integration tests to drive expirations on demand without standing up
the full admin+worker plugin stack.
# single shard
s3.lifecycle.run-shard -shard 0 -s3 localhost:8333 -events 100
# contiguous range
s3.lifecycle.run-shard -shards 0-15 -s3 localhost:8333 -events 5000
# explicit set
s3.lifecycle.run-shard -shards 0,3,7 -s3 localhost:8333
# bounded wall-clock
s3.lifecycle.run-shard -shards 0-15 -s3 localhost:8333 -runtime 10s
`
}

func (c *commandS3LifecycleRunShard) HasTag(CommandTag) bool { return false }

func (c *commandS3LifecycleRunShard) Do(args []string, env *CommandEnv, writer io.Writer) error {
	fs := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	shard := fs.Int("shard", -1, "single shard id in [0, 16); use -shards for a range or set")
	shardsSpec := fs.String("shards", "", "shard range \"lo-hi\" or comma list \"a,b,c\"; mutually exclusive with -shard")
	s3Endpoint := fs.String("s3", "", "s3 server gRPC endpoint, host:port")
	eventBudget := fs.Int("events", 1000, "max in-shard events per pass (0 = drain to now)")
	runtime := fs.Duration("runtime", 0, "wall-clock cap on the whole run; 0 = no timeout")
	// -refresh drives the inter-pass cadence when the command runs as a
	// long-lived worker (the s3tests CI workflow case): every refresh
	// the engine snapshot is re-loaded and dailyrun.Run fires another
	// pass. 0 means "run once and exit" (the integration-test case).
	cadence := fs.Duration("refresh", 0, "inter-pass interval; 0 = single pass, then exit")
	// Obsolete flags kept for back-compat with existing CI scripts and
	// integration tests. Accept and ignore.
	_ = fs.Duration("dispatch", 0, "ignored (legacy streaming flag)")
	_ = fs.Duration("checkpoint", 0, "ignored (legacy streaming flag)")
	_ = fs.Duration("bootstrap-interval", 0, "ignored (legacy streaming flag)")
	if err := fs.Parse(args); err != nil {
		return err
	}
	shards, err := resolveShardSelection(*shard, *shardsSpec)
	if err != nil {
		return err
	}
	if *s3Endpoint == "" {
		return fmt.Errorf("-s3 required (host:port of s3 server gRPC)")
	}
	if *eventBudget < 0 {
		return fmt.Errorf("-events must be >= 0 (0 = unbounded)")
	}

	bucketsPath, err := resolveBucketsPath(env)
	if err != nil {
		return fmt.Errorf("resolve buckets path: %w", err)
	}
	fmt.Fprintf(writer, "buckets path: %s\n", bucketsPath)

	dialCtx, dialCancel := context.WithTimeout(context.Background(), 30*time.Second)
	conn, err := pb.GrpcDial(dialCtx, *s3Endpoint, false, env.option.GrpcDialOption)
	dialCancel()
	if err != nil {
		return fmt.Errorf("dial s3 %s: %w", *s3Endpoint, err)
	}
	defer conn.Close()
	rpcClient := s3_lifecycle_pb.NewSeaweedS3LifecycleInternalClient(conn)

	return env.WithFilerClient(true, func(filerClient filer_pb.SeaweedFilerClient) error {
		ctx := context.Background()
		var cancel context.CancelFunc
		if *runtime > 0 {
			ctx, cancel = context.WithTimeout(ctx, *runtime)
			defer cancel()
		}

		client := &lifecycleClientCallable{c: rpcClient}
		listFn := dailyrun.FilerListFunc(filerClient, bucketsPath)
		walkerDispatch := &dailyrun.WalkerDispatcher{Client: client}

		fmt.Fprintf(writer, "running shards %s (event budget=%d, runtime=%s, refresh=%s)…\n",
			formatShardLabel(shards), *eventBudget, *runtime, *cadence)

		announcedLoad := false
		runPass := func() error {
			// When -refresh is set we MUST cap each pass; otherwise
			// drainShardEvents blocks until the outer runtime ctx
			// expires and the loop's ticker never fires. With cadence=0
			// (one-shot) the pass uses the full ctx and returns when
			// the runtime cap hits.
			passCtx := ctx
			if *cadence > 0 {
				var passCancel context.CancelFunc
				// Pass budget = cadence + grace. Grace gives the
				// drain time to actually process events that arrived
				// during the cadence window.
				passCtx, passCancel = context.WithTimeout(ctx, *cadence+5*time.Second)
				defer passCancel()
			}
			eng := engine.New()
			inputs, parseErrors, err := scheduler.LoadCompileInputs(passCtx, filerClient, bucketsPath)
			if err != nil {
				return fmt.Errorf("load lifecycle configs: %w", err)
			}
			for i, pe := range parseErrors {
				if i < 3 {
					fmt.Fprintf(writer, "warning: %s: %v\n", pe.Bucket, pe.Err)
				}
			}
			if extra := len(parseErrors) - 3; extra > 0 {
				fmt.Fprintf(writer, "warning: %d additional bucket(s) had malformed lifecycle config\n", extra)
			}
			eng.Compile(inputs, engine.CompileOptions{PriorStates: scheduler.AllActivePriorStates(inputs)})
			if len(inputs) == 0 {
				return nil
			}
			if !announcedLoad {
				fmt.Fprintf(writer, "loaded lifecycle for %d bucket(s)\n", len(inputs))
				announcedLoad = true
			}
			buckets := make([]string, 0, len(inputs))
			for _, in := range inputs {
				if in.Bucket != "" {
					buckets = append(buckets, in.Bucket)
				}
			}
			walker := dailyrun.WalkerFunc(func(walkCtx context.Context, view *engine.Snapshot, shardID int) error {
				return dailyrun.WalkBuckets(walkCtx, view, shardID, buckets, listFn, walkerDispatch)
			})
			return dailyrun.Run(passCtx, dailyrun.Config{
				Shards:      shards,
				BucketsPath: bucketsPath,
				Engine:      eng,
				FilerClient: filerClient,
				Client:      client,
				Persister:   &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)},
				Lister:      dispatcher.NewFilerSiblingLister(filerClient, bucketsPath),
				// The shell command is used for bounded one-shot sweeps in
				// integration tests and CI. Fan out across the selected shards
				// so recovery walks do not serialize 16 shard scans into a 10s
				// timeout budget.
				Workers:     len(shards),
				Walker:      walker,
				EventBudget: *eventBudget,
				ClientName:  fmt.Sprintf("shell-lifecycle-%s", formatShardLabel(shards)),
			})
		}

		if err := runPass(); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("daily_run: %w", err)
		}
		// cadence=0 → one-shot (test/s3/lifecycle/ uses this).
		// cadence>0 → loop until ctx done (s3tests CI workflow uses this).
		if *cadence > 0 {
			ticker := time.NewTicker(*cadence)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					fmt.Fprintf(writer, "shards %s complete; ctx done\n", formatShardLabel(shards))
					return nil
				case <-ticker.C:
					if err := runPass(); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
						fmt.Fprintf(writer, "shards %s pass error: %v\n", formatShardLabel(shards), err)
					}
				}
			}
		}
		fmt.Fprintf(writer, "shards %s complete; cursors checkpointed\n", formatShardLabel(shards))
		return nil
	})
}

// resolveShardSelection turns the -shard / -shards flags into a sorted,
// deduplicated []int. Exactly one form must be specified.
func resolveShardSelection(singleShard int, shardsSpec string) ([]int, error) {
	if singleShard >= 0 && shardsSpec != "" {
		return nil, fmt.Errorf("-shard and -shards are mutually exclusive")
	}
	if singleShard < 0 && shardsSpec == "" {
		return nil, fmt.Errorf("specify -shard <id> or -shards <range|set>")
	}
	if singleShard >= 0 {
		if singleShard >= s3lifecycle.ShardCount {
			return nil, fmt.Errorf("-shard %d out of [0,%d)", singleShard, s3lifecycle.ShardCount)
		}
		return []int{singleShard}, nil
	}
	return parseShardsSpec(shardsSpec)
}

// parseShardsSpec accepts "lo-hi" (inclusive) or "a,b,c" and returns a
// sorted, deduplicated, in-range []int.
func parseShardsSpec(spec string) ([]int, error) {
	spec = strings.TrimSpace(spec)
	seen := map[int]struct{}{}
	add := func(v int) error {
		if v < 0 || v >= s3lifecycle.ShardCount {
			return fmt.Errorf("shard %d out of [0,%d)", v, s3lifecycle.ShardCount)
		}
		seen[v] = struct{}{}
		return nil
	}
	if strings.Contains(spec, "-") && !strings.Contains(spec, ",") {
		parts := strings.SplitN(spec, "-", 2)
		lo, err := strconv.Atoi(strings.TrimSpace(parts[0]))
		if err != nil {
			return nil, fmt.Errorf("range lo: %w", err)
		}
		hi, err := strconv.Atoi(strings.TrimSpace(parts[1]))
		if err != nil {
			return nil, fmt.Errorf("range hi: %w", err)
		}
		if lo > hi {
			return nil, fmt.Errorf("range lo %d > hi %d", lo, hi)
		}
		for v := lo; v <= hi; v++ {
			if err := add(v); err != nil {
				return nil, err
			}
		}
	} else {
		for _, part := range strings.Split(spec, ",") {
			part = strings.TrimSpace(part)
			if part == "" {
				continue
			}
			v, err := strconv.Atoi(part)
			if err != nil {
				return nil, fmt.Errorf("shard list: %w", err)
			}
			if err := add(v); err != nil {
				return nil, err
			}
		}
	}
	if len(seen) == 0 {
		return nil, fmt.Errorf("empty shard set")
	}
	out := make([]int, 0, len(seen))
	for v := range seen {
		out = append(out, v)
	}
	sort.Ints(out)
	return out, nil
}

func formatShardLabel(shards []int) string {
	if len(shards) == 1 {
		return fmt.Sprintf("%d", shards[0])
	}
	contiguous := true
	for i := 1; i < len(shards); i++ {
		if shards[i] != shards[i-1]+1 {
			contiguous = false
			break
		}
	}
	if contiguous {
		return fmt.Sprintf("%d-%d", shards[0], shards[len(shards)-1])
	}
	parts := make([]string, len(shards))
	for i, v := range shards {
		parts[i] = strconv.Itoa(v)
	}
	return strings.Join(parts, ",")
}

// lifecycleClientCallable adapts the generated grpc client (variadic
// CallOption tail) to dailyrun.LifecycleClient.
type lifecycleClientCallable struct {
	c s3_lifecycle_pb.SeaweedS3LifecycleInternalClient
}

func (l *lifecycleClientCallable) LifecycleDelete(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) {
	return l.c.LifecycleDelete(ctx, req)
}

// resolveBucketsPath fetches the filer's configured buckets directory.
// Falls back to /buckets when the filer doesn't return one.
func resolveBucketsPath(env *CommandEnv) (string, error) {
	var path string
	err := env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		path = resp.GetDirBuckets()
		return nil
	})
	if err != nil {
		return "", err
	}
	if path == "" {
		path = "/buckets"
	}
	return path, nil
}