diff --git a/S3_LIFECYCLE_REDESIGN.md b/S3_LIFECYCLE_REDESIGN.md index bd539e3ff..357b978ee 100644 --- a/S3_LIFECYCLE_REDESIGN.md +++ b/S3_LIFECYCLE_REDESIGN.md @@ -110,7 +110,9 @@ This is more state than the previous "single subscription, single cursor" claim **Cursor-skip semantics.** `last_processed[filer_id]` points at the *last resolved event* for that shard. Resume must skip events with position `<= cursor[filer_id]` (using the `(ts, offset)` order per shard) — strict `<=`, not `<`, otherwise the last processed event is replayed on every resume. Equivalently: deliver events with `(event.ts, event.offset) > cursor[filer_id]`. Phase 3 introduces a new API variant that takes per-shard `MessagePosition`s and applies this comparator — see "API change" below. The existing `pb.ReadLogFileRefs` filter `logEntry.TsNs <= startTsNs` (`weed/pb/filer_pb_direct_read.go:333`) is the wrong shape because it can't disambiguate equal-ts events. -**Required API addition** (Phase 3 dependency): `pb.ReadLogFileRefsWithPosition` extending the current `ReadLogFileRefs`: +> **Implementation note:** the API addition described below was not built. The shipped reader uses `SubscribeMetadata` with a single global cursor (the filer's server-side `MetaAggregator` already does the merge). The pseudocode is preserved as design context for a future per-shard architecture. + +**~~Required API addition~~ (Phase 3 dependency, not built)**: `pb.ReadLogFileRefsWithPosition` extending the current `ReadLogFileRefs`: - Take a per-shard `start_positions: map` instead of a single `startTsNs`. - Per-event callback signature `eachLogEntry(event, filer_id, position) → CallbackResult`. @@ -565,6 +567,8 @@ Policy changes during a bucket bootstrap: the bootstrap is bound to a specific ` ### Reader — single subscription, client-side merge +> **Implementation note (post-Phase 3):** the production reader uses `client.SubscribeMetadata(...)` directly with a single global cursor. 
The filer's `SubscribeMetadata` server (`weed/server/filer_grpc_server_sub_meta.go`) already aggregates across peer filers — it reads disk-persisted logs from every filer via `sendLogFileRefs` then drains `MetaAggregator.MetaLogBuffer` for the in-memory tail — so the worker doesn't have to assemble its own multi-filer merge. The per-filer-shard cursor model below was designed before this was wired and was not built; the proto fields (`last_processed_original` map keyed by `filer_id`, `tail_drained_streams`) remain as forward-compat stubs. The pseudocode below describes the alternative the design considered, kept here for context. See "Multi-filer durability" and the obsoleted Phase 6 section. + The reader is one cluster-singleton task subscribed to one filer at a time. `Filer.CollectLogFileRefs(start_position, stop_ts)` returns chunk refs from log files in directory order. The worker passes those refs to `pb.ReadLogFileRefsWithPosition` (the new API — see Task #19), which heap-merges per-filer chunks by event ts client-side and applies the per-shard `(ts, offset)` skip filter. Chunk *bodies* are read in parallel from volume servers via the returned fileIds. Cursors are per-filer-shard. `last_processed_original[D.seconds]` is `map`; `MessagePosition = {ts_ns, offset}` disambiguates equal-ts events within a per-filer chunk. Resume skips events with `(event.ts, event.offset) <= cursor[filer_id]` — strict `<=` so the last resolved event isn't replayed. @@ -1394,6 +1398,8 @@ Why a separate lane matters here: ### Multi-filer durability +> **Implementation note (post-Phase 3):** the worker did not end up building its own multi-filer merge. `SubscribeMetadata` on the filer server (`weed/server/filer_grpc_server_sub_meta.go:154`) routes through `MetaAggregator` when peer filers exist: it reads from every filer's disk-persisted logs first (via `sendLogFileRefs`/`Filer.CollectLogFileRefs`), then drains the aggregator's in-memory tail. 
Workers get an aggregated, durable stream from a single subscription. The per-filer-shard cursor architecture below describes what would be needed if the worker had to assemble the merge itself; we kept it here as design context and as a reference for the proto stubs (`LOST_LOG`, `tail_drained_streams`) that remain forward-compatible if the architecture ever changes. The genuine residual gap — auto-degrading the worker on `ResumeFromDiskError` — is tracked separately as a narrow follow-up rather than a Phase. + `MetaAggregator` is **in-memory only** — `weed/filer/meta_aggregator.go:39`: *"MetaAggregator only aggregates data 'on the fly'. The logs are not re-persisted to disk. The old data comes from what each LocalMetadata persisted on disk."* So the aggregator can't be the worker's source of truth: a worker restart, a crash, or a peer outage all lose unpersisted aggregated events. The persistent source is per-filer logs at `/topics/.system/log//.` — written by each filer for its own events, durable, and queryable via `Filer.CollectLogFileRefs` and `Filer.ReadPersistedLogBuffer`. @@ -1904,11 +1910,13 @@ Phasing changed: **the back-stamp cannot be removed until the worker can take ov ## Phase 3 — Shared cluster reader + delay-group sweeps (non-versioned) -- Implement `s3.lifecycle.read` task: cluster singleton, subscribed to one filer at a time. `Filer.CollectLogFileRefs` returns chunk refs in directory order; the worker uses `pb.ReadLogFileRefsWithPosition` (Task #19) to heap-merge per-filer chunks by event ts client-side, with per-shard `MessagePosition` skip filtering. Worker reads chunk bodies in parallel from volume servers. Cursors are per-filer-shard: `last_processed_original: map>` and `last_processed_predicate: map` — `LogEntry.Offset` is per-buffer (per filerId), not globally unique, so per-shard cursors are required. Resume skips events with `(ts, offset) <= cursor[filer_id]` (strict `<=`). Listing uses the low-water-mark across per-filer cursors. 
New shards seeded lazily via `seedNewlyDiscoveredShardsLazily` using `EarliestRetainedPositionPerShard` (Task #19), and seeding clears any matching `tail_drained_streams` marker. Tail-drain GC at task entry uses `RetainedLogRangePerShard` (Task #19) to remove cursor entries where `cursor >= range.latest`, record the matching `tail_drained_streams` key, and downgrade to `scan_only` with `LOST_LOG` if logs vanished before a stream caught up. Failover to another filer endpoint on connection error; per-filer cursor map is portable. Periodic checkpointing (default every 1000 events or 5s) bounds crash redo. -- **Tail-drain tests required in this phase**: (a) cursor at exactly `range.latest` for a shard → tail-drained, entry GC'd, matching `tail_drained_streams` key updated, low-water-mark recovers; (b) cursor before `range.latest` → not GC'd, entry preserved; (c) shard removed from cluster but logs still retained → not GC'd until each stream cursor catches up to `range.latest`; (d) verify low-water-mark advances after GC (otherwise the tail-drain wasn't effective); (e) **lost-log**: shard's logs pruned with a stream cursor still behind `range.latest` and no matching `tail_drained_streams[...]` marker → cursor NOT GC'd, all reader-driven rules promoted to `mode = scan_only` with `degraded_reason = LOST_LOG`, `lifecycle_lost_log_total{filer_id}` increments, warning log emitted; verify `s3.lifecycle.reseed -ack-lost-log --reason ` is the only path that clears the lost-log cursor entries and degraded flag together; (f) cross-stream guard: a tail-drained 30-day original stream does not permit GC of the same filer's 60-day original stream or predicate stream; (g) rejoin guard: when a previously tail-drained shard reappears and is lazily seeded, the matching `tail_drained_streams` marker is cleared. +> **Implementation note:** the shipped reader uses `client.SubscribeMetadata(...)` with a single global `(ts_ns, offset)` cursor (`weed/s3api/s3lifecycle/reader/reader.go`). 
The filer's `SubscribeMetadata` server already aggregates persisted and in-memory logs across peer filers, so the per-filer-shard cursor map and the `RetainedLogRangePerShard` / `EarliestRetainedPositionPerShard` filer RPCs (Task #19) were not built. The tail-drain matrix and lost-log tests below were written against that unbuilt architecture and are obsoleted; the only residual gap — auto-degrading on `ResumeFromDiskError` — is tracked as a separate narrow follow-up. The retention mode-gate, engine routing, predicate-change pass, `SKIPPED_OBJECT_LOCK`, `TRANSPORT_ERROR`, and cursor-trail tests remain valid and were exercised against the production reader/router/engine stack. + +- Implement `s3.lifecycle.read` task: cluster singleton, subscribed to one filer at a time. ~~`Filer.CollectLogFileRefs` returns chunk refs in directory order; the worker uses `pb.ReadLogFileRefsWithPosition` (Task #19) to heap-merge per-filer chunks by event ts client-side, with per-shard `MessagePosition` skip filtering.~~ The shipped task uses a direct `SubscribeMetadata` subscription; the filer does the multi-filer merge server-side. Cursor is one global `(ts_ns, offset)` position resumed via `SinceNs`. Failover to another filer endpoint on connection error; cursor is portable (one ts). Periodic checkpointing bounds crash redo. +- ~~**Tail-drain tests required in this phase**~~ — obsoleted by the single-cursor architecture; see Phase 6 for context. - **Retention mode-gate tests** (per `eventLogHorizon` table, kinds beyond age-based): inject a rule of each reader-driven kind and toggle `metaLogRetention` above and below `eventLogHorizon(rule) + bootstrapLookbackMin`; verify the rule is `event_driven` above and `scan_only` with `degraded_reason = RETENTION_BELOW_HORIZON` below. Cover `EXPIRATION_DAYS`, `NONCURRENT_DAYS`, `ABORT_MPU`, `NEWER_NONCURRENT`, `EXPIRED_DELETE_MARKER`. Verify `EXPIRATION_DATE` bypasses the gate regardless of retention. 
- Engine drives event routing: `engine.MatchOriginalWrite(event, delay=D)` and `engine.MatchPredicateChange(event)`. -- **One sweep per delay group** (e.g. `1d`, `30d`, `60d`, `90d`); each sweep advances `reader_state.last_processed_original[D.seconds]` (a `map` — per-shard cursors, not a single position). Single predicate-change sweep clamped to `min(now - smallDelay, now - flushSafetyLag)` advances `reader_state.last_processed_predicate` (also per-shard). +- **One sweep per delay group** (e.g. `1d`, `30d`, `60d`, `90d`); each sweep advances `reader_state.last_processed_original[D.seconds]` (a single global `(ts_ns, offset)` `MessagePosition`). Single predicate-change sweep clamped to `min(now - smallDelay, now - flushSafetyLag)` advances `reader_state.last_processed_predicate` (also a single global position). - One filer fetch per event (`fetchLiveEntryOnce`), reused across all candidate rules. - Per-event resolution-or-stop: `Resolved` outcomes advance the cursor; `TRANSPORT_ERROR` halts the batch and the cursor stays. - Engine refreshes when the reader observes a lifecycle-xattr `UpdateEntry` event. @@ -1941,20 +1949,26 @@ Phasing changed: **the back-stamp cannot be removed until the worker can take ov - `AbortIncompleteMultipartUpload` driven by events under `.uploads/`. - Tests covering: new version PUT, delete marker creation, non-current expiration, expired-delete-marker, MPU abort. -## Phase 6 — Multi-filer durable replay (mostly done in Phase 3) +## Phase 6 — Multi-filer durable replay (obsoleted) -The shared-reader design (Phase 3) already reads from the shared filer namespace, so multi-filer correctness is largely subsumed. This phase fills in: +**Status: not built. Architecture moved on; the production reader uses the filer's existing aggregation surface.** -- Per-shard cursors keyed by `filerId` are already in the reader-state proto (Phase 3). -- Filer-set discovery: list `/topics/.system/log//` and observe filerId suffixes (Phase 3). 
-- Tail-drain GC implementation using `RetainedLogRangePerShard` (Task #19): cursor entries where `cursor >= range.latest` are removed at task entry AND the stream-specific key is recorded in `reader_state.tail_drained_streams`. Phase 3 tests verify single-shard tail-drain; Phase 6 adds multi-filer convergence tests: - - Two filers actively writing, racing writes against the same key — events from both shards merged correctly, per-shard cursors advance independently. - - Filer A departs (stops writing); its logs are still retained → cursor for A continues to advance until A's `range.latest`; not GC'd before then. - - Filer A reaches `range.latest` for one stream (e.g. original delay 30d), then its logs are pruned: probe returns no range, but the matching `tail_drained_streams[(ORIGINAL,30d,A)]` entry is set → that stream cursor is GC'd as safe; no degradation. - - Filer A later reappears and writes retained logs for that same stream: lazy seeding creates a new cursor and clears `tail_drained_streams[(ORIGINAL,30d,A)]`, so the old safe-drain marker cannot mask a future lost-log event. - - **Lost-log: Filer A's logs are pruned BEFORE another stream cursor reaches `range.latest`** (probe returns no range AND the matching `tail_drained_streams[...]` entry is unset): the reader does NOT GC that cursor; instead, all reader-driven rules are promoted to `mode = scan_only` with `degraded_reason = LOST_LOG`; `lifecycle_lost_log_total{filer_id="A"}` increments; warning logged. Verify cursor stays put until `s3.lifecycle.reseed -ack-lost-log --reason ` clears the lost-log cursor entries and degraded flag. - - Worker restart mid-merge → resumes per-filer cursors correctly; both shards continue from their persisted positions; `tail_drained_streams` survives restart. - - Failover to a different primary filer mid-batch — cursor portable, no events skipped or replayed. 
+What the original Phase 6 set out to add — per-filer-shard cursor maps, `EarliestRetainedPositionPerShard` and `RetainedLogRangePerShard` filer RPCs, `tail_drained_streams` GC, `CollectLogFileRefs` heap-merge, lost-log degraded promotion — was designed under the assumption that the lifecycle worker would have to assemble its own multi-filer view because `MetaAggregator` is in-memory only. + +`SubscribeMetadata` already does that work server-side: when the filer has peer filers it reads disk-persisted logs from every filer first (via `sendLogFileRefs` → `Filer.CollectLogFileRefs`) and only then drains the in-memory aggregator. A single subscription returns a durable, multi-filer-merged stream. The Phase 3 shipped reader (`weed/s3api/s3lifecycle/reader/reader.go`) consumes that stream with one global `(ts_ns, offset)` cursor and never needed the per-shard cursor map. + +What the design's per-shard architecture would have bought us, and why each piece is no longer required: + +| Phase 6 mechanism | Why it's no longer required | +|---|---| +| Per-filer-shard cursor map | Single global cursor suffices because the filer presents one merged stream. Resume-replay overhead is bounded by `LogFlushInterval` (~1m) and the dispatcher's tick interval (~200ms), so the worst-case redundant work per batch is one minute of events that the cursor compare immediately skips. | +| `RetainedLogRangePerShard` / `EarliestRetainedPositionPerShard` filer RPCs | No per-shard cursor → no need for per-shard probe. `SubscribeMetadata` already returns `ResumeFromDiskError` when the cursor predates retained logs. | +| Tail-drain GC + `tail_drained_streams` | No per-shard cursor → no entries to GC. Departed filers' events are absorbed into the aggregated stream and the global cursor advances past them. | +| Lazy seeding `seedNewlyDiscoveredShardsLazily` | No per-shard cursor → no shards to seed. New filers' events appear automatically through the aggregator. 
| + +Forward-compat stubs that stay in the proto: `LifecycleState.DegradedReason.LOST_LOG`, `ReaderState.tail_drained_streams`, `last_processed_original` keyed by `filer_id`. They cost nothing and keep the on-disk state file readable if a future deployment ever needs a per-shard architecture. + +The single residual gap — and the only piece worth building — is auto-degrading the reader when `SubscribeMetadata` returns `ResumeFromDiskError`. It is tracked as a separate narrow follow-up (catch the error in `reader.Run`, mark reader-driven actions `scan_only` with `degraded_reason = LOST_LOG`, increment the counter, advance the cursor to retained-earliest, expose `s3.lifecycle.reseed -ack-lost-log` to flip back to `event_driven` once safety scans cover the gap). It is a ~50-line change, not a Phase. ## Phase 7 — Observability @@ -1979,7 +1993,7 @@ The shared-reader design (Phase 3) already reads from the shared filer namespace - Date-rule deletions (`SCAN_AT_DATE` mode: a single scheduled bucket-level bootstrap at `rule.date` deletes all matches inline). - Periodic safety scans (re-run bootstrap on the per-kind cadence) catch drift over time. Not-yet-due age-based discovery via meta-log events arrives in Phase 3 (the reader). Versioned buckets remain unhandled until Phase 5; the worker skips them with a logged warning until then. The existing back-stamp keeps simple `Expiration.Days` working in parallel. -- Phase 3 brings steady-state event-driven expiration. The reader is a cluster singleton subscribed to one filer; the worker heap-merges per-filer log chunks client-side via `pb.ReadLogFileRefsWithPosition` (Task #19) with per-shard `MessagePosition` cursors. Multi-filer clusters work the same as single-filer clusters from this point forward. +- Phase 3 brings steady-state event-driven expiration. The reader is a cluster singleton subscribed to one filer via `SubscribeMetadata`, whose stream the filer's `MetaAggregator` already merges across peer filers. 
The reader keeps a single global `(ts_ns, offset)` cursor, so multi-filer clusters work the same as single-filer clusters from this point forward. - Phase 4 makes `PUT` constant-time. Required dependency: Phase 2 + Phase 3 shipped and stable. - Phases 5–8 complete the surface. @@ -2073,16 +2087,11 @@ test-fast: build-weed-10sec Run: `make -C test/s3/lifecycle test-with-server`. Phase mapping: end-to-end tests land in the same phase as the capability they cover. -## Layer 4 — Multi-filer convergence (Phase 6 only) +## Layer 4 — Multi-filer convergence (obsoleted) -Location: `test/plugin_workers/lifecycle/multi_filer_test.go` (component) plus a multi-filer scenario in `test/s3/lifecycle/` if needed. +**Not built.** Phase 6 was obsoleted by `SubscribeMetadata`'s server-side aggregation (see the Phase 6 section). With a single global cursor on a server-merged stream, there are no per-filer-shard convergence properties for the worker to test — any merge correctness lives inside the filer's `MetaAggregator`, which has its own coverage in `weed/filer/`. -Patterns to copy: `test/multi_master/`, `test/metadata_subscribe/` — both spin up multi-node clusters in-process. - -Cases: -- Racing writes to the same key on different filers, **including events with identical `event.ts`** — reader merges per-filer logs by `(event.ts, filer_id, event.offset)` (deterministic ordering across shards because `event.offset` is per-buffer/per-filer and not globally unique). Test asserts the same merge order across runs and that equal-ts events from different filers don't swap relative position. -- Worker restart mid-merge → resumes per-filer watermarks correctly. -- Filer leaves cluster → topology refresh skips it; remaining filers' logs continue. +If `ResumeFromDiskError` auto-degrade lands as a follow-up (task #21), its tests live with the reader package, not in a separate multi-filer harness. ## CI gating