mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
docs(s3/lifecycle): event-driven redesign (#9346)
* docs(s3/lifecycle): event-driven redesign
Replaces the synchronous PUT-handler walk with an event-driven worker
model: meta-log reader subscribed to one filer, client-side heap merge
with per-filer-shard MessagePosition cursors, bucket-level bootstrap
with inline delete, blocked-cursor handling for fatal events, durable
retry budget for sustained-transient promotion, retention mode gate
that downgrades reader-driven rules to scan_only when log retention
falls below the rule's event-log horizon.
* docs(s3/lifecycle): record Phase 0 verified assumptions
ReadPersistedLogBuffer payload carries Extended (event marshaled via
SubscribeMetadataResponse → ToProtoEntry). Meta-log files at
topics/.system/log/<date>/<HH-MM>.<filerId> are written without TtlSec
in filer_notify_append.go; retention is unbounded by default and only
shrinks if an operator sets a filer.conf rule with a TTL on the
SystemLogDir prefix. .versions/ filenames are v_<16h-ts><16h-rand>
with old/new (inverted) format distinguished by threshold
0x4000000000000000; getVersionTimestamp / compareVersionIds give
format-agnostic ordering for successor-version discovery.
* feat(s3/lifecycle): rule evaluator and dueAt helper
Evaluate(rule, info, now) returns the EvalResult by object shape:
IsMPUInit -> AbortMultipartUpload, IsDeleteMarker -> ExpireDeleteMarker
when sole survivor, IsLatest -> DeleteObject (Days or Date), non-current
-> DeleteVersion (Days fallback to ModTime when SuccessorModTime is
zero; NewerNoncurrent retention enforced when both are set).
ComputeDueAt mirrors Evaluate's shape and returns the earliest eligible
wall-clock time for the same (rule, info), used by the reader/bootstrap
to decide pending vs inline-delete.
Adds StatusEnabled/Disabled and SmallDelay consts and an IsMPUInit
flag on ObjectInfo so .uploads/<id>/ entries route to AbortMPU without
overloading IsLatest.
No callers; package compiles standalone.
* feat(s3/lifecycle): MinTriggerAge for safety-scan cadence
Returns the smallest non-zero day threshold across the rule's actions.
Used as max(MinTriggerAge, kindFloor) for the per-kind cadence; date,
count-only, and delete-marker-only rules return 0 so callers fall
through to their kind-specific floor.
* feat(s3/lifecycle): EventLogHorizon for retention mode gate
Returns the maximum event age the reader needs for a rule. Days-based
kinds return their day threshold; pure NewerNoncurrent (count) and
ExpiredObjectDeleteMarker return SmallDelay. Date rules return 0 (the
gate skips them). Multi-action rules take the max — strictest horizon
wins.
Drives the Phase 2 mode gate: metaLogRetention < EventLogHorizon(rule)
+ bootstrapLookbackMin -> scan_only with RETENTION_BELOW_HORIZON.
* feat(s3/lifecycle): RuleHash for per-rule state CAS
sha256 over canonicalized form, first 8 bytes. Stable across tag-key
reorder, prefix trailing-slash variation, ID renames, and Status flips
(state continuity is preserved when an operator toggles
Enabled/Disabled). Different action shapes — different days, filter,
or action type — hash differently.
Used by the per-rule state directory layout
/etc/s3/lifecycle/<bucket>/<rule_hash_hex>/ and by the bootstrap
detector's reconcile-on-PUT.
* feat(s3/lifecycle): add s3_lifecycle.proto storage schema
Defines the durable types backing the lifecycle worker:
LifecycleState (per-rule mode + bootstrap_complete + degraded_reason
incl. RETENTION_BELOW_HORIZON / LOST_LOG), PendingItem, EntryIdentity,
BootstrapState, ReaderState (per-filer-shard cursors plus
tail_drained_streams marker), BlockerRecord (rule_hash optional for
pre-evaluation failures), RetryBudgetEntry with the four-shape
StreamKey oneof, and RetryTarget (no action / no expected_identity —
retry replays handler against current state).
No callers; schema only. Wired into the pb Makefile.
* feat(s3/lifecycle): add S3LifecycleParams to TaskParams
Wires lifecycle subroutines into the existing worker dispatch.
Subtype is READ (cluster-singleton meta-log reader), BOOTSTRAP
(per-bucket walker), or DRAIN (per-rule pending). bucket / rule_hash
populated for the latter two; ContinuationHint is an advisory resume
point for kill-resumable BOOTSTRAP / READ.
oneof tag = 14, after the existing ec_balance_params at 13.
* fix(s3/lifecycle): noncurrent delete markers honor NoncurrentDays
A non-current delete marker is just another version per AWS S3 spec
and is eligible under NoncurrentVersionExpirationDays. The
IsDeleteMarker special case is meant only for the *current* delete
marker (sole-survivor ExpiredObjectDeleteMarker action), so guard
that switch arm with IsLatest. Without IsLatest, the bootstrap walker
silently skipped pre-existing non-current delete markers under a
NoncurrentDays rule because ComputeDueAt returned zero.
Mirrors the same fix in evaluate.go so the runtime decision matches.
* fix(s3/lifecycle): preserve prefix trailing slash in RuleHash
"logs" and "logs/" match different object sets under literal
strings.HasPrefix semantics: "logs" matches "logsmore/x", "logs/" does
not. Collapsing them in the hash would let an XML edit silently bind
the new rule to the previous rule's durable state directory, causing
stale bootstrap_complete and stale pending entries against a rule
that now matches a different set.
* fix(s3/lifecycle): add UNSPECIFIED sentinels to enum zero values
Proto3 best practice: enum 0 should be an _UNSPECIFIED sentinel, not
an active value. Persisted state schemas care most: a partially
populated payload (or one written by an older binary that didn't set
the field) would otherwise silently default to a semantically active
value.
- LifecycleState.RuleKind: shifts EXPIRATION_DAYS..EXPIRED_DELETE_MARKER
from 0..5 to 1..6, with RULE_KIND_UNSPECIFIED at 0.
- LifecycleState.RuleMode: shifts EVENT_DRIVEN..PENDING_BOOTSTRAP from
0..4 to 1..5.
- LifecycleState.DegradedReason: replaces NONE=0 with
DEGRADED_REASON_UNSPECIFIED=0 (operators treat both as healthy).
- StreamKind: shifts ORIGINAL..PENDING from 0..3 to 1..4.
- S3LifecycleParams.Subtype: shifts READ..DRAIN from 0..2 to 1..3, so
an unset subtype no longer routes into the cluster-singleton READ task.
No on-disk state has been written yet; renumbering is safe.
* docs(s3/lifecycle): per-action state, not per-rule
A single AWS lifecycle XML <Rule> can declare multiple actions in
parallel (e.g. ExpirationDays=90 + AbortMPU=7 + NoncurrentDays=30).
Each must drive its own delay/horizon/mode/pending stream
independently. Modeling the rule as one compiled entry with one kind
collapses these — picking the smallest delay (7d MPU) means the 90d
expiration cursor advances past objects that aren't yet due, and the
90d action never re-fires.
Restructure storage to per-action: every XML rule expands into N
compiled actions; state lives at <bucket>/<rule_hash>/<action_kind>/.
The intermediate rule_hash directory keeps a rule's actions grouped
for operator listing. Each action has its own state file with its own
mode + bootstrap_complete + degraded_reason; sibling actions of the
same rule can degrade independently.
* fix(s3/lifecycle): per-action proto schema + safety-scan counters
Realigns the durable schema with the per-action storage model and the
safety-scan contract in the design doc.
- Promote LifecycleState.RuleKind to a top-level ActionKind enum and
rename rule_kind -> action_kind. The same enum now also keys
BootstrapKey, PendingKey, and BlockerRecord so per-action streams
under one rule never collapse.
- LifecycleState now keyed by (rule_hash, action_kind). Added
last_safety_scan_ts_ns / next_safety_scan_ts_ns and the four
observability counters the design specifies (evaluated_total,
expired_total, metadata_only_total, error_total). Dropped
last_evaluated_ns, deleted_total, skipped_object_lock_total, and
pending_size — subsumed or out-of-band.
- BlockerRecord.action_kind is OPTIONAL (UNSPECIFIED for
pre-evaluation failures, just like rule_hash).
No on-disk state has been written yet; the renumbering / rename is
free.
* fix(s3/lifecycle): per-action MinTriggerAge / EventLogHorizon helpers
The retention mode gate and safety-scan cadence run on each compiled
action independently. Taking a "min/max across all actions in the
rule" — as the previous helpers did — was wrong: a 90d ExpirationDays
sibling alongside a 7d AbortMPU action would cause the 90d cursor to
advance at 7d (because MinTriggerAge picked the smallest), and the
ExpirationDays action would never re-fire on objects that aged past
the 7d sweep window before reaching 90d.
Both helpers now take an ActionKind and return the threshold for that
specific action only. Returns 0 if the rule does not declare the
requested kind, which makes the gate a no-op and the cadence fall
through to the kind floor.
Also adds RuleActionKinds(rule), the canonical expansion that the
engine uses at compile time to turn one XML rule into N compiled
actions. NewerNoncurrentVersions paired with NoncurrentDays is
subsumed into a single NONCURRENT_DAYS action (AWS-paired
conditions); only when NewerNoncurrent stands alone does it become
NEWER_NONCURRENT.
* fix(s3/lifecycle): length-prefix RuleHash to remove delimiter ambiguity
The previous encoding used "tag=K=V\n" lines, which is ambiguous: a
tag (a=b, c) serializes identically to (a, b=c). A prefix containing
"\nexp_days=99" could likewise forge an action field. Either could
silently bind two semantically different rules to the same per-rule
state directory.
Switch to a length-prefixed canonical form: each scalar is written as
<field-tag-byte> <uvarint-length> <bytes>. Field-tag bytes namespace
each scalar so ("a=b" tag-key) and ("a" tag-key with "=b" leakage)
can't collide. Three regression tests pin the resistance.
* docs(s3/lifecycle): ActionKey is the engine identity, not rule_hash
Finishes the per-action restructure across the engine pseudocode,
bootstrap completion, drain locks, detector paths, policy CAS, and
Phase 2 plan. Every per-action data structure — engine indexes,
target modes, newly-completed sets, bootstrap completion bits, drain
keys, locks, metrics, status — is now keyed by
ActionKey{rule_hash, action_kind}, not by rule_hash alone.
Without this, sibling actions under one XML rule still shared
scheduling state in the engine: originalDelayGroups holding
[]ruleHash means a rule's 7d AbortMPU and 90d ExpirationDays
collapse into one entry, the smaller delay wins for cursor advance,
and the larger sibling never re-fires.
Helper API tightened: EvaluateAction(rule, kind, info, now) and
ComputeDueAt(rule, kind, info) replace the aggregate-rule signatures
so a caller asks one specific action's eligibility against one
entry, never "any action of this rule." Drain task lock includes
action_kind so siblings have independent re-arm timers. Policy CAS
moves from rule_hash to ActionKey membership.
* fix(s3/lifecycle): kind-aware EvaluateAction / ComputeDueAt
Replaces the aggregate-rule signatures with per-action ones:
EvaluateAction(rule, kind, info, now) and ComputeDueAt(rule, kind,
info). The old Evaluate(rule, info, now) could return the verdict of
ANY action declared on the rule, which sat wrong with the per-action
engine indexing (each ActionKey has its own delay group, mode, and
pending stream).
Each helper now decides exactly one (rule, kind) compiled action's
fate against one entry. Asking for a kind the rule doesn't declare,
or asking against the wrong object shape for that kind, returns
ActionNone / zero — never silently routes to a sibling.
Multi-action regression test: a rule with ExpirationDays=90 and
AbortMPU=7 evaluates each kind independently for the same entry; the
7d window has no influence on the 90d eligibility decision.
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -12,6 +12,8 @@ gen:
|
||||
mkdir -p ./mount_peer_pb
|
||||
protoc mount_peer.proto --go_out=./mount_peer_pb --go-grpc_out=./mount_peer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||
protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||
mkdir -p ./s3_lifecycle_pb
|
||||
protoc s3_lifecycle.proto --go_out=./s3_lifecycle_pb --go_opt=paths=source_relative
|
||||
protoc mq_broker.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||
protoc mq_schema.proto --go_out=./schema_pb --go-grpc_out=./schema_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||
protoc mq_agent.proto --go_out=./mq_agent_pb --go-grpc_out=./mq_agent_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
|
||||
|
||||
@@ -0,0 +1,280 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package s3_lifecycle_pb;
|
||||
|
||||
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb";
|
||||
option java_package = "seaweedfs.client";
|
||||
option java_outer_classname = "S3LifecycleProto";
|
||||
|
||||
// Storage layout reference (see S3_LIFECYCLE_REDESIGN.md):
|
||||
//
|
||||
// /etc/s3/lifecycle/<bucket>/<rule_hash_hex>/<action_kind>/state -> LifecycleState
|
||||
// /etc/s3/lifecycle/<bucket>/<rule_hash_hex>/<action_kind>/pending -> repeated PendingItem
|
||||
// /etc/s3/lifecycle/<bucket>/_bootstrap -> BootstrapState
|
||||
// /etc/s3/lifecycle/_reader/reader_state -> ReaderState
|
||||
// /etc/s3/lifecycle/_reader/blockers -> repeated BlockerRecord
|
||||
// /etc/s3/lifecycle/_reader/retry_budget -> repeated RetryBudgetEntry
|
||||
//
|
||||
// State is per-(rule_hash, action_kind), not per-rule: a single XML <Rule>
|
||||
// with N action sub-elements expands into N compiled actions, each with
|
||||
// its own state file and its own pending file. Sibling actions of the
|
||||
// same rule can degrade or activate independently.
|
||||
|
||||
// ActionKind classifies the lifecycle action a single compiled entry
|
||||
// represents. A single XML rule may produce multiple compiled entries —
|
||||
// one per populated action.
|
||||
enum ActionKind {
|
||||
ACTION_KIND_UNSPECIFIED = 0;
|
||||
EXPIRATION_DAYS = 1; // Expiration.Days
|
||||
EXPIRATION_DATE = 2; // Expiration.Date
|
||||
NONCURRENT_DAYS = 3; // NoncurrentVersionExpiration.NoncurrentDays (with optional NewerNoncurrent retention)
|
||||
NEWER_NONCURRENT = 4; // NoncurrentVersionExpiration.NewerNoncurrentVersions, count-only (no NoncurrentDays)
|
||||
ABORT_MPU = 5; // AbortIncompleteMultipartUpload.DaysAfterInitiation
|
||||
EXPIRED_DELETE_MARKER = 6; // Expiration.ExpiredObjectDeleteMarker
|
||||
}
|
||||
|
||||
// MessagePosition mirrors weed/util/log_buffer.MessagePosition for durable
|
||||
// storage. ts_ns + offset uniquely identifies a position within a per-filer
|
||||
// log buffer; per-shard cursors (see ReaderState) use this shape to skip past
|
||||
// already-processed events when replaying.
|
||||
message MessagePosition {
|
||||
int64 ts_ns = 1;
|
||||
int64 offset = 2;
|
||||
}
|
||||
|
||||
// Cursor map keyed by filer_id. Each delay group's cursor (ReaderState) and
|
||||
// the predicate cursor (ReaderState) are per-filer-shard maps because
|
||||
// LogEntry.Offset is per-filer and not globally unique.
|
||||
message FilerShardCursor {
|
||||
string filer_id = 1;
|
||||
MessagePosition position = 2;
|
||||
}
|
||||
|
||||
// EntryIdentity is the CAS witness used by LifecycleDelete to detect that
|
||||
// the live entry hasn't changed between worker decision and server execution.
|
||||
// Mismatch -> STALE_IDENTITY -> NOOP_RESOLVED.
|
||||
message EntryIdentity {
|
||||
int64 mtime_ns = 1;
|
||||
int64 size = 2;
|
||||
string head_fid = 3;
|
||||
bytes extended_hash = 4; // sha256 over sorted Extended map; cheap and stable
|
||||
}
|
||||
|
||||
// LifecycleState captures the durable per-action scheduling state. Keyed by
|
||||
// (rule_hash, action_kind); persisted at
|
||||
// /etc/s3/lifecycle/<bucket>/<rule_hash_hex>/<action_kind>/state
|
||||
//
|
||||
// A single XML <Rule> with N populated action sub-elements expands into N
|
||||
// LifecycleState records, one per sibling action.
|
||||
message LifecycleState {
|
||||
// Proto3 best practice: zero value is an UNSPECIFIED sentinel so that
|
||||
// legacy / partially-populated payloads don't silently default to a
|
||||
// semantically active value. Persisted state files always set explicit
|
||||
// non-zero values; reading UNSPECIFIED indicates corruption or a missing
|
||||
// field.
|
||||
enum RuleMode {
|
||||
RULE_MODE_UNSPECIFIED = 0;
|
||||
EVENT_DRIVEN = 1;
|
||||
SCAN_AT_DATE = 2;
|
||||
SCAN_ONLY = 3;
|
||||
DISABLED = 4;
|
||||
PENDING_BOOTSTRAP = 5;
|
||||
}
|
||||
// DEGRADED_REASON_UNSPECIFIED replaces the old NONE sentinel: an unset
|
||||
// / zero degraded_reason means "no active degradation" — operators
|
||||
// treat UNSPECIFIED as healthy.
|
||||
enum DegradedReason {
|
||||
DEGRADED_REASON_UNSPECIFIED = 0;
|
||||
LAG_HIGH = 1;
|
||||
PENDING_FULL = 2;
|
||||
DELETE_FAILURES = 3;
|
||||
OPERATOR_PAUSED = 4;
|
||||
RETENTION_BELOW_HORIZON = 5;
|
||||
LOST_LOG = 6;
|
||||
}
|
||||
|
||||
bytes rule_hash = 1; // 8 bytes; matches the parent rule dir
|
||||
ActionKind action_kind = 2; // matches the leaf dir name
|
||||
string rule_id = 3; // display only; identical across sibling actions
|
||||
RuleMode mode = 4;
|
||||
DegradedReason degraded_reason = 5;
|
||||
int64 degraded_since_ns = 6;
|
||||
bool bootstrap_complete = 7;
|
||||
int64 bootstrap_started_at_ns = 8;
|
||||
int64 bootstrap_completed_at_ns = 9;
|
||||
|
||||
// Safety-scan scheduling. Set by the safety-scan job when a SCAN_ONLY
|
||||
// (or periodic safety-scan) bootstrap pass completes; consulted by the
|
||||
// detector to decide whether to emit the next bootstrap task.
|
||||
int64 last_safety_scan_ts_ns = 10;
|
||||
int64 next_safety_scan_ts_ns = 11;
|
||||
|
||||
// Counters for observability; no behavior depends on these.
|
||||
// - evaluated_total: live entries the worker considered for this action
|
||||
// (filter-matched + due-checked); includes objects that ended up not
|
||||
// yet eligible.
|
||||
// - expired_total: successful DONE outcomes (object/version/marker/MPU
|
||||
// removed) under this action.
|
||||
// - metadata_only_total: subset of expired_total where the worker
|
||||
// short-circuited via volume-TTL routing (chunks left to volume GC).
|
||||
// - error_total: outcomes that did NOT advance the cursor under this
|
||||
// action (RETRY_LATER, BLOCKED, FATAL_EVENT_ERROR).
|
||||
int64 evaluated_total = 12;
|
||||
int64 expired_total = 13;
|
||||
int64 metadata_only_total = 14;
|
||||
int64 error_total = 15;
|
||||
}
|
||||
|
||||
// PendingItem records a not-yet-due eligibility for late-predicate exceptions
|
||||
// (e.g. tag added at age 30d on a 60d rule). Drained by s3.lifecycle.drain.
|
||||
// One use only — not the steady-state path.
|
||||
message PendingItem {
|
||||
string object_path = 1;
|
||||
string version_id = 2;
|
||||
int64 due_at_ns = 3;
|
||||
EntryIdentity expected_identity = 4;
|
||||
}
|
||||
|
||||
// BootstrapState tracks the bucket walker's resume point. Operator-resolution
|
||||
// of a BOOTSTRAP-stream blocker does NOT mutate this field — the next
|
||||
// scheduled bootstrap task picks up from last_scanned_path and re-walks.
|
||||
message BootstrapState {
|
||||
string last_scanned_path = 1;
|
||||
int64 started_at_ns = 2;
|
||||
int64 completed_at_ns = 3; // zero while in progress
|
||||
}
|
||||
|
||||
// ReaderState is the cluster-singleton reader's durable cursors. One file at
|
||||
// /etc/s3/lifecycle/_reader/reader_state, written by the singleton task.
|
||||
message ReaderState {
|
||||
string primary_filer_endpoint = 1;
|
||||
|
||||
// Per-delay-group, per-filer-shard cursors. Outer key is delay_seconds.
|
||||
// Inner FilerShardCursor list maps filer_id -> MessagePosition.
|
||||
map<int64, FilerShardCursorList> last_processed_original = 2;
|
||||
|
||||
// Predicate-change pass cursor, keyed by filer_id.
|
||||
repeated FilerShardCursor last_processed_predicate = 3;
|
||||
|
||||
// Filer-shard keys whose cursor reached range.latest and were safely GC'd.
|
||||
// Used by the lost-log gate at task entry to distinguish "F was tail-drained
|
||||
// safely then pruned" from "F's logs vanished before catch-up". Encoded as
|
||||
// stream-specific keys (same shape as RetryBudgetEntry.StreamKey).
|
||||
repeated TailDrainedKey tail_drained_streams = 4;
|
||||
|
||||
int64 last_checkpoint_ns = 5;
|
||||
}
|
||||
|
||||
message FilerShardCursorList {
|
||||
repeated FilerShardCursor cursors = 1;
|
||||
}
|
||||
|
||||
// TailDrainedKey identifies a (stream_kind, shard, [delay]) tuple whose
|
||||
// cursor previously caught up to range.latest. Persisted in ReaderState so
|
||||
// the lost-log gate can distinguish safe drain from data loss.
|
||||
message TailDrainedKey {
|
||||
StreamKind stream_kind = 1;
|
||||
string filer_id = 2;
|
||||
int64 delay_seconds = 3; // populated when stream_kind == ORIGINAL
|
||||
}
|
||||
|
||||
// StreamKind classifies the four blockable streams. ORIGINAL/PREDICATE pause
|
||||
// reader cursors; BOOTSTRAP pauses a bucket walker; PENDING pauses a rule's
|
||||
// drain task. Zero is an UNSPECIFIED sentinel — a BlockerRecord whose
|
||||
// stream_kind reads as UNSPECIFIED is a corruption signal, not a default.
|
||||
enum StreamKind {
|
||||
STREAM_KIND_UNSPECIFIED = 0;
|
||||
ORIGINAL = 1;
|
||||
PREDICATE = 2;
|
||||
BOOTSTRAP = 3;
|
||||
PENDING = 4;
|
||||
}
|
||||
|
||||
// BlockerRecord is the durable record of a paused stream. The cursor stays
|
||||
// at the failing position until the operator clears it via
|
||||
// `s3.lifecycle.blockers retry|resume|quarantine`. There is no automatic
|
||||
// dead-letter — silently routing a failed delete decision aside is unsafe
|
||||
// for lifecycle.
|
||||
message BlockerRecord {
|
||||
StreamKind stream_kind = 1;
|
||||
|
||||
// Stream-specific scoping; populated per stream_kind.
|
||||
string shard = 2; // filer_id; ORIGINAL/PREDICATE only
|
||||
int64 delay_seconds = 3; // ORIGINAL only
|
||||
MessagePosition position = 4; // ORIGINAL/PREDICATE only
|
||||
|
||||
// Common context. bucket/object_path/version_id always populated.
|
||||
// rule_hash and action_kind are OPTIONAL: empty / UNSPECIFIED for
|
||||
// pre-evaluation failures (e.g. handleEvent fetchLive FATAL where no
|
||||
// action has been evaluated). When populated, both are set together —
|
||||
// the failure is bound to a specific (rule, action) pair.
|
||||
bytes rule_hash = 5;
|
||||
string bucket = 6;
|
||||
string object_path = 7;
|
||||
string version_id = 8;
|
||||
ActionKind action_kind = 14; // optional; UNSPECIFIED for pre-evaluation failures
|
||||
|
||||
string reason = 9; // "FATAL_EVENT_ERROR: malformed entry"
|
||||
string last_error = 10; // raw error string from the last attempt
|
||||
int64 first_seen_at_ns = 11;
|
||||
int64 last_retry_at_ns = 12;
|
||||
int32 retry_count = 13;
|
||||
}
|
||||
|
||||
// StreamKey is the shape used by retry-budget tracking. One of four shapes
|
||||
// per stream_kind, since the identifying tuple differs.
|
||||
message StreamKey {
|
||||
oneof key {
|
||||
OriginalKey original = 1;
|
||||
PredicateKey predicate = 2;
|
||||
BootstrapKey bootstrap = 3;
|
||||
PendingKey pending = 4;
|
||||
}
|
||||
}
|
||||
|
||||
message OriginalKey {
|
||||
string shard = 1;
|
||||
int64 delay_seconds = 2;
|
||||
MessagePosition position = 3;
|
||||
}
|
||||
message PredicateKey {
|
||||
string shard = 1;
|
||||
MessagePosition position = 2;
|
||||
}
|
||||
message BootstrapKey {
|
||||
string bucket = 1;
|
||||
string object_path = 2;
|
||||
string version_id = 3;
|
||||
bytes rule_hash = 4;
|
||||
ActionKind action_kind = 5; // distinguishes per-action streams under one rule
|
||||
}
|
||||
message PendingKey {
|
||||
string bucket = 1;
|
||||
bytes rule_hash = 2;
|
||||
string object_path = 3;
|
||||
string version_id = 4;
|
||||
ActionKind action_kind = 5;
|
||||
}
|
||||
|
||||
// RetryBudgetEntry tracks consecutive RETRY_LATER outcomes for a stream key.
|
||||
// Promotes to BLOCKED when consecutive_retries >= retryBudgetMax (30) or age
|
||||
// >= retryBudgetMaxAge (4h). Compacts on success (clearRetryBudget).
|
||||
message RetryBudgetEntry {
|
||||
StreamKey key = 1;
|
||||
int32 consecutive_retries = 2;
|
||||
int64 first_seen_at_ns = 3;
|
||||
int64 last_retry_at_ns = 4;
|
||||
}
|
||||
|
||||
// RetryTarget carries routing info needed to (a) write a BlockerRecord on
|
||||
// promotion and (b) re-invoke the right primitive on operator retry. We
|
||||
// deliberately do NOT carry expected_identity or action: the handler
|
||||
// re-fetches live state and re-evaluates from scratch.
|
||||
message RetryTarget {
|
||||
StreamKey key = 1;
|
||||
string bucket = 2;
|
||||
string object_path = 3;
|
||||
string version_id = 4;
|
||||
bytes rule_hash = 5;
|
||||
ActionKind action_kind = 6;
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -112,9 +112,47 @@ message TaskParams {
|
||||
BalanceTaskParams balance_params = 11;
|
||||
ReplicationTaskParams replication_params = 12;
|
||||
EcBalanceTaskParams ec_balance_params = 13;
|
||||
S3LifecycleParams s3_lifecycle_params = 14;
|
||||
}
|
||||
}
|
||||
|
||||
// S3LifecycleParams routes a worker task to one of the three lifecycle
|
||||
// subroutines. READ is the cluster-singleton meta-log reader (one task at
|
||||
// a time); BOOTSTRAP walks a single bucket; DRAIN drains pending exceptions
|
||||
// for a single rule.
|
||||
message S3LifecycleParams {
|
||||
// Zero is an UNSPECIFIED sentinel: a TaskParams payload whose subtype is
|
||||
// unset must not silently route into the cluster-singleton READ task.
|
||||
// Callers always populate one of READ / BOOTSTRAP / DRAIN.
|
||||
enum Subtype {
|
||||
SUBTYPE_UNSPECIFIED = 0;
|
||||
READ = 1;
|
||||
BOOTSTRAP = 2;
|
||||
DRAIN = 3;
|
||||
}
|
||||
Subtype subtype = 1;
|
||||
|
||||
// Required for BOOTSTRAP and DRAIN; ignored for READ. rule_hash is
|
||||
// optional for BOOTSTRAP (omitting walks all rules for the bucket).
|
||||
string bucket = 2;
|
||||
bytes rule_hash = 3; // 8 bytes when present
|
||||
|
||||
bool force = 4; // operator override; bypasses scheduling guards
|
||||
int64 batch_time_budget_ns = 5; // 0 = use default
|
||||
int32 batch_event_budget = 6; // 0 = use default
|
||||
|
||||
ContinuationHint continuation = 7; // resume hint for kill-resume tasks
|
||||
}
|
||||
|
||||
// ContinuationHint lets a long-running BOOTSTRAP or READ task hand its
|
||||
// resume point to the next scheduled invocation without going through
|
||||
// durable state. The durable state files are still authoritative; this
|
||||
// is just a scheduling hint to skip an unnecessary fresh start.
|
||||
message ContinuationHint {
|
||||
string last_scanned_path = 1; // BOOTSTRAP only
|
||||
int64 last_position_ns = 2; // READ only (advisory)
|
||||
}
|
||||
|
||||
// VacuumTaskParams for vacuum operations
|
||||
message VacuumTaskParams {
|
||||
double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum
|
||||
|
||||
+453
-202
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,76 @@
|
||||
package s3lifecycle
|
||||
|
||||
// ActionKind identifies a single compiled lifecycle action under one XML
|
||||
// rule. A single XML <Rule> may declare multiple action sub-elements in
|
||||
// parallel, each yielding a separate compiled action with its own delay
|
||||
// group, mode, pending stream, and durable state directory.
|
||||
//
|
||||
// The values here mirror the wire-form ActionKind enum in
|
||||
// weed/pb/s3_lifecycle.proto (offset by the UNSPECIFIED sentinel at 0).
|
||||
type ActionKind int
|
||||
|
||||
const (
|
||||
ActionKindUnspecified ActionKind = iota // matches proto ACTION_KIND_UNSPECIFIED
|
||||
ActionKindExpirationDays // Expiration.Days
|
||||
ActionKindExpirationDate // Expiration.Date
|
||||
ActionKindNoncurrentDays // NoncurrentVersionExpiration.NoncurrentDays (with optional NewerNoncurrent retention)
|
||||
ActionKindNewerNoncurrent // NoncurrentVersionExpiration.NewerNoncurrentVersions (count-only, no NoncurrentDays)
|
||||
ActionKindAbortMPU // AbortIncompleteMultipartUpload.DaysAfterInitiation
|
||||
ActionKindExpiredDeleteMarker // Expiration.ExpiredObjectDeleteMarker
|
||||
)
|
||||
|
||||
// String returns the leaf-directory name used in
|
||||
// /etc/s3/lifecycle/<bucket>/<rule_hash>/<action_kind>/.
|
||||
func (k ActionKind) String() string {
|
||||
switch k {
|
||||
case ActionKindExpirationDays:
|
||||
return "expiration_days"
|
||||
case ActionKindExpirationDate:
|
||||
return "expiration_date"
|
||||
case ActionKindNoncurrentDays:
|
||||
return "noncurrent_days"
|
||||
case ActionKindNewerNoncurrent:
|
||||
return "newer_noncurrent"
|
||||
case ActionKindAbortMPU:
|
||||
return "abort_mpu"
|
||||
case ActionKindExpiredDeleteMarker:
|
||||
return "expired_delete_marker"
|
||||
default:
|
||||
return "unspecified"
|
||||
}
|
||||
}
|
||||
|
||||
// RuleActionKinds returns the compiled actions a single XML rule expands to.
|
||||
// Empty when no action sub-element is populated. Order is deterministic so
|
||||
// callers can hash / iterate stably:
|
||||
//
|
||||
// EXPIRATION_DAYS, EXPIRATION_DATE, EXPIRED_DELETE_MARKER,
|
||||
// NONCURRENT_DAYS, NEWER_NONCURRENT, ABORT_MPU
|
||||
//
|
||||
// Note: NewerNoncurrentVersions is paired with NoncurrentDays into a single
|
||||
// NONCURRENT_DAYS action when both are set; only when NewerNoncurrent is set
|
||||
// alone (no day threshold) does it produce a NEWER_NONCURRENT action.
|
||||
func RuleActionKinds(rule *Rule) []ActionKind {
|
||||
if rule == nil {
|
||||
return nil
|
||||
}
|
||||
var kinds []ActionKind
|
||||
if rule.ExpirationDays > 0 {
|
||||
kinds = append(kinds, ActionKindExpirationDays)
|
||||
}
|
||||
if !rule.ExpirationDate.IsZero() {
|
||||
kinds = append(kinds, ActionKindExpirationDate)
|
||||
}
|
||||
if rule.ExpiredObjectDeleteMarker {
|
||||
kinds = append(kinds, ActionKindExpiredDeleteMarker)
|
||||
}
|
||||
if rule.NoncurrentVersionExpirationDays > 0 {
|
||||
kinds = append(kinds, ActionKindNoncurrentDays)
|
||||
} else if rule.NewerNoncurrentVersions > 0 {
|
||||
kinds = append(kinds, ActionKindNewerNoncurrent)
|
||||
}
|
||||
if rule.AbortMPUDaysAfterInitiation > 0 {
|
||||
kinds = append(kinds, ActionKindAbortMPU)
|
||||
}
|
||||
return kinds
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRuleActionKinds_SingleAction(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
rule *Rule
|
||||
want []ActionKind
|
||||
}{
|
||||
{"expiration_days", &Rule{ExpirationDays: 30}, []ActionKind{ActionKindExpirationDays}},
|
||||
{"expiration_date", &Rule{ExpirationDate: mustTime(t, "2025-06-15T00:00:00Z")}, []ActionKind{ActionKindExpirationDate}},
|
||||
{"expired_delete_marker", &Rule{ExpiredObjectDeleteMarker: true}, []ActionKind{ActionKindExpiredDeleteMarker}},
|
||||
{"noncurrent_days", &Rule{NoncurrentVersionExpirationDays: 30}, []ActionKind{ActionKindNoncurrentDays}},
|
||||
{"newer_noncurrent_alone", &Rule{NewerNoncurrentVersions: 3}, []ActionKind{ActionKindNewerNoncurrent}},
|
||||
{"abort_mpu", &Rule{AbortMPUDaysAfterInitiation: 7}, []ActionKind{ActionKindAbortMPU}},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
if got := RuleActionKinds(c.rule); !reflect.DeepEqual(got, c.want) {
|
||||
t.Fatalf("want %v, got %v", c.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleActionKinds_MultiAction(t *testing.T) {
|
||||
// The bug this fixes: a single XML <Rule> can declare ExpirationDays,
|
||||
// AbortMPU, and NoncurrentDays in parallel. Each must compile to its
|
||||
// own action with its own delay group / mode / pending — not be
|
||||
// collapsed into one entry whose delay is the smallest.
|
||||
rule := &Rule{
|
||||
ExpirationDays: 90,
|
||||
AbortMPUDaysAfterInitiation: 7,
|
||||
NoncurrentVersionExpirationDays: 30,
|
||||
}
|
||||
want := []ActionKind{
|
||||
ActionKindExpirationDays,
|
||||
ActionKindNoncurrentDays,
|
||||
ActionKindAbortMPU,
|
||||
}
|
||||
if got := RuleActionKinds(rule); !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleActionKinds_NoncurrentDaysSubsumesNewerNoncurrent(t *testing.T) {
|
||||
// When both NoncurrentDays and NewerNoncurrent are set on the same rule,
|
||||
// they are AWS-paired conditions for ONE noncurrent expiration action,
|
||||
// not two; only NoncurrentDays is emitted.
|
||||
rule := &Rule{NoncurrentVersionExpirationDays: 30, NewerNoncurrentVersions: 3}
|
||||
want := []ActionKind{ActionKindNoncurrentDays}
|
||||
if got := RuleActionKinds(rule); !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleActionKinds_Empty(t *testing.T) {
|
||||
if got := RuleActionKinds(&Rule{}); len(got) != 0 {
|
||||
t.Fatalf("want empty, got %v", got)
|
||||
}
|
||||
if got := RuleActionKinds(nil); got != nil {
|
||||
t.Fatalf("want nil, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActionKind_StringIsStable(t *testing.T) {
|
||||
// The string is the leaf-directory name on disk; renaming would
|
||||
// orphan existing state, so pin the values.
|
||||
cases := map[ActionKind]string{
|
||||
ActionKindExpirationDays: "expiration_days",
|
||||
ActionKindExpirationDate: "expiration_date",
|
||||
ActionKindNoncurrentDays: "noncurrent_days",
|
||||
ActionKindNewerNoncurrent: "newer_noncurrent",
|
||||
ActionKindAbortMPU: "abort_mpu",
|
||||
ActionKindExpiredDeleteMarker: "expired_delete_marker",
|
||||
}
|
||||
for k, want := range cases {
|
||||
if got := k.String(); got != want {
|
||||
t.Fatalf("ActionKind(%d).String() = %q, want %q", k, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package s3lifecycle
|
||||
|
||||
import "time"
|
||||
|
||||
// ComputeDueAt returns the earliest wall-clock time the (rule, kind) compiled
|
||||
// action can fire for info given the object's current shape. Returns the
|
||||
// zero time when the action cannot fire for this entry (filter rejects, kind
|
||||
// not declared on the rule, wrong object shape, etc.).
|
||||
//
|
||||
// Used by the reader/bootstrap to decide pending-vs-inline-delete for one
|
||||
// specific action. Sibling actions of the same XML rule are computed
|
||||
// separately so a rule's 7d AbortMPU due time does not influence its 90d
|
||||
// ExpirationDays sibling.
|
||||
func ComputeDueAt(rule *Rule, kind ActionKind, info *ObjectInfo) time.Time {
|
||||
if rule == nil || info == nil || rule.Status != StatusEnabled {
|
||||
return time.Time{}
|
||||
}
|
||||
if !filterMatches(rule, info) {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case ActionKindAbortMPU:
|
||||
if info.IsMPUInit && rule.AbortMPUDaysAfterInitiation > 0 {
|
||||
return info.ModTime.AddDate(0, 0, rule.AbortMPUDaysAfterInitiation)
|
||||
}
|
||||
case ActionKindExpiredDeleteMarker:
|
||||
if info.IsLatest && info.IsDeleteMarker && rule.ExpiredObjectDeleteMarker && info.NumVersions == 1 {
|
||||
return info.ModTime
|
||||
}
|
||||
case ActionKindExpirationDays:
|
||||
if info.IsLatest && !info.IsDeleteMarker && rule.ExpirationDays > 0 {
|
||||
return info.ModTime.AddDate(0, 0, rule.ExpirationDays)
|
||||
}
|
||||
case ActionKindExpirationDate:
|
||||
if info.IsLatest && !info.IsDeleteMarker && !rule.ExpirationDate.IsZero() {
|
||||
return rule.ExpirationDate
|
||||
}
|
||||
case ActionKindNoncurrentDays:
|
||||
if !info.IsLatest && rule.NoncurrentVersionExpirationDays > 0 {
|
||||
base := info.SuccessorModTime
|
||||
if base.IsZero() {
|
||||
base = info.ModTime
|
||||
}
|
||||
return base.AddDate(0, 0, rule.NoncurrentVersionExpirationDays)
|
||||
}
|
||||
case ActionKindNewerNoncurrent:
|
||||
// Pure count-based: only when NoncurrentDays is unset.
|
||||
if !info.IsLatest && rule.NoncurrentVersionExpirationDays == 0 && rule.NewerNoncurrentVersions > 0 {
|
||||
if !info.SuccessorModTime.IsZero() {
|
||||
return info.SuccessorModTime
|
||||
}
|
||||
return info.ModTime
|
||||
}
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestComputeDueAt_ExpirationDays(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 30}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod}
|
||||
want := mod.AddDate(0, 0, 30)
|
||||
if got := ComputeDueAt(rule, ActionKindExpirationDays, info); !got.Equal(want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_ExpirationDate(t *testing.T) {
|
||||
date := mustTime(t, "2025-06-15T00:00:00Z")
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDate: date}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := ComputeDueAt(rule, ActionKindExpirationDate, info); !got.Equal(date) {
|
||||
t.Fatalf("want %v, got %v", date, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_DeleteMarker_NotSoleSurvivor(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpiredObjectDeleteMarker: true}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, IsDeleteMarker: true, NumVersions: 3}
|
||||
if got := ComputeDueAt(rule, ActionKindExpiredDeleteMarker, info); !got.IsZero() {
|
||||
t.Fatalf("non-sole marker should be zero, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_KindNotDeclared(t *testing.T) {
|
||||
// Asking for a kind the rule doesn't declare returns zero.
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 1}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := ComputeDueAt(rule, ActionKindAbortMPU, info); !got.IsZero() {
|
||||
t.Fatalf("undeclared kind should be zero, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_WrongShapeForKind(t *testing.T) {
|
||||
// Non-current object asked under EXPIRATION_DAYS (which is current-only): zero.
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 1}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := ComputeDueAt(rule, ActionKindExpirationDays, info); !got.IsZero() {
|
||||
t.Fatalf("non-current under current rule should be zero, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_NoncurrentDeleteMarkerHonorsNoncurrentDays(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 7}
|
||||
successor := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{
|
||||
Key: "a",
|
||||
IsLatest: false,
|
||||
IsDeleteMarker: true,
|
||||
NumVersions: 3,
|
||||
SuccessorModTime: successor,
|
||||
}
|
||||
want := successor.AddDate(0, 0, 7)
|
||||
if got := ComputeDueAt(rule, ActionKindNoncurrentDays, info); !got.Equal(want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_NoncurrentSuccessorMtime(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 30}
|
||||
successor := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, SuccessorModTime: successor}
|
||||
want := successor.AddDate(0, 0, 30)
|
||||
if got := ComputeDueAt(rule, ActionKindNoncurrentDays, info); !got.Equal(want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_FilterRejects(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, Prefix: "logs/", ExpirationDays: 1}
|
||||
info := &ObjectInfo{Key: "data/x", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := ComputeDueAt(rule, ActionKindExpirationDays, info); !got.IsZero() {
|
||||
t.Fatalf("filter rejection should be zero, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_DisabledRule(t *testing.T) {
|
||||
rule := &Rule{Status: StatusDisabled, ExpirationDays: 1}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := ComputeDueAt(rule, ActionKindExpirationDays, info); !got.IsZero() {
|
||||
t.Fatalf("disabled rule should be zero, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestComputeDueAt_MPUInit(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, AbortMPUDaysAfterInitiation: 7}
|
||||
init := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: ".uploads/u1/", IsMPUInit: true, ModTime: init}
|
||||
want := init.AddDate(0, 0, 7)
|
||||
if got := ComputeDueAt(rule, ActionKindAbortMPU, info); !got.Equal(want) {
|
||||
t.Fatalf("want %v, got %v", want, got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,127 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// EvaluateAction decides whether the (rule, kind) compiled action fires for
|
||||
// info at the given wall-clock time. Returns ActionNone when the rule is
|
||||
// disabled, the filter rejects, the object shape doesn't match this kind, or
|
||||
// the kind isn't yet due.
|
||||
//
|
||||
// One XML rule may declare multiple actions in parallel. The engine compiles
|
||||
// each into its own ActionKey and calls EvaluateAction once per (rule, kind,
|
||||
// entry); sibling actions are independent and may produce different verdicts
|
||||
// for the same entry. Callers that need to evaluate every action of a rule
|
||||
// iterate `RuleActionKinds(rule)` and call this helper per kind.
|
||||
//
|
||||
// Action selection by object shape per kind:
|
||||
//
|
||||
// IsMPUInit + ActionKindAbortMPU -> AbortIncompleteMultipartUpload
|
||||
// IsLatest && IsDeleteMarker + ActionKindExpiredDeleteMarker -> ExpiredObjectDeleteMarker (sole survivor)
|
||||
// IsLatest + ActionKindExpirationDays -> DeleteObject (Days threshold)
|
||||
// IsLatest + ActionKindExpirationDate -> DeleteObject (date threshold)
|
||||
// non-current + ActionKindNoncurrentDays -> DeleteVersion (Days + NewerNoncurrent retention)
|
||||
// non-current + ActionKindNewerNoncurrent -> DeleteVersion (count-only)
|
||||
//
|
||||
// Per AWS S3 semantics, a non-current delete marker is treated as a regular
|
||||
// non-current version under NONCURRENT_DAYS / NEWER_NONCURRENT. Only the
|
||||
// *current* delete marker (and only as sole survivor) routes to
|
||||
// EXPIRED_DELETE_MARKER.
|
||||
func EvaluateAction(rule *Rule, kind ActionKind, info *ObjectInfo, now time.Time) EvalResult {
|
||||
none := EvalResult{Action: ActionNone}
|
||||
if rule == nil || info == nil || rule.Status != StatusEnabled {
|
||||
return none
|
||||
}
|
||||
if !filterMatches(rule, info) {
|
||||
return none
|
||||
}
|
||||
|
||||
switch kind {
|
||||
case ActionKindAbortMPU:
|
||||
if !info.IsMPUInit || rule.AbortMPUDaysAfterInitiation <= 0 {
|
||||
return none
|
||||
}
|
||||
due := info.ModTime.AddDate(0, 0, rule.AbortMPUDaysAfterInitiation)
|
||||
if now.Before(due) {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionAbortMultipartUpload, RuleID: rule.ID}
|
||||
|
||||
case ActionKindExpiredDeleteMarker:
|
||||
if !info.IsLatest || !info.IsDeleteMarker || !rule.ExpiredObjectDeleteMarker {
|
||||
return none
|
||||
}
|
||||
if info.NumVersions != 1 {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionExpireDeleteMarker, RuleID: rule.ID}
|
||||
|
||||
case ActionKindExpirationDays:
|
||||
if !info.IsLatest || info.IsDeleteMarker || rule.ExpirationDays <= 0 {
|
||||
return none
|
||||
}
|
||||
due := info.ModTime.AddDate(0, 0, rule.ExpirationDays)
|
||||
if now.Before(due) {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionDeleteObject, RuleID: rule.ID}
|
||||
|
||||
case ActionKindExpirationDate:
|
||||
if !info.IsLatest || info.IsDeleteMarker || rule.ExpirationDate.IsZero() {
|
||||
return none
|
||||
}
|
||||
if now.Before(rule.ExpirationDate) {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionDeleteObject, RuleID: rule.ID}
|
||||
|
||||
case ActionKindNoncurrentDays:
|
||||
if info.IsLatest || rule.NoncurrentVersionExpirationDays <= 0 {
|
||||
return none
|
||||
}
|
||||
base := info.SuccessorModTime
|
||||
if base.IsZero() {
|
||||
base = info.ModTime
|
||||
}
|
||||
due := base.AddDate(0, 0, rule.NoncurrentVersionExpirationDays)
|
||||
if now.Before(due) {
|
||||
return none
|
||||
}
|
||||
if rule.NewerNoncurrentVersions > 0 && info.NoncurrentIndex < rule.NewerNoncurrentVersions {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionDeleteVersion, RuleID: rule.ID}
|
||||
|
||||
case ActionKindNewerNoncurrent:
|
||||
// Pure count-based: only when NoncurrentDays is unset (when paired,
|
||||
// the rule expands to NONCURRENT_DAYS instead — see RuleActionKinds).
|
||||
if info.IsLatest || rule.NoncurrentVersionExpirationDays > 0 || rule.NewerNoncurrentVersions <= 0 {
|
||||
return none
|
||||
}
|
||||
if info.NoncurrentIndex < rule.NewerNoncurrentVersions {
|
||||
return none
|
||||
}
|
||||
return EvalResult{Action: ActionDeleteVersion, RuleID: rule.ID}
|
||||
}
|
||||
return none
|
||||
}
|
||||
|
||||
func filterMatches(rule *Rule, info *ObjectInfo) bool {
|
||||
if rule.Prefix != "" && !strings.HasPrefix(info.Key, rule.Prefix) {
|
||||
return false
|
||||
}
|
||||
if rule.FilterSizeGreaterThan > 0 && info.Size <= rule.FilterSizeGreaterThan {
|
||||
return false
|
||||
}
|
||||
if rule.FilterSizeLessThan > 0 && info.Size >= rule.FilterSizeLessThan {
|
||||
return false
|
||||
}
|
||||
for k, v := range rule.FilterTags {
|
||||
if got, ok := info.Tags[k]; !ok || got != v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,270 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func mustTime(t *testing.T, s string) time.Time {
|
||||
t.Helper()
|
||||
tm, err := time.Parse(time.RFC3339, s)
|
||||
if err != nil {
|
||||
t.Fatalf("parse %s: %v", s, err)
|
||||
}
|
||||
return tm
|
||||
}
|
||||
|
||||
func TestEvaluateAction_DisabledRuleNeverFires(t *testing.T) {
|
||||
rule := &Rule{Status: StatusDisabled, ExpirationDays: 1}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, mustTime(t, "2030-01-01T00:00:00Z")); got.Action != ActionNone {
|
||||
t.Fatalf("disabled rule should be no-op, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NilInputs(t *testing.T) {
|
||||
if EvaluateAction(nil, ActionKindExpirationDays, &ObjectInfo{}, time.Now()).Action != ActionNone {
|
||||
t.Fatalf("nil rule should be no-op")
|
||||
}
|
||||
if EvaluateAction(&Rule{Status: StatusEnabled}, ActionKindExpirationDays, nil, time.Now()).Action != ActionNone {
|
||||
t.Fatalf("nil info should be no-op")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_ExpirationDaysBoundary(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ID: "r", ExpirationDays: 30}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod}
|
||||
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, mod.AddDate(0, 0, 30).Add(-time.Nanosecond)); got.Action != ActionNone {
|
||||
t.Fatalf("not yet due, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, mod.AddDate(0, 0, 30)); got.Action != ActionDeleteObject || got.RuleID != "r" {
|
||||
t.Fatalf("at boundary, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, mod.AddDate(0, 1, 0)); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("past due, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_KindFiltersByDeclaredAction(t *testing.T) {
|
||||
// Asking the wrong kind of a rule that doesn't declare that action returns
|
||||
// ActionNone — even when the object would otherwise match.
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 30}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod}
|
||||
now := mod.AddDate(0, 1, 0)
|
||||
if got := EvaluateAction(rule, ActionKindAbortMPU, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("AbortMPU not declared, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("NoncurrentDays not declared, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_MultiActionRule_SiblingsIndependent(t *testing.T) {
|
||||
// The bug this fixes: a rule with both ExpirationDays=90 and AbortMPU=7
|
||||
// is evaluated per-action; the 7d MPU window not being "due" for an
|
||||
// object event has no effect on the 90d expiration's eligibility.
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 90, AbortMPUDaysAfterInitiation: 7}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod}
|
||||
|
||||
// Past the 90d threshold: ExpirationDays fires.
|
||||
now := mod.AddDate(0, 0, 91)
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, now); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("90d action should fire, got %v", got)
|
||||
}
|
||||
// AbortMPU asked against a non-MPU entry: no-op.
|
||||
if got := EvaluateAction(rule, ActionKindAbortMPU, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("AbortMPU on non-MPU entry should be no-op, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_ExpirationDate(t *testing.T) {
|
||||
date := mustTime(t, "2025-06-15T00:00:00Z")
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDate: date}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mustTime(t, "2024-01-01T00:00:00Z")}
|
||||
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDate, info, date.Add(-time.Second)); got.Action != ActionNone {
|
||||
t.Fatalf("before date, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDate, info, date); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("at date, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_ExpiredObjectDeleteMarker(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpiredObjectDeleteMarker: true}
|
||||
now := mustTime(t, "2025-01-01T00:00:00Z")
|
||||
mod := mustTime(t, "2024-12-01T00:00:00Z")
|
||||
|
||||
// Current sole-survivor delete marker: fires.
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, IsDeleteMarker: true, NumVersions: 1, ModTime: mod}
|
||||
if got := EvaluateAction(rule, ActionKindExpiredDeleteMarker, info, now); got.Action != ActionExpireDeleteMarker {
|
||||
t.Fatalf("sole-survivor marker, got %v", got)
|
||||
}
|
||||
// Non-sole-survivor: not eligible for EXPIRED_DELETE_MARKER.
|
||||
info.NumVersions = 2
|
||||
if got := EvaluateAction(rule, ActionKindExpiredDeleteMarker, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("non-sole marker, got %v", got)
|
||||
}
|
||||
// Latest non-marker: not in scope of this kind.
|
||||
info = &ObjectInfo{Key: "a", IsLatest: true, IsDeleteMarker: false, NumVersions: 1, ModTime: mod}
|
||||
if got := EvaluateAction(rule, ActionKindExpiredDeleteMarker, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("non-marker, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NoncurrentDeleteMarkerExpiresViaNoncurrentDays(t *testing.T) {
|
||||
// A non-current delete marker is just a version. NoncurrentVersionExpirationDays
|
||||
// must apply to it.
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 7}
|
||||
successor := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{
|
||||
Key: "a",
|
||||
IsLatest: false,
|
||||
IsDeleteMarker: true,
|
||||
NumVersions: 3,
|
||||
SuccessorModTime: successor,
|
||||
ModTime: successor.AddDate(0, 0, -1),
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, successor.AddDate(0, 0, 7)); got.Action != ActionDeleteVersion {
|
||||
t.Fatalf("noncurrent delete marker should be eligible under NoncurrentDays, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NoncurrentVersionDays(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 30}
|
||||
successor := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{
|
||||
Key: "a",
|
||||
IsLatest: false,
|
||||
ModTime: mustTime(t, "2023-01-01T00:00:00Z"),
|
||||
SuccessorModTime: successor,
|
||||
NoncurrentIndex: 0,
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, successor.AddDate(0, 0, 29)); got.Action != ActionNone {
|
||||
t.Fatalf("not due, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, successor.AddDate(0, 0, 30)); got.Action != ActionDeleteVersion {
|
||||
t.Fatalf("due, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NoncurrentDaysFallsBackToModTime(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 30}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: mod, NoncurrentIndex: 0}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, mod.AddDate(0, 0, 30)); got.Action != ActionDeleteVersion {
|
||||
t.Fatalf("expected fallback to ModTime, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NewerNoncurrentCountOnly(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NewerNoncurrentVersions: 3}
|
||||
now := mustTime(t, "2025-01-01T00:00:00Z")
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: mod, NoncurrentIndex: i}
|
||||
if got := EvaluateAction(rule, ActionKindNewerNoncurrent, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("idx=%d should be retained, got %v", i, got)
|
||||
}
|
||||
}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: mod, NoncurrentIndex: 3}
|
||||
if got := EvaluateAction(rule, ActionKindNewerNoncurrent, info, now); got.Action != ActionDeleteVersion {
|
||||
t.Fatalf("idx=3 should fire, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_NoncurrentDaysAndCount(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, NoncurrentVersionExpirationDays: 30, NewerNoncurrentVersions: 2}
|
||||
successor := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
now := successor.AddDate(0, 0, 30)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: successor, SuccessorModTime: successor, NoncurrentIndex: i}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("idx=%d kept by NewerNoncurrent retention, got %v", i, got)
|
||||
}
|
||||
}
|
||||
info := &ObjectInfo{Key: "a", IsLatest: false, ModTime: successor, SuccessorModTime: successor, NoncurrentIndex: 2}
|
||||
if got := EvaluateAction(rule, ActionKindNoncurrentDays, info, now); got.Action != ActionDeleteVersion {
|
||||
t.Fatalf("idx=2 satisfies both, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_AbortMultipartUpload(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, AbortMPUDaysAfterInitiation: 7}
|
||||
init := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: ".uploads/u1/", IsMPUInit: true, ModTime: init}
|
||||
if got := EvaluateAction(rule, ActionKindAbortMPU, info, init.AddDate(0, 0, 6)); got.Action != ActionNone {
|
||||
t.Fatalf("not due, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindAbortMPU, info, init.AddDate(0, 0, 7)); got.Action != ActionAbortMultipartUpload {
|
||||
t.Fatalf("due, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_PrefixFilter(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, Prefix: "logs/", ExpirationDays: 1}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
now := mod.AddDate(0, 0, 10)
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, &ObjectInfo{Key: "data/x", IsLatest: true, ModTime: mod}, now); got.Action != ActionNone {
|
||||
t.Fatalf("prefix mismatch should reject, got %v", got)
|
||||
}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, &ObjectInfo{Key: "logs/x", IsLatest: true, ModTime: mod}, now); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("prefix match should fire, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_TagFilter(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 1, FilterTags: map[string]string{"env": "prod", "tier": "cold"}}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
now := mod.AddDate(0, 0, 10)
|
||||
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("missing tags should reject, got %v", got)
|
||||
}
|
||||
info.Tags = map[string]string{"env": "prod"}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, now); got.Action != ActionNone {
|
||||
t.Fatalf("partial match should reject, got %v", got)
|
||||
}
|
||||
info.Tags = map[string]string{"env": "prod", "tier": "cold", "extra": "ignored"}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, now); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("all tags match should fire, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_SizeFilter(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 1, FilterSizeGreaterThan: 100, FilterSizeLessThan: 1000}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
now := mod.AddDate(0, 0, 10)
|
||||
cases := []struct {
|
||||
size int64
|
||||
want Action
|
||||
}{
|
||||
{50, ActionNone},
|
||||
{100, ActionNone},
|
||||
{500, ActionDeleteObject},
|
||||
{1000, ActionNone},
|
||||
{2000, ActionNone},
|
||||
}
|
||||
for _, c := range cases {
|
||||
info := &ObjectInfo{Key: "a", IsLatest: true, ModTime: mod, Size: c.size}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, now); got.Action != c.want {
|
||||
t.Fatalf("size=%d want=%v got=%v", c.size, c.want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluateAction_EmptyPrefixMatchesAll(t *testing.T) {
|
||||
rule := &Rule{Status: StatusEnabled, ExpirationDays: 1, Prefix: ""}
|
||||
mod := mustTime(t, "2024-01-01T00:00:00Z")
|
||||
info := &ObjectInfo{Key: "deeply/nested/path/x", IsLatest: true, ModTime: mod}
|
||||
if got := EvaluateAction(rule, ActionKindExpirationDays, info, mod.AddDate(0, 0, 10)); got.Action != ActionDeleteObject {
|
||||
t.Fatalf("empty prefix should match, got %v", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package s3lifecycle
|
||||
|
||||
import "time"
|
||||
|
||||
// EventLogHorizon returns the maximum age of an event the reader needs to
|
||||
// observe to drive the (rule, kind) compiled action. Used by the retention
|
||||
// mode gate: if metaLogRetention < EventLogHorizon(rule, kind) +
|
||||
// bootstrapLookbackMin, this action is promoted to scan_only with
|
||||
// degraded_reason=RETENTION_BELOW_HORIZON.
|
||||
//
|
||||
// One XML rule may declare multiple actions with different horizons (a 90d
|
||||
// EXPIRATION_DAYS sibling alongside a 7d ABORT_MPU); the gate runs per
|
||||
// compiled action so each can degrade independently.
|
||||
//
|
||||
// Per-kind values:
|
||||
//
|
||||
// EXPIRATION_DAYS -> rule.ExpirationDays
|
||||
// NONCURRENT_DAYS -> rule.NoncurrentVersionExpirationDays
|
||||
// ABORT_MPU -> rule.AbortMPUDaysAfterInitiation
|
||||
// NEWER_NONCURRENT -> SmallDelay (count-only retention; immediate at flip)
|
||||
// EXPIRED_DELETE_MARKER -> SmallDelay (immediate when sole survivor)
|
||||
// EXPIRATION_DATE -> 0 (date kind bypasses the gate)
|
||||
func EventLogHorizon(rule *Rule, kind ActionKind) time.Duration {
|
||||
if rule == nil {
|
||||
return 0
|
||||
}
|
||||
const day = 24 * time.Hour
|
||||
switch kind {
|
||||
case ActionKindExpirationDays:
|
||||
if rule.ExpirationDays > 0 {
|
||||
return time.Duration(rule.ExpirationDays) * day
|
||||
}
|
||||
case ActionKindNoncurrentDays:
|
||||
if rule.NoncurrentVersionExpirationDays > 0 {
|
||||
return time.Duration(rule.NoncurrentVersionExpirationDays) * day
|
||||
}
|
||||
case ActionKindAbortMPU:
|
||||
if rule.AbortMPUDaysAfterInitiation > 0 {
|
||||
return time.Duration(rule.AbortMPUDaysAfterInitiation) * day
|
||||
}
|
||||
case ActionKindNewerNoncurrent:
|
||||
if rule.NewerNoncurrentVersions > 0 && rule.NoncurrentVersionExpirationDays == 0 {
|
||||
return SmallDelay
|
||||
}
|
||||
case ActionKindExpiredDeleteMarker:
|
||||
if rule.ExpiredObjectDeleteMarker {
|
||||
return SmallDelay
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEventLogHorizon_PerActionIsIndependent(t *testing.T) {
|
||||
// A multi-action rule's horizon is computed per kind: the gate runs on
|
||||
// each compiled action separately, so EXPIRATION_DAYS at 90d does NOT
|
||||
// share a horizon with its sibling ABORT_MPU at 7d.
|
||||
rule := &Rule{ExpirationDays: 90, AbortMPUDaysAfterInitiation: 7, NoncurrentVersionExpirationDays: 30}
|
||||
cases := []struct {
|
||||
kind ActionKind
|
||||
want time.Duration
|
||||
}{
|
||||
{ActionKindExpirationDays, 90 * 24 * time.Hour},
|
||||
{ActionKindAbortMPU, 7 * 24 * time.Hour},
|
||||
{ActionKindNoncurrentDays, 30 * 24 * time.Hour},
|
||||
{ActionKindExpirationDate, 0},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.kind.String(), func(t *testing.T) {
|
||||
if got := EventLogHorizon(rule, c.kind); got != c.want {
|
||||
t.Fatalf("want %v, got %v", c.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_NewerNoncurrentCountOnlyIsSmallDelay(t *testing.T) {
|
||||
rule := &Rule{NewerNoncurrentVersions: 5}
|
||||
if got := EventLogHorizon(rule, ActionKindNewerNoncurrent); got != SmallDelay {
|
||||
t.Fatalf("want SmallDelay, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_NewerNoncurrentWithDaysIsZero(t *testing.T) {
|
||||
// Pure NEWER_NONCURRENT only when NoncurrentDays is unset; otherwise the
|
||||
// rule produces a NONCURRENT_DAYS action and asking for NEWER_NONCURRENT
|
||||
// returns 0 (not declared by this rule).
|
||||
rule := &Rule{NoncurrentVersionExpirationDays: 30, NewerNoncurrentVersions: 5}
|
||||
if got := EventLogHorizon(rule, ActionKindNewerNoncurrent); got != 0 {
|
||||
t.Fatalf("NEWER_NONCURRENT not declared when paired with days, got %v", got)
|
||||
}
|
||||
// The actual action this rule declares is NONCURRENT_DAYS — verify horizon there.
|
||||
if got := EventLogHorizon(rule, ActionKindNoncurrentDays); got != 30*24*time.Hour {
|
||||
t.Fatalf("NoncurrentDays horizon, want 30d, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_ExpiredDeleteMarkerIsSmallDelay(t *testing.T) {
|
||||
rule := &Rule{ExpiredObjectDeleteMarker: true}
|
||||
if got := EventLogHorizon(rule, ActionKindExpiredDeleteMarker); got != SmallDelay {
|
||||
t.Fatalf("want SmallDelay, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_DateKindReturnsZero(t *testing.T) {
|
||||
rule := &Rule{ExpirationDate: mustTime(t, "2025-06-15T00:00:00Z")}
|
||||
if got := EventLogHorizon(rule, ActionKindExpirationDate); got != 0 {
|
||||
t.Fatalf("date kind bypasses gate, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_NilRule(t *testing.T) {
|
||||
if got := EventLogHorizon(nil, ActionKindExpirationDays); got != 0 {
|
||||
t.Fatalf("nil rule should return 0, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventLogHorizon_KindNotDeclaredReturnsZero(t *testing.T) {
|
||||
// Asking for a kind the rule doesn't declare returns 0; the gate then
|
||||
// trivially passes for that compiled action (which doesn't exist).
|
||||
rule := &Rule{ExpirationDays: 30}
|
||||
if got := EventLogHorizon(rule, ActionKindAbortMPU); got != 0 {
|
||||
t.Fatalf("want 0 for undeclared kind, got %v", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package s3lifecycle
|
||||
|
||||
import "time"
|
||||
|
||||
// MinTriggerAge returns the day threshold defined by `kind` on `rule`. Used by
|
||||
// the safety-scan cadence as `max(MinTriggerAge(rule, kind), kindFloor)` —
|
||||
// see the per-kind cadence table in the design doc. Returns 0 when the kind
|
||||
// has no day-style threshold (date / count / immediate kinds), in which case
|
||||
// the caller's kind-floor is the cadence directly.
|
||||
//
|
||||
// One XML rule may declare multiple actions; this helper takes a kind so the
|
||||
// cadence is computed independently per compiled action.
|
||||
func MinTriggerAge(rule *Rule, kind ActionKind) time.Duration {
|
||||
if rule == nil {
|
||||
return 0
|
||||
}
|
||||
const day = 24 * time.Hour
|
||||
switch kind {
|
||||
case ActionKindExpirationDays:
|
||||
if rule.ExpirationDays > 0 {
|
||||
return time.Duration(rule.ExpirationDays) * day
|
||||
}
|
||||
case ActionKindNoncurrentDays:
|
||||
if rule.NoncurrentVersionExpirationDays > 0 {
|
||||
return time.Duration(rule.NoncurrentVersionExpirationDays) * day
|
||||
}
|
||||
case ActionKindAbortMPU:
|
||||
if rule.AbortMPUDaysAfterInitiation > 0 {
|
||||
return time.Duration(rule.AbortMPUDaysAfterInitiation) * day
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMinTriggerAge_PerKind(t *testing.T) {
|
||||
rule := &Rule{
|
||||
ExpirationDays: 30,
|
||||
NoncurrentVersionExpirationDays: 7,
|
||||
AbortMPUDaysAfterInitiation: 14,
|
||||
}
|
||||
cases := []struct {
|
||||
kind ActionKind
|
||||
want time.Duration
|
||||
}{
|
||||
{ActionKindExpirationDays, 30 * 24 * time.Hour},
|
||||
{ActionKindNoncurrentDays, 7 * 24 * time.Hour},
|
||||
{ActionKindAbortMPU, 14 * 24 * time.Hour},
|
||||
{ActionKindExpirationDate, 0},
|
||||
{ActionKindNewerNoncurrent, 0},
|
||||
{ActionKindExpiredDeleteMarker, 0},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.kind.String(), func(t *testing.T) {
|
||||
if got := MinTriggerAge(rule, c.kind); got != c.want {
|
||||
t.Fatalf("want %v, got %v", c.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinTriggerAge_KindNotSetReturnsZero(t *testing.T) {
|
||||
// Asking for a kind that the rule doesn't actually declare returns 0,
|
||||
// so the safety-scan cadence falls through to the kind floor.
|
||||
rule := &Rule{ExpirationDays: 30}
|
||||
if got := MinTriggerAge(rule, ActionKindAbortMPU); got != 0 {
|
||||
t.Fatalf("want 0, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinTriggerAge_DateOnlyReturnsZero(t *testing.T) {
|
||||
rule := &Rule{ExpirationDate: mustTime(t, "2025-06-15T00:00:00Z")}
|
||||
if got := MinTriggerAge(rule, ActionKindExpirationDate); got != 0 {
|
||||
t.Fatalf("date kind has no day threshold, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinTriggerAge_NilRule(t *testing.T) {
|
||||
if got := MinTriggerAge(nil, ActionKindExpirationDays); got != 0 {
|
||||
t.Fatalf("nil rule should return 0, got %v", got)
|
||||
}
|
||||
}
|
||||
@@ -68,8 +68,26 @@ type ObjectInfo struct {
|
||||
// Tags are the object's user-defined tags, extracted from the entry's
|
||||
// Extended metadata (keys prefixed with "X-Amz-Tagging-").
|
||||
Tags map[string]string
|
||||
|
||||
// IsMPUInit is true when this entry represents an in-flight multipart
|
||||
// upload init under <bucket>/.uploads/<uploadId>/. The evaluator routes
|
||||
// these entries to the AbortIncompleteMultipartUpload action shape.
|
||||
// ModTime carries the upload's initiation time when this is set.
|
||||
IsMPUInit bool
|
||||
}
|
||||
|
||||
// Status values for Rule.Status.
|
||||
const (
|
||||
StatusEnabled = "Enabled"
|
||||
StatusDisabled = "Disabled"
|
||||
)
|
||||
|
||||
// SmallDelay is the lookback used for predicate-change events and as the
|
||||
// event-log horizon for count-based / immediate rule kinds (NewerNoncurrent,
|
||||
// ExpiredObjectDeleteMarker). A small fixed value avoids races with in-flight
|
||||
// writes without forcing the reader to keep deep history.
|
||||
const SmallDelay = time.Minute
|
||||
|
||||
// Action represents the lifecycle action to take on an object.
|
||||
type Action int
|
||||
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
// RuleHash returns the first 8 bytes of sha256 over a canonicalized rule
|
||||
// representation. The hash is stable across:
|
||||
//
|
||||
// - tag-key reorder (FilterTags is sorted before hashing)
|
||||
// - rule.ID changes (ID is excluded — it's display-only)
|
||||
// - rule.Status flips (Enabled <-> Disabled — state continuity preserved
|
||||
// across operator toggles)
|
||||
//
|
||||
// Prefix is hashed verbatim: "logs" and "logs/" produce different hashes
|
||||
// because they match different objects under literal prefix semantics
|
||||
// (strings.HasPrefix). Collapsing them would let an edit silently reuse
|
||||
// per-rule durable state for a rule that no longer matches the same set.
|
||||
//
|
||||
// Different action shapes (different days, different filter, different
|
||||
// action types) hash to different values.
|
||||
//
|
||||
// Encoding is length-prefixed to avoid delimiter ambiguity: a tag value
|
||||
// containing "=" or "\n" or a prefix containing the field-tag separator
|
||||
// must not be able to forge a different tuple that hashes the same. Each
|
||||
// scalar is written as `<field-tag-byte> <uvarint-length> <bytes>`.
|
||||
func RuleHash(rule *Rule) [8]byte {
|
||||
if rule == nil {
|
||||
var zero [8]byte
|
||||
return zero
|
||||
}
|
||||
h := sha256.New()
|
||||
// Filter.
|
||||
writeBytes(h, fieldPrefix, []byte(rule.Prefix))
|
||||
tagKeys := make([]string, 0, len(rule.FilterTags))
|
||||
for k := range rule.FilterTags {
|
||||
tagKeys = append(tagKeys, k)
|
||||
}
|
||||
sort.Strings(tagKeys)
|
||||
writeUvarint(h, fieldTagCount, uint64(len(tagKeys)))
|
||||
for _, k := range tagKeys {
|
||||
writeBytes(h, fieldTagKey, []byte(k))
|
||||
writeBytes(h, fieldTagValue, []byte(rule.FilterTags[k]))
|
||||
}
|
||||
writeInt64(h, fieldSizeGT, rule.FilterSizeGreaterThan)
|
||||
writeInt64(h, fieldSizeLT, rule.FilterSizeLessThan)
|
||||
// Actions.
|
||||
writeInt64(h, fieldExpDays, int64(rule.ExpirationDays))
|
||||
writeBytes(h, fieldExpDate, []byte(canonicalTime(rule.ExpirationDate)))
|
||||
writeBool(h, fieldExpDeleteMarker, rule.ExpiredObjectDeleteMarker)
|
||||
writeInt64(h, fieldNoncurDays, int64(rule.NoncurrentVersionExpirationDays))
|
||||
writeInt64(h, fieldNoncurKeep, int64(rule.NewerNoncurrentVersions))
|
||||
writeInt64(h, fieldMPUDays, int64(rule.AbortMPUDaysAfterInitiation))
|
||||
|
||||
sum := h.Sum(nil)
|
||||
var out [8]byte
|
||||
copy(out[:], sum[:8])
|
||||
return out
|
||||
}
|
||||
|
||||
// Field tags namespace each scalar so an attacker can't substitute one
|
||||
// string for another and re-collide. Values are arbitrary but stable.
|
||||
const (
|
||||
fieldPrefix byte = 0x01
|
||||
fieldTagCount byte = 0x02
|
||||
fieldTagKey byte = 0x03
|
||||
fieldTagValue byte = 0x04
|
||||
fieldSizeGT byte = 0x05
|
||||
fieldSizeLT byte = 0x06
|
||||
fieldExpDays byte = 0x10
|
||||
fieldExpDate byte = 0x11
|
||||
fieldExpDeleteMarker byte = 0x12
|
||||
fieldNoncurDays byte = 0x13
|
||||
fieldNoncurKeep byte = 0x14
|
||||
fieldMPUDays byte = 0x15
|
||||
)
|
||||
|
||||
type byteSink interface {
|
||||
Write(p []byte) (n int, err error)
|
||||
}
|
||||
|
||||
func writeBytes(w byteSink, tag byte, b []byte) {
|
||||
var lenbuf [binary.MaxVarintLen64]byte
|
||||
n := binary.PutUvarint(lenbuf[:], uint64(len(b)))
|
||||
w.Write([]byte{tag})
|
||||
w.Write(lenbuf[:n])
|
||||
w.Write(b)
|
||||
}
|
||||
|
||||
func writeUvarint(w byteSink, tag byte, v uint64) {
|
||||
var buf [binary.MaxVarintLen64]byte
|
||||
n := binary.PutUvarint(buf[:], v)
|
||||
w.Write([]byte{tag})
|
||||
w.Write(buf[:n])
|
||||
}
|
||||
|
||||
func writeInt64(w byteSink, tag byte, v int64) {
|
||||
var buf [binary.MaxVarintLen64]byte
|
||||
n := binary.PutVarint(buf[:], v)
|
||||
w.Write([]byte{tag})
|
||||
w.Write(buf[:n])
|
||||
}
|
||||
|
||||
func writeBool(w byteSink, tag byte, b bool) {
|
||||
v := byte(0)
|
||||
if b {
|
||||
v = 1
|
||||
}
|
||||
w.Write([]byte{tag, v})
|
||||
}
|
||||
|
||||
func canonicalTime(t time.Time) string {
|
||||
if t.IsZero() {
|
||||
return ""
|
||||
}
|
||||
return t.UTC().Format(time.RFC3339Nano)
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
package s3lifecycle
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRuleHash_Stable(t *testing.T) {
|
||||
r := &Rule{Status: StatusEnabled, ExpirationDays: 30, Prefix: "logs/"}
|
||||
a := RuleHash(r)
|
||||
b := RuleHash(r)
|
||||
if a != b {
|
||||
t.Fatalf("hash should be deterministic, got %x vs %x", a, b)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_TagOrderInvariant(t *testing.T) {
|
||||
r1 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"a": "1", "b": "2"}}
|
||||
r2 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"b": "2", "a": "1"}}
|
||||
if RuleHash(r1) != RuleHash(r2) {
|
||||
t.Fatalf("tag order should not affect hash")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_PrefixTrailingSlashMattersToHash(t *testing.T) {
|
||||
// "logs" matches "logs", "logsmore/x", "logs/x" (literal HasPrefix);
|
||||
// "logs/" matches only "logs/x". Different match sets -> different rules
|
||||
// -> different hashes. Collapsing would let an edit silently reuse
|
||||
// state for a rule that no longer matches the same objects.
|
||||
r1 := &Rule{ExpirationDays: 30, Prefix: "logs"}
|
||||
r2 := &Rule{ExpirationDays: 30, Prefix: "logs/"}
|
||||
if RuleHash(r1) == RuleHash(r2) {
|
||||
t.Fatalf("trailing slash MUST affect hash; rules match different objects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_IDIgnored(t *testing.T) {
|
||||
r1 := &Rule{ID: "first", ExpirationDays: 30}
|
||||
r2 := &Rule{ID: "renamed", ExpirationDays: 30}
|
||||
if RuleHash(r1) != RuleHash(r2) {
|
||||
t.Fatalf("ID change should not affect hash")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_StatusIgnored(t *testing.T) {
|
||||
r1 := &Rule{Status: StatusEnabled, ExpirationDays: 30}
|
||||
r2 := &Rule{Status: StatusDisabled, ExpirationDays: 30}
|
||||
if RuleHash(r1) != RuleHash(r2) {
|
||||
t.Fatalf("status flip should not affect hash (state continuity)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_DifferentDaysHashDifferent(t *testing.T) {
|
||||
r1 := &Rule{ExpirationDays: 30}
|
||||
r2 := &Rule{ExpirationDays: 31}
|
||||
if RuleHash(r1) == RuleHash(r2) {
|
||||
t.Fatalf("different days must hash differently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_DifferentActionTypesHashDifferent(t *testing.T) {
|
||||
r1 := &Rule{ExpirationDays: 30}
|
||||
r2 := &Rule{NoncurrentVersionExpirationDays: 30}
|
||||
r3 := &Rule{AbortMPUDaysAfterInitiation: 30}
|
||||
if RuleHash(r1) == RuleHash(r2) || RuleHash(r2) == RuleHash(r3) || RuleHash(r1) == RuleHash(r3) {
|
||||
t.Fatalf("different action types must hash differently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_FilterMatters(t *testing.T) {
|
||||
r1 := &Rule{ExpirationDays: 30, Prefix: "logs/"}
|
||||
r2 := &Rule{ExpirationDays: 30, Prefix: "data/"}
|
||||
r3 := &Rule{ExpirationDays: 30, Prefix: "logs/", FilterTags: map[string]string{"env": "prod"}}
|
||||
r4 := &Rule{ExpirationDays: 30, FilterSizeGreaterThan: 1000}
|
||||
if RuleHash(r1) == RuleHash(r2) || RuleHash(r1) == RuleHash(r3) || RuleHash(r1) == RuleHash(r4) {
|
||||
t.Fatalf("different filters must hash differently")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_NilSafe(t *testing.T) {
|
||||
if h := RuleHash(nil); h != ([8]byte{}) {
|
||||
t.Fatalf("nil rule should yield zero hash, got %x", h)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_TagDelimiterCollisionResistant(t *testing.T) {
|
||||
// With a naive "tag=K=V\n" encoding, ("a=b", "c") and ("a", "b=c")
|
||||
// serialize identically. Length-prefixed encoding must keep them
|
||||
// distinct.
|
||||
r1 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"a=b": "c"}}
|
||||
r2 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"a": "b=c"}}
|
||||
if RuleHash(r1) == RuleHash(r2) {
|
||||
t.Fatalf("delimiter collision: tag(a=b,c) and tag(a,b=c) hash equal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_TagNewlineCollisionResistant(t *testing.T) {
|
||||
// "a\nb" -> single tag with a key that embeds a newline; naively this
|
||||
// could be split across lines and matched by an unrelated 2-tag rule.
|
||||
r1 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"a\nb": "c"}}
|
||||
r2 := &Rule{ExpirationDays: 30, FilterTags: map[string]string{"a": "b", "c": ""}}
|
||||
if RuleHash(r1) == RuleHash(r2) {
|
||||
t.Fatalf("delimiter collision via embedded newline")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuleHash_PrefixSeparatorIsolation(t *testing.T) {
|
||||
// A prefix containing whatever the previous encoder used as a separator
|
||||
// ("=" / "\n") must not be able to "escape" into another field.
|
||||
r1 := &Rule{ExpirationDays: 30, Prefix: "logs\nexp_days=99"}
|
||||
r2 := &Rule{ExpirationDays: 99, Prefix: "logs"}
|
||||
if RuleHash(r1) == RuleHash(r2) {
|
||||
t.Fatalf("prefix-side delimiter forgery hashes equal")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user