docs(s3/lifecycle): reflect shipped reader, obsolete Phase 6 (#9419)

* test(s3/lifecycle/engine): pin delay-group dedup across buckets

Compile a 100-bucket × 5-rule snapshot where the five Days values
include duplicates (1, 1, 7, 7, 30) and assert:

- snap.actions has 500 entries — every (bucket, rule) compiles to its
  own ActionKey, no collapse.
- snap.originalDelayGroups has exactly 3 entries — the routing index
  is keyed by Delay, so same-day rules across all buckets share a
  group. This is the property that lets the dispatcher index by
  delay group rather than per-rule.
- Per-group key count = (rules with that day) × buckets, so every
  action is reachable from its group entry.

* docs(s3/lifecycle): reflect shipped reader, obsolete Phase 6

The reader didn't end up building per-filer-shard cursors,
RetainedLogRangePerShard / EarliestRetainedPositionPerShard probes,
tail_drained_streams GC, or CollectLogFileRefs heap-merge. The
filer's SubscribeMetadata already aggregates persisted + in-memory
logs across peer filers, so a single global cursor on a single
subscription is sufficient.

Mark the per-shard architecture and lost-log GC pseudocode as
implementation-note + obsoleted, rewrite Phase 6 to call out what
was descoped and why, point readers at the narrow ResumeFromDiskError
auto-degrade follow-up that is the only residual gap, and update
Layer 4 to note no multi-filer test harness is needed.

* docs(s3/lifecycle): drop stale per-shard cursor wording in Phase 3

The "one sweep per delay group" bullet still described
last_processed_original / last_processed_predicate as per-shard maps,
contradicting the implementation note added a few lines above. The
shipped reader stores one global (ts_ns, offset) position per delay
group; rewrite the bullet to match.
Chris Lu
2026-05-10 10:40:33 -07:00
committed by GitHub
parent 82648cca53
commit ca12934834
+34 -25
@@ -110,7 +110,9 @@ This is more state than the previous "single subscription, single cursor" claim
**Cursor-skip semantics.** `last_processed[filer_id]` points at the *last resolved event* for that shard. Resume must skip events with position `<= cursor[filer_id]` (using the `(ts, offset)` order per shard) — strict `<=`, not `<`, otherwise the last processed event is replayed on every resume. Equivalently: deliver events with `(event.ts, event.offset) > cursor[filer_id]`. Phase 3 introduces a new API variant that takes per-shard `MessagePosition`s and applies this comparator — see "API change" below. The existing `pb.ReadLogFileRefs` filter `logEntry.TsNs <= startTsNs` (`weed/pb/filer_pb_direct_read.go:333`) is the wrong shape because it can't disambiguate equal-ts events.
**Required API addition** (Phase 3 dependency): `pb.ReadLogFileRefsWithPosition` extending the current `ReadLogFileRefs`:
> **Implementation note:** the API addition described below was not built. The shipped reader uses `SubscribeMetadata` with a single global cursor (the filer's server-side `MetaAggregator` already does the merge). The pseudocode is preserved as design context for a future per-shard architecture.
**~~Required API addition~~ (Phase 3 dependency, not built)**: `pb.ReadLogFileRefsWithPosition` extending the current `ReadLogFileRefs`:
- Take a per-shard `start_positions: map<filer_id, MessagePosition>` instead of a single `startTsNs`.
- Per-event callback signature `eachLogEntry(event, filer_id, position) → CallbackResult`.
@@ -565,6 +567,8 @@ Policy changes during a bucket bootstrap: the bootstrap is bound to a specific `
### Reader — single subscription, client-side merge
> **Implementation note (post-Phase 3):** the production reader uses `client.SubscribeMetadata(...)` directly with a single global cursor. The filer's `SubscribeMetadata` server (`weed/server/filer_grpc_server_sub_meta.go`) already aggregates across peer filers — it reads disk-persisted logs from every filer via `sendLogFileRefs` then drains `MetaAggregator.MetaLogBuffer` for the in-memory tail — so the worker doesn't have to assemble its own multi-filer merge. The per-filer-shard cursor model below was designed before this was wired and was not built; the proto fields (`last_processed_original` map keyed by `filer_id`, `tail_drained_streams`) remain as forward-compat stubs. The pseudocode below describes the alternative the design considered, kept here for context. See "Multi-filer durability" and the obsoleted Phase 6 section.
The reader is one cluster-singleton task subscribed to one filer at a time. `Filer.CollectLogFileRefs(start_position, stop_ts)` returns chunk refs from log files in directory order. The worker passes those refs to `pb.ReadLogFileRefsWithPosition` (the new API — see Task #19), which heap-merges per-filer chunks by event ts client-side and applies the per-shard `(ts, offset)` skip filter. Chunk *bodies* are read in parallel from volume servers via the returned fileIds.
Cursors are per-filer-shard. `last_processed_original[D.seconds]` is `map<filer_id, MessagePosition>`; `MessagePosition = {ts_ns, offset}` disambiguates equal-ts events within a per-filer chunk. Resume skips events with `(event.ts, event.offset) <= cursor[filer_id]` — strict `<=` so the last resolved event isn't replayed.
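
The `(ts, offset)` skip rule can be sketched as a lexicographic comparison. This is an illustration only: the `MessagePosition` field names, the `After` method, and the `resume` helper are assumptions for the example, not the filer's actual types. The same comparison applies whether the cursor is per-shard or a single global position.

```go
package main

import "fmt"

// MessagePosition is a hypothetical stand-in for the {ts_ns, offset} pair.
type MessagePosition struct {
	TsNs   int64
	Offset int64
}

// After reports whether p sorts strictly after q in (ts, offset) order.
func (p MessagePosition) After(q MessagePosition) bool {
	if p.TsNs != q.TsNs {
		return p.TsNs > q.TsNs
	}
	return p.Offset > q.Offset
}

// resume returns the events to deliver after a restart: everything with
// position <= cursor is skipped, so the last resolved event is not replayed.
func resume(events []MessagePosition, cursor MessagePosition) []MessagePosition {
	var out []MessagePosition
	for _, e := range events {
		if e.After(cursor) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	// Three events share ts=100; only the offset tells them apart.
	events := []MessagePosition{{100, 1}, {100, 2}, {100, 3}, {101, 0}}
	cursor := MessagePosition{TsNs: 100, Offset: 2}
	fmt.Println(resume(events, cursor)) // [{100 3} {101 0}]
}
```

A filter on timestamp alone would either replay `{100, 2}` or drop `{100, 3}`, which is why the offset is part of the cursor.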
@@ -1394,6 +1398,8 @@ Why a separate lane matters here:
### Multi-filer durability
> **Implementation note (post-Phase 3):** the worker did not end up building its own multi-filer merge. `SubscribeMetadata` on the filer server (`weed/server/filer_grpc_server_sub_meta.go:154`) routes through `MetaAggregator` when peer filers exist: it reads from every filer's disk-persisted logs first (via `sendLogFileRefs`/`Filer.CollectLogFileRefs`), then drains the aggregator's in-memory tail. Workers get an aggregated, durable stream from a single subscription. The per-filer-shard cursor architecture below describes what would be needed if the worker had to assemble the merge itself; we kept it here as design context and as a reference for the proto stubs (`LOST_LOG`, `tail_drained_streams`) that remain forward-compatible if the architecture ever changes. The genuine residual gap — auto-degrading the worker on `ResumeFromDiskError` — is tracked separately as a narrow follow-up rather than a Phase.
`MetaAggregator` is **in-memory only** — `weed/filer/meta_aggregator.go:39`: *"MetaAggregator only aggregates data 'on the fly'. The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk."*
So the aggregator can't be the worker's source of truth: a worker restart, a crash, or a peer outage all lose unpersisted aggregated events. The persistent source is per-filer logs at `/topics/.system/log/<date>/<hour-min>.<filerId>` — written by each filer for its own events, durable, and queryable via `Filer.CollectLogFileRefs` and `Filer.ReadPersistedLogBuffer`.
@@ -1904,11 +1910,13 @@ Phasing changed: **the back-stamp cannot be removed until the worker can take ov
## Phase 3 — Shared cluster reader + delay-group sweeps (non-versioned)
- Implement `s3.lifecycle.read` task: cluster singleton, subscribed to one filer at a time. `Filer.CollectLogFileRefs` returns chunk refs in directory order; the worker uses `pb.ReadLogFileRefsWithPosition` (Task #19) to heap-merge per-filer chunks by event ts client-side, with per-shard `MessagePosition` skip filtering. Worker reads chunk bodies in parallel from volume servers. Cursors are per-filer-shard: `last_processed_original: map<delay_seconds, map<filer_id, MessagePosition>>` and `last_processed_predicate: map<filer_id, MessagePosition>` — `LogEntry.Offset` is per-buffer (per filerId), not globally unique, so per-shard cursors are required. Resume skips events with `(ts, offset) <= cursor[filer_id]` (strict `<=`). Listing uses the low-water-mark across per-filer cursors. New shards seeded lazily via `seedNewlyDiscoveredShardsLazily` using `EarliestRetainedPositionPerShard` (Task #19), and seeding clears any matching `tail_drained_streams` marker. Tail-drain GC at task entry uses `RetainedLogRangePerShard` (Task #19) to remove cursor entries where `cursor >= range.latest`, record the matching `tail_drained_streams` key, and downgrade to `scan_only` with `LOST_LOG` if logs vanished before a stream caught up. Failover to another filer endpoint on connection error; per-filer cursor map is portable. Periodic checkpointing (default every 1000 events or 5s) bounds crash redo.
- **Tail-drain tests required in this phase**: (a) cursor at exactly `range.latest` for a shard → tail-drained, entry GC'd, matching `tail_drained_streams` key updated, low-water-mark recovers; (b) cursor before `range.latest` → not GC'd, entry preserved; (c) shard removed from cluster but logs still retained → not GC'd until each stream cursor catches up to `range.latest`; (d) verify low-water-mark advances after GC (otherwise the tail-drain wasn't effective); (e) **lost-log**: shard's logs pruned with a stream cursor still behind `range.latest` and no matching `tail_drained_streams[...]` marker → cursor NOT GC'd, all reader-driven rules promoted to `mode = scan_only` with `degraded_reason = LOST_LOG`, `lifecycle_lost_log_total{filer_id}` increments, warning log emitted; verify `s3.lifecycle.reseed -ack-lost-log --reason <text>` is the only path that clears the lost-log cursor entries and degraded flag together; (f) cross-stream guard: a tail-drained 30-day original stream does not permit GC of the same filer's 60-day original stream or predicate stream; (g) rejoin guard: when a previously tail-drained shard reappears and is lazily seeded, the matching `tail_drained_streams` marker is cleared.
> **Implementation note:** the shipped reader uses `client.SubscribeMetadata(...)` with a single global `(ts_ns, offset)` cursor (`weed/s3api/s3lifecycle/reader/reader.go`). The filer's `SubscribeMetadata` server already aggregates persisted + in-memory logs across peer filers, so the per-filer-shard cursor map and the `RetainedLogRangePerShard` / `EarliestRetainedPositionPerShard` filer RPCs (Task #19) were not built. The tail-drain matrix and lost-log tests below were written against that unbuilt architecture and are obsoleted; the only residual gap — auto-degrading on `ResumeFromDiskError` — is tracked as a separate narrow follow-up. The retention mode-gate, engine routing, predicate-change pass, SKIPPED_OBJECT_LOCK, TRANSPORT_ERROR, and cursor-trail tests remain valid and were exercised against the production reader/router/engine stack.
- Implement `s3.lifecycle.read` task: cluster singleton, subscribed to one filer at a time. ~~`Filer.CollectLogFileRefs` returns chunk refs in directory order; the worker uses `pb.ReadLogFileRefsWithPosition` (Task #19) to heap-merge per-filer chunks by event ts client-side, with per-shard `MessagePosition` skip filtering.~~ The shipped task uses a direct `SubscribeMetadata` subscription; the filer server-side does the multi-filer merge. Cursor is one global `(ts_ns, offset)` position resumed via `SinceNs`. Failover to another filer endpoint on connection error; cursor is portable (one ts). Periodic checkpointing bounds crash redo.
- ~~**Tail-drain tests required in this phase**~~ — obsoleted by the single-cursor architecture; see Phase 6 for context.
- **Retention mode-gate tests** (per `eventLogHorizon` table, kinds beyond age-based): inject a rule of each reader-driven kind and toggle `metaLogRetention` above and below `eventLogHorizon(rule) + bootstrapLookbackMin`; verify the rule is `event_driven` above and `scan_only` with `degraded_reason = RETENTION_BELOW_HORIZON` below. Cover `EXPIRATION_DAYS`, `NONCURRENT_DAYS`, `ABORT_MPU`, `NEWER_NONCURRENT`, `EXPIRED_DELETE_MARKER`. Verify `EXPIRATION_DATE` bypasses the gate regardless of retention.
- Engine drives event routing: `engine.MatchOriginalWrite(event, delay=D)` and `engine.MatchPredicateChange(event)`.
- **One sweep per delay group** (e.g. `1d`, `30d`, `60d`, `90d`); each sweep advances `reader_state.last_processed_original[D.seconds]` (a `map<filer_id, MessagePosition>` — per-shard cursors, not a single position). Single predicate-change sweep clamped to `min(now - smallDelay, now - flushSafetyLag)` advances `reader_state.last_processed_predicate` (also per-shard).
- **One sweep per delay group** (e.g. `1d`, `30d`, `60d`, `90d`); each sweep advances `reader_state.last_processed_original[D.seconds]` (a single global `(ts_ns, offset)` `MessagePosition`). Single predicate-change sweep clamped to `min(now - smallDelay, now - flushSafetyLag)` advances `reader_state.last_processed_predicate` (also a single global position).
- One filer fetch per event (`fetchLiveEntryOnce`), reused across all candidate rules.
- Per-event resolution-or-stop: `Resolved` outcomes advance the cursor; `TRANSPORT_ERROR` halts the batch and the cursor stays.
- Engine refreshes when the reader observes a lifecycle-xattr `UpdateEntry` event.
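
The per-event resolution-or-stop rule above can be sketched as follows. All names (`Outcome`, `Position`, `Event`, `processBatch`) are hypothetical and chosen for the example; the shipped reader's types are not shown in this document.

```go
package main

import "fmt"

// Outcome is a hypothetical per-event result.
type Outcome int

const (
	Resolved Outcome = iota
	TransportError
)

// Position is a stand-in for the global (ts_ns, offset) cursor.
type Position struct{ TsNs, Offset int64 }

// Event carries the log position it was read from.
type Event struct{ Pos Position }

// processBatch resolves events in order. Each Resolved outcome advances the
// cursor; the first TransportError halts the batch and leaves the cursor at
// the last resolved event, so the failed event is retried on the next run.
func processBatch(cursor Position, events []Event, resolve func(Event) Outcome) Position {
	for _, ev := range events {
		if resolve(ev) == TransportError {
			return cursor
		}
		cursor = ev.Pos
	}
	return cursor
}

func main() {
	events := []Event{{Position{10, 0}}, {Position{11, 0}}, {Position{12, 0}}}
	got := processBatch(Position{}, events, func(ev Event) Outcome {
		if ev.Pos.TsNs == 12 {
			return TransportError // simulate a filer that became unreachable
		}
		return Resolved
	})
	fmt.Println(got) // {11 0}
}
```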
@@ -1941,20 +1949,26 @@ Phasing changed: **the back-stamp cannot be removed until the worker can take ov
- `AbortIncompleteMultipartUpload` driven by events under `.uploads/`.
- Tests covering: new version PUT, delete marker creation, non-current expiration, expired-delete-marker, MPU abort.
## Phase 6 — Multi-filer durable replay (mostly done in Phase 3)
## Phase 6 — Multi-filer durable replay (obsoleted)
The shared-reader design (Phase 3) already reads from the shared filer namespace, so multi-filer correctness is largely subsumed. This phase fills in:
**Status: not built. Architecture moved on; the production reader uses the filer's existing aggregation surface.**
- Per-shard cursors keyed by `filerId` are already in the reader-state proto (Phase 3).
- Filer-set discovery: list `/topics/.system/log/<date>/` and observe filerId suffixes (Phase 3).
- Tail-drain GC implementation using `RetainedLogRangePerShard` (Task #19): cursor entries where `cursor >= range.latest` are removed at task entry AND the stream-specific key is recorded in `reader_state.tail_drained_streams`. Phase 3 tests verify single-shard tail-drain; Phase 6 adds multi-filer convergence tests:
- Two filers actively writing, racing writes against the same key — events from both shards merged correctly, per-shard cursors advance independently.
- Filer A departs (stops writing); its logs are still retained → cursor for A continues to advance until A's `range.latest`; not GC'd before then.
- Filer A reaches `range.latest` for one stream (e.g. original delay 30d), then its logs are pruned: probe returns no range, but the matching `tail_drained_streams[(ORIGINAL,30d,A)]` entry is set → that stream cursor is GC'd as safe; no degradation.
- Filer A later reappears and writes retained logs for that same stream: lazy seeding creates a new cursor and clears `tail_drained_streams[(ORIGINAL,30d,A)]`, so the old safe-drain marker cannot mask a future lost-log event.
- **Lost-log: Filer A's logs are pruned BEFORE another stream cursor reaches `range.latest`** (probe returns no range AND the matching `tail_drained_streams[...]` entry is unset): the reader does NOT GC that cursor; instead, all reader-driven rules are promoted to `mode = scan_only` with `degraded_reason = LOST_LOG`; `lifecycle_lost_log_total{filer_id="A"}` increments; warning logged. Verify cursor stays put until `s3.lifecycle.reseed -ack-lost-log --reason <text>` clears the lost-log cursor entries and degraded flag.
- Worker restart mid-merge → resumes per-filer cursors correctly; both shards continue from their persisted positions; `tail_drained_streams` survives restart.
- Failover to a different primary filer mid-batch — cursor portable, no events skipped or replayed.
What the original Phase 6 set out to add — per-filer-shard cursor maps, `EarliestRetainedPositionPerShard` and `RetainedLogRangePerShard` filer RPCs, `tail_drained_streams` GC, `CollectLogFileRefs` heap-merge, lost-log degraded promotion — was designed under the assumption that the lifecycle worker would have to assemble its own multi-filer view because `MetaAggregator` is in-memory only.
`SubscribeMetadata` already does that work server-side: when the filer has peer filers it reads disk-persisted logs from every filer first (via `sendLogFileRefs` → `Filer.CollectLogFileRefs`) and only then drains the in-memory aggregator. A single subscription returns a durable, multi-filer-merged stream. The Phase 3 shipped reader (`weed/s3api/s3lifecycle/reader/reader.go`) consumes that stream with one global `(ts_ns, offset)` cursor and never needed the per-shard cursor map.
What the design's per-shard architecture would have bought us, and why each piece is no longer required:
| Phase 6 mechanism | Why it's no longer required |
|---|---|
| Per-filer-shard cursor map | Single global cursor suffices because the filer presents one merged stream. Resume-replay overhead is bounded by `LogFlushInterval` (~1m) and the dispatcher's tick interval (~200ms), so the worst-case redundant work per batch is one minute of events that the cursor compare immediately skips. |
| `RetainedLogRangePerShard` / `EarliestRetainedPositionPerShard` filer RPCs | No per-shard cursor → no need for per-shard probe. `SubscribeMetadata` already returns `ResumeFromDiskError` when the cursor predates retained logs. |
| Tail-drain GC + `tail_drained_streams` | No per-shard cursor → no entries to GC. Departed filers' events are absorbed into the aggregated stream and the global cursor advances past them. |
| Lazy seeding `seedNewlyDiscoveredShardsLazily` | No per-shard cursor → no shards to seed. New filers' events appear automatically through the aggregator. |
Forward-compat stubs that stay in the proto: `LifecycleState.DegradedReason.LOST_LOG`, `ReaderState.tail_drained_streams`, `last_processed_original` keyed by `filer_id`. They cost nothing and keep the on-disk state file readable if a future deployment ever needs a per-shard architecture.
The single residual gap — and the only piece worth building — is auto-degrading the reader when `SubscribeMetadata` returns `ResumeFromDiskError`. Tracked as a separate narrow follow-up (catch the error in `reader.Run`, mark reader-driven actions `scan_only` with `degraded_reason = LOST_LOG`, increment the counter, advance the cursor to retained-earliest, expose `s3.lifecycle.reseed -ack-lost-log` to flip back to event_driven once safety scans cover the gap). It is a ~50-line change, not a Phase.
## Phase 7 — Observability
@@ -1979,7 +1993,7 @@ The shared-reader design (Phase 3) already reads from the shared filer namespace
- Date-rule deletions (`SCAN_AT_DATE` mode: a single scheduled bucket-level bootstrap at `rule.date` deletes all matches inline).
- Periodic safety scans (re-run bootstrap on the per-kind cadence) catch drift over time.
Not-yet-due age-based discovery via meta-log events arrives in Phase 3 (the reader). Versioned buckets remain unhandled until Phase 5; the worker skips them with a logged warning until then. The existing back-stamp keeps simple `Expiration.Days` working in parallel.
- Phase 3 brings steady-state event-driven expiration. The reader is a cluster singleton subscribed to one filer; the worker heap-merges per-filer log chunks client-side via `pb.ReadLogFileRefsWithPosition` (Task #19) with per-shard `MessagePosition` cursors. Multi-filer clusters work the same as single-filer clusters from this point forward.
- Phase 3 brings steady-state event-driven expiration. The reader is a cluster singleton subscribed to one filer via `SubscribeMetadata`, which the filer's `MetaAggregator` already merges across peer filers. Single global `(ts_ns, offset)` cursor; multi-filer clusters work the same as single-filer clusters from this point forward.
- Phase 4 makes `PUT` constant-time. Required dependency: Phase 2 + Phase 3 shipped and stable.
- Phases 5–8 complete the surface.
@@ -2073,16 +2087,11 @@ test-fast: build-weed-10sec
Run: `make -C test/s3/lifecycle test-with-server`. Phase mapping: end-to-end tests land in the same phase as the capability they cover.
## Layer 4 — Multi-filer convergence (Phase 6 only)
## Layer 4 — Multi-filer convergence (obsoleted)
Location: `test/plugin_workers/lifecycle/multi_filer_test.go` (component) plus a multi-filer scenario in `test/s3/lifecycle/` if needed.
**Not built.** Phase 6 was obsoleted by `SubscribeMetadata`'s server-side aggregation (see the Phase 6 section). With a single global cursor on a server-merged stream, there are no per-filer-shard convergence properties for the worker to test — any merge correctness lives inside the filer's `MetaAggregator`, which has its own coverage in `weed/filer/`.
Patterns to copy: `test/multi_master/`, `test/metadata_subscribe/` — both spin up multi-node clusters in-process.
Cases:
- Racing writes to the same key on different filers, **including events with identical `event.ts`** — reader merges per-filer logs by `(event.ts, filer_id, event.offset)` (deterministic ordering across shards because `event.offset` is per-buffer/per-filer and not globally unique). Test asserts the same merge order across runs and that equal-ts events from different filers don't swap relative position.
- Worker restart mid-merge → resumes per-filer watermarks correctly.
- Filer leaves cluster → topology refresh skips it; remaining filers' logs continue.
If `ResumeFromDiskError` auto-degrade lands as a follow-up (task #21), its tests live with the reader package, not in a separate multi-filer harness.
## CI gating