feat(s3/lifecycle): LifecycleDelete RPC server (#9349)

* feat(s3/lifecycle): SeaweedS3LifecycleInternal.LifecycleDelete RPC

Adds the worker-to-S3 RPC the lifecycle worker calls to execute one
(rule, action) verdict against one entry. The S3 server is the only
component allowed to mutate filer state, so it gets the final word:
re-fetch, identity CAS, object-lock check, dispatch.

LifecycleDeleteRequest carries the routing tuple (bucket, object_path,
version_id, rule_hash, action_kind), the per-stream context echoed
into BlockerRecord on FATAL outcomes, and an EntryIdentity CAS witness
that lets the server detect mid-flight drift (mtime, size, head fid,
sorted-Extended hash).

LifecycleDeleteOutcome covers all five worker-actionable verdicts:
DONE / NOOP_RESOLVED / SKIPPED_OBJECT_LOCK / RETRY_LATER / BLOCKED.
Cursor-advance / pending-mutation rules per outcome are documented
inline.

Makefile updated to emit the gRPC stubs alongside the message types.

* feat(s3/lifecycle): LifecycleDelete server handler

Server-side handler for the worker-to-S3 RPC. Five-step flow:

1. Re-fetch live entry (NOT_FOUND -> NOOP_RESOLVED).
2. CAS on EntryIdentity (mtime / size / head fid / sorted-Extended
   sha256). Mismatch -> NOOP_RESOLVED with reason STALE_IDENTITY.
3. enforceObjectLockProtections with governanceBypassAllowed=false.
   Lifecycle never bypasses legal-hold or compliance retention; the
   safety scan re-attempts after the hold lifts. Lock refusal ->
   SKIPPED_OBJECT_LOCK (logged + counted, cursor advances).
4. Dispatch by action_kind:
   - EXPIRATION_DAYS / EXPIRATION_DATE: createDeleteMarker on
     versioned buckets, deleteUnversionedObjectWithClient otherwise.
   - NONCURRENT_DAYS / NEWER_NONCURRENT / EXPIRED_DELETE_MARKER:
     deleteSpecificObjectVersion (the marker is just a version).
   - ABORT_MPU: stub returning RETRY_LATER pending Phase 5 wiring.
5. Helper failures the server can't classify as transient -> BLOCKED
   with reason "FATAL_EVENT_ERROR: <detail>"; the worker writes a
   durable BlockerRecord and pauses the failing stream. Filer fetch
   transport errors -> RETRY_LATER (sustained transients eventually
   promote to BLOCKED via the worker's retry budget).

hashExtended uses length-prefixed encoding so a forged Extended
payload (e.g. one tag value containing the byte sequence of two real
tags) can't collide with a real two-tag map. Regression test pins
this.

Tests cover identity-comparison fields, hashExtended order/delimiter
stability, empty-request rejection. Full filer-integration tests come
in Layer 2.

* fix(s3/lifecycle): use stdlib bytes.Equal for ExtendedHash compare

Replaces a hand-rolled byte slice comparator with bytes.Equal —
idiomatic, slightly faster (the stdlib version is intrinsified on
amd64/arm64), and one fewer test surface to maintain.

* refactor(s3api): trim narration from lifecycle RPC + canonicalizer

Drop step-by-step doc-block narrations on LifecycleDelete and the
helpers that reproduced what the code already says. Keep WHY one-
liners at non-obvious spots: the http.Request=nil safety claim,
why hashExtended is length-prefixed, the safety-scan revisit
contract for SKIPPED_OBJECT_LOCK, the TODO marker for ABORT_MPU.

* refactor(s3/lifecycle): retryLater helper, drop inline literals

Adds retryLater() alongside done/noopResolved/blocked so the four
LifecycleDeleteOutcome constructions look the same. Replaces the two
inline RETRY_LATER literal returns (live-fetch transport error and
the ABORT_MPU stub) with the helper. No behavior change.

* fix(s3/lifecycle): default filer-error classification to RETRY_LATER

Versioning lookup, createDeleteMarker, deleteUnversionedObjectWithClient,
and deleteSpecificObjectVersion all do filer round-trips. Most failures
are transient (filer unavailable, slow, network blip), not deterministic
per-event errors. Returning BLOCKED for them was too aggressive: BLOCKED
halts the stream until an operator intervenes, which would surface
on every transient filer hiccup.

Default these paths to RETRY_LATER. The worker's retry budget already
promotes sustained transients to BLOCKED after a configured threshold,
which is the right place for that escalation. BLOCKED stays for the
truly deterministic error: the request-shape check (missing version_id
on noncurrent delete) and the unknown-action-kind dispatch fallback.

* fix(s3/lifecycle): honor versioning Suspended on current-version expiration

Treating Suspended-versioning buckets like never-versioned ones was
wrong: per AWS S3, current-version expiration on Suspended must
remove the existing null version and insert a new delete marker (the
same behavior a user DELETE produces). The previous code branched
only on isVersioningEnabled() — which returns false for Suspended —
and dropped through to the actual-delete path, losing the version
history that Suspended buckets are still expected to keep.

Switch to getVersioningState() and dispatch on three branches:

  Enabled    -> createDeleteMarker (current becomes non-current)
  Suspended  -> deleteSpecificObjectVersion("null"), createDeleteMarker
  Off / never configured -> deleteUnversionedObjectWithClient

The Suspended path's null-version deletion tolerates NotFound (the
null version may not exist when this is the first delete after a
versioning toggle); other errors classify as RETRY_LATER like the
rest of the dispatch.

* refactor(s3/lifecycle): trim narration on LifecycleDelete

Drop the 6-line versioning-state docblock and the inline
"transient -> RETRY_LATER" rationales; keep one-liners.

* fix(s3/lifecycle): bucket-not-found is NOOP; include ErrObjectNotFound

Three follow-ups from review (the rest were already addressed by
prior commits):

- Versioning lookup now distinguishes filer_pb.ErrNotFound
  (BUCKET_NOT_FOUND -> NOOP_RESOLVED) from genuinely transient
  failures (RETRY_LATER). A bucket deleted between live-fetch and
  versioning-lookup is benign, not transient.
- deleteUnversionedObjectWithClient and deleteSpecificObjectVersion
  now also recognise ErrObjectNotFound as a resolved-NOOP, matching
  the live-fetch step's classification.
This commit is contained in:
Chris Lu
2026-05-07 15:03:33 -07:00
committed by GitHub
parent 7f2b20d577
commit 5ab3860005
7 changed files with 927 additions and 139 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ gen:
protoc mount_peer.proto --go_out=./mount_peer_pb --go-grpc_out=./mount_peer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
mkdir -p ./s3_lifecycle_pb
protoc s3_lifecycle.proto --go_out=./s3_lifecycle_pb --go_opt=paths=source_relative
protoc s3_lifecycle.proto --go_out=./s3_lifecycle_pb --go-grpc_out=./s3_lifecycle_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc mq_broker.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc mq_schema.proto --go_out=./schema_pb --go-grpc_out=./schema_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc mq_agent.proto --go_out=./mq_agent_pb --go-grpc_out=./mq_agent_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
+60
View File
@@ -33,6 +33,66 @@ enum ActionKind {
EXPIRED_DELETE_MARKER = 6; // Expiration.ExpiredObjectDeleteMarker
}
// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the
// actual lifecycle deletion. The lifecycle worker computes the (rule, action)
// verdict locally; the S3 server is the only component allowed to mutate the
// filer state, so it gets the final word: it re-fetches the live entry,
// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches
// to the appropriate internal helper.
service SeaweedS3LifecycleInternal {
rpc LifecycleDelete(LifecycleDeleteRequest) returns (LifecycleDeleteResponse);
}
// LifecycleDeleteOutcome captures every per-event verdict the worker needs.
// Cursor advance / pending mutation rules:
//
// DONE -> advance cursor / drop pending
// NOOP_RESOLVED -> advance cursor / drop pending (object already gone or stale identity)
// SKIPPED_OBJECT_LOCK -> log + counter; advance cursor (per design: object lock is operator concern)
// RETRY_LATER -> hold cursor; feed retry-budget; next batch retries from same position
// BLOCKED -> hold cursor; durable BlockerRecord written; operator must intervene
enum LifecycleDeleteOutcome {
LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED = 0;
DONE = 1;
NOOP_RESOLVED = 2;
SKIPPED_OBJECT_LOCK = 3;
RETRY_LATER = 4;
BLOCKED = 5;
}
message LifecycleDeleteRequest {
// Routing.
string bucket = 1;
string object_path = 2; // bucket-relative; no leading slash
string version_id = 3; // empty for non-versioned
bytes rule_hash = 4; // 8 bytes; matches the rule's per-rule dir
ActionKind action_kind = 5; // chooses dispatch within the rule
// Stream context (echoed in BlockerRecord on FATAL outcomes so operators
// can resolve the right (shard, delay) tuple).
string stream_shard = 10;
int64 stream_delay_seconds = 11;
int64 stream_position_ts_ns = 12;
int64 stream_position_offset = 13;
// CAS witness. The server re-fetches the live entry and aborts as
// NOOP_RESOLVED (STALE_IDENTITY) if any field doesn't match.
EntryIdentity expected_identity = 20;
// Snapshot id at the time the worker computed this verdict; the server
// verifies the (rule_hash, action_kind) is still in the current policy
// snapshot and aborts as NOOP_RESOLVED (STALE_POLICY) if not.
uint64 engine_snapshot_id = 21;
}
message LifecycleDeleteResponse {
LifecycleDeleteOutcome outcome = 1;
// Human-readable cause, e.g. "STALE_IDENTITY: mtime drift",
// "FATAL_EVENT_ERROR: malformed entry", or "TRANSPORT_ERROR: filer
// unavailable". Echoed into BlockerRecord.last_error on FATAL.
string reason = 2;
}
// MessagePosition mirrors weed/util/log_buffer.MessagePosition for durable
// storage. ts_ns + offset uniquely identifies a position within a per-filer
// log buffer; per-shard cursors (see ReaderState) use this shape to skip past
+409 -120
View File
@@ -85,6 +85,72 @@ func (ActionKind) EnumDescriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{0}
}
// LifecycleDeleteOutcome captures every per-event verdict the worker needs.
// Cursor advance / pending mutation rules:
//
// DONE -> advance cursor / drop pending
// NOOP_RESOLVED -> advance cursor / drop pending (object already gone or stale identity)
// SKIPPED_OBJECT_LOCK -> log + counter; advance cursor (per design: object lock is operator concern)
// RETRY_LATER -> hold cursor; feed retry-budget; next batch retries from same position
// BLOCKED -> hold cursor; durable BlockerRecord written; operator must intervene
type LifecycleDeleteOutcome int32
const (
LifecycleDeleteOutcome_LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED LifecycleDeleteOutcome = 0
LifecycleDeleteOutcome_DONE LifecycleDeleteOutcome = 1
LifecycleDeleteOutcome_NOOP_RESOLVED LifecycleDeleteOutcome = 2
LifecycleDeleteOutcome_SKIPPED_OBJECT_LOCK LifecycleDeleteOutcome = 3
LifecycleDeleteOutcome_RETRY_LATER LifecycleDeleteOutcome = 4
LifecycleDeleteOutcome_BLOCKED LifecycleDeleteOutcome = 5
)
// Enum value maps for LifecycleDeleteOutcome.
var (
LifecycleDeleteOutcome_name = map[int32]string{
0: "LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED",
1: "DONE",
2: "NOOP_RESOLVED",
3: "SKIPPED_OBJECT_LOCK",
4: "RETRY_LATER",
5: "BLOCKED",
}
LifecycleDeleteOutcome_value = map[string]int32{
"LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED": 0,
"DONE": 1,
"NOOP_RESOLVED": 2,
"SKIPPED_OBJECT_LOCK": 3,
"RETRY_LATER": 4,
"BLOCKED": 5,
}
)
func (x LifecycleDeleteOutcome) Enum() *LifecycleDeleteOutcome {
p := new(LifecycleDeleteOutcome)
*p = x
return p
}
func (x LifecycleDeleteOutcome) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (LifecycleDeleteOutcome) Descriptor() protoreflect.EnumDescriptor {
return file_s3_lifecycle_proto_enumTypes[1].Descriptor()
}
func (LifecycleDeleteOutcome) Type() protoreflect.EnumType {
return &file_s3_lifecycle_proto_enumTypes[1]
}
func (x LifecycleDeleteOutcome) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use LifecycleDeleteOutcome.Descriptor instead.
func (LifecycleDeleteOutcome) EnumDescriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{1}
}
// StreamKind classifies the four blockable streams. ORIGINAL/PREDICATE pause
// reader cursors; BOOTSTRAP pauses a bucket walker; PENDING pauses a rule's
// drain task. Zero is an UNSPECIFIED sentinel — a BlockerRecord whose
@@ -128,11 +194,11 @@ func (x StreamKind) String() string {
}
func (StreamKind) Descriptor() protoreflect.EnumDescriptor {
return file_s3_lifecycle_proto_enumTypes[1].Descriptor()
return file_s3_lifecycle_proto_enumTypes[2].Descriptor()
}
func (StreamKind) Type() protoreflect.EnumType {
return &file_s3_lifecycle_proto_enumTypes[1]
return &file_s3_lifecycle_proto_enumTypes[2]
}
func (x StreamKind) Number() protoreflect.EnumNumber {
@@ -141,7 +207,7 @@ func (x StreamKind) Number() protoreflect.EnumNumber {
// Deprecated: Use StreamKind.Descriptor instead.
func (StreamKind) EnumDescriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{1}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{2}
}
// Proto3 best practice: zero value is an UNSPECIFIED sentinel so that
@@ -191,11 +257,11 @@ func (x LifecycleState_RuleMode) String() string {
}
func (LifecycleState_RuleMode) Descriptor() protoreflect.EnumDescriptor {
return file_s3_lifecycle_proto_enumTypes[2].Descriptor()
return file_s3_lifecycle_proto_enumTypes[3].Descriptor()
}
func (LifecycleState_RuleMode) Type() protoreflect.EnumType {
return &file_s3_lifecycle_proto_enumTypes[2]
return &file_s3_lifecycle_proto_enumTypes[3]
}
func (x LifecycleState_RuleMode) Number() protoreflect.EnumNumber {
@@ -204,7 +270,7 @@ func (x LifecycleState_RuleMode) Number() protoreflect.EnumNumber {
// Deprecated: Use LifecycleState_RuleMode.Descriptor instead.
func (LifecycleState_RuleMode) EnumDescriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{3, 0}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{5, 0}
}
// DEGRADED_REASON_UNSPECIFIED replaces the old NONE sentinel: an unset
@@ -255,11 +321,11 @@ func (x LifecycleState_DegradedReason) String() string {
}
func (LifecycleState_DegradedReason) Descriptor() protoreflect.EnumDescriptor {
return file_s3_lifecycle_proto_enumTypes[3].Descriptor()
return file_s3_lifecycle_proto_enumTypes[4].Descriptor()
}
func (LifecycleState_DegradedReason) Type() protoreflect.EnumType {
return &file_s3_lifecycle_proto_enumTypes[3]
return &file_s3_lifecycle_proto_enumTypes[4]
}
func (x LifecycleState_DegradedReason) Number() protoreflect.EnumNumber {
@@ -268,7 +334,194 @@ func (x LifecycleState_DegradedReason) Number() protoreflect.EnumNumber {
// Deprecated: Use LifecycleState_DegradedReason.Descriptor instead.
func (LifecycleState_DegradedReason) EnumDescriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{3, 1}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{5, 1}
}
type LifecycleDeleteRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Routing.
Bucket string `protobuf:"bytes,1,opt,name=bucket,proto3" json:"bucket,omitempty"`
ObjectPath string `protobuf:"bytes,2,opt,name=object_path,json=objectPath,proto3" json:"object_path,omitempty"` // bucket-relative; no leading slash
VersionId string `protobuf:"bytes,3,opt,name=version_id,json=versionId,proto3" json:"version_id,omitempty"` // empty for non-versioned
RuleHash []byte `protobuf:"bytes,4,opt,name=rule_hash,json=ruleHash,proto3" json:"rule_hash,omitempty"` // 8 bytes; matches the rule's per-rule dir
ActionKind ActionKind `protobuf:"varint,5,opt,name=action_kind,json=actionKind,proto3,enum=s3_lifecycle_pb.ActionKind" json:"action_kind,omitempty"` // chooses dispatch within the rule
// Stream context (echoed in BlockerRecord on FATAL outcomes so operators
// can resolve the right (shard, delay) tuple).
StreamShard string `protobuf:"bytes,10,opt,name=stream_shard,json=streamShard,proto3" json:"stream_shard,omitempty"`
StreamDelaySeconds int64 `protobuf:"varint,11,opt,name=stream_delay_seconds,json=streamDelaySeconds,proto3" json:"stream_delay_seconds,omitempty"`
StreamPositionTsNs int64 `protobuf:"varint,12,opt,name=stream_position_ts_ns,json=streamPositionTsNs,proto3" json:"stream_position_ts_ns,omitempty"`
StreamPositionOffset int64 `protobuf:"varint,13,opt,name=stream_position_offset,json=streamPositionOffset,proto3" json:"stream_position_offset,omitempty"`
// CAS witness. The server re-fetches the live entry and aborts as
// NOOP_RESOLVED (STALE_IDENTITY) if any field doesn't match.
ExpectedIdentity *EntryIdentity `protobuf:"bytes,20,opt,name=expected_identity,json=expectedIdentity,proto3" json:"expected_identity,omitempty"`
// Snapshot id at the time the worker computed this verdict; the server
// verifies the (rule_hash, action_kind) is still in the current policy
// snapshot and aborts as NOOP_RESOLVED (STALE_POLICY) if not.
EngineSnapshotId uint64 `protobuf:"varint,21,opt,name=engine_snapshot_id,json=engineSnapshotId,proto3" json:"engine_snapshot_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *LifecycleDeleteRequest) Reset() {
*x = LifecycleDeleteRequest{}
mi := &file_s3_lifecycle_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *LifecycleDeleteRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*LifecycleDeleteRequest) ProtoMessage() {}
func (x *LifecycleDeleteRequest) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use LifecycleDeleteRequest.ProtoReflect.Descriptor instead.
func (*LifecycleDeleteRequest) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{0}
}
func (x *LifecycleDeleteRequest) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
func (x *LifecycleDeleteRequest) GetObjectPath() string {
if x != nil {
return x.ObjectPath
}
return ""
}
func (x *LifecycleDeleteRequest) GetVersionId() string {
if x != nil {
return x.VersionId
}
return ""
}
func (x *LifecycleDeleteRequest) GetRuleHash() []byte {
if x != nil {
return x.RuleHash
}
return nil
}
func (x *LifecycleDeleteRequest) GetActionKind() ActionKind {
if x != nil {
return x.ActionKind
}
return ActionKind_ACTION_KIND_UNSPECIFIED
}
func (x *LifecycleDeleteRequest) GetStreamShard() string {
if x != nil {
return x.StreamShard
}
return ""
}
func (x *LifecycleDeleteRequest) GetStreamDelaySeconds() int64 {
if x != nil {
return x.StreamDelaySeconds
}
return 0
}
func (x *LifecycleDeleteRequest) GetStreamPositionTsNs() int64 {
if x != nil {
return x.StreamPositionTsNs
}
return 0
}
func (x *LifecycleDeleteRequest) GetStreamPositionOffset() int64 {
if x != nil {
return x.StreamPositionOffset
}
return 0
}
func (x *LifecycleDeleteRequest) GetExpectedIdentity() *EntryIdentity {
if x != nil {
return x.ExpectedIdentity
}
return nil
}
func (x *LifecycleDeleteRequest) GetEngineSnapshotId() uint64 {
if x != nil {
return x.EngineSnapshotId
}
return 0
}
type LifecycleDeleteResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Outcome LifecycleDeleteOutcome `protobuf:"varint,1,opt,name=outcome,proto3,enum=s3_lifecycle_pb.LifecycleDeleteOutcome" json:"outcome,omitempty"`
// Human-readable cause, e.g. "STALE_IDENTITY: mtime drift",
// "FATAL_EVENT_ERROR: malformed entry", or "TRANSPORT_ERROR: filer
// unavailable". Echoed into BlockerRecord.last_error on FATAL.
Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *LifecycleDeleteResponse) Reset() {
*x = LifecycleDeleteResponse{}
mi := &file_s3_lifecycle_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *LifecycleDeleteResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*LifecycleDeleteResponse) ProtoMessage() {}
func (x *LifecycleDeleteResponse) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use LifecycleDeleteResponse.ProtoReflect.Descriptor instead.
func (*LifecycleDeleteResponse) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{1}
}
func (x *LifecycleDeleteResponse) GetOutcome() LifecycleDeleteOutcome {
if x != nil {
return x.Outcome
}
return LifecycleDeleteOutcome_LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED
}
func (x *LifecycleDeleteResponse) GetReason() string {
if x != nil {
return x.Reason
}
return ""
}
// MessagePosition mirrors weed/util/log_buffer.MessagePosition for durable
@@ -285,7 +538,7 @@ type MessagePosition struct {
func (x *MessagePosition) Reset() {
*x = MessagePosition{}
mi := &file_s3_lifecycle_proto_msgTypes[0]
mi := &file_s3_lifecycle_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -297,7 +550,7 @@ func (x *MessagePosition) String() string {
func (*MessagePosition) ProtoMessage() {}
func (x *MessagePosition) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[0]
mi := &file_s3_lifecycle_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -310,7 +563,7 @@ func (x *MessagePosition) ProtoReflect() protoreflect.Message {
// Deprecated: Use MessagePosition.ProtoReflect.Descriptor instead.
func (*MessagePosition) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{0}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{2}
}
func (x *MessagePosition) GetTsNs() int64 {
@@ -340,7 +593,7 @@ type FilerShardCursor struct {
func (x *FilerShardCursor) Reset() {
*x = FilerShardCursor{}
mi := &file_s3_lifecycle_proto_msgTypes[1]
mi := &file_s3_lifecycle_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -352,7 +605,7 @@ func (x *FilerShardCursor) String() string {
func (*FilerShardCursor) ProtoMessage() {}
func (x *FilerShardCursor) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[1]
mi := &file_s3_lifecycle_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -365,7 +618,7 @@ func (x *FilerShardCursor) ProtoReflect() protoreflect.Message {
// Deprecated: Use FilerShardCursor.ProtoReflect.Descriptor instead.
func (*FilerShardCursor) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{1}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{3}
}
func (x *FilerShardCursor) GetFilerId() string {
@@ -397,7 +650,7 @@ type EntryIdentity struct {
func (x *EntryIdentity) Reset() {
*x = EntryIdentity{}
mi := &file_s3_lifecycle_proto_msgTypes[2]
mi := &file_s3_lifecycle_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -409,7 +662,7 @@ func (x *EntryIdentity) String() string {
func (*EntryIdentity) ProtoMessage() {}
func (x *EntryIdentity) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[2]
mi := &file_s3_lifecycle_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -422,7 +675,7 @@ func (x *EntryIdentity) ProtoReflect() protoreflect.Message {
// Deprecated: Use EntryIdentity.ProtoReflect.Descriptor instead.
func (*EntryIdentity) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{2}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{4}
}
func (x *EntryIdentity) GetMtimeNs() int64 {
@@ -496,7 +749,7 @@ type LifecycleState struct {
func (x *LifecycleState) Reset() {
*x = LifecycleState{}
mi := &file_s3_lifecycle_proto_msgTypes[3]
mi := &file_s3_lifecycle_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -508,7 +761,7 @@ func (x *LifecycleState) String() string {
func (*LifecycleState) ProtoMessage() {}
func (x *LifecycleState) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[3]
mi := &file_s3_lifecycle_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -521,7 +774,7 @@ func (x *LifecycleState) ProtoReflect() protoreflect.Message {
// Deprecated: Use LifecycleState.ProtoReflect.Descriptor instead.
func (*LifecycleState) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{3}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{5}
}
func (x *LifecycleState) GetRuleHash() []byte {
@@ -644,7 +897,7 @@ type PendingItem struct {
func (x *PendingItem) Reset() {
*x = PendingItem{}
mi := &file_s3_lifecycle_proto_msgTypes[4]
mi := &file_s3_lifecycle_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -656,7 +909,7 @@ func (x *PendingItem) String() string {
func (*PendingItem) ProtoMessage() {}
func (x *PendingItem) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[4]
mi := &file_s3_lifecycle_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -669,7 +922,7 @@ func (x *PendingItem) ProtoReflect() protoreflect.Message {
// Deprecated: Use PendingItem.ProtoReflect.Descriptor instead.
func (*PendingItem) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{4}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{6}
}
func (x *PendingItem) GetObjectPath() string {
@@ -714,7 +967,7 @@ type BootstrapState struct {
func (x *BootstrapState) Reset() {
*x = BootstrapState{}
mi := &file_s3_lifecycle_proto_msgTypes[5]
mi := &file_s3_lifecycle_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -726,7 +979,7 @@ func (x *BootstrapState) String() string {
func (*BootstrapState) ProtoMessage() {}
func (x *BootstrapState) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[5]
mi := &file_s3_lifecycle_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -739,7 +992,7 @@ func (x *BootstrapState) ProtoReflect() protoreflect.Message {
// Deprecated: Use BootstrapState.ProtoReflect.Descriptor instead.
func (*BootstrapState) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{5}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{7}
}
func (x *BootstrapState) GetLastScannedPath() string {
@@ -785,7 +1038,7 @@ type ReaderState struct {
func (x *ReaderState) Reset() {
*x = ReaderState{}
mi := &file_s3_lifecycle_proto_msgTypes[6]
mi := &file_s3_lifecycle_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -797,7 +1050,7 @@ func (x *ReaderState) String() string {
func (*ReaderState) ProtoMessage() {}
func (x *ReaderState) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[6]
mi := &file_s3_lifecycle_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -810,7 +1063,7 @@ func (x *ReaderState) ProtoReflect() protoreflect.Message {
// Deprecated: Use ReaderState.ProtoReflect.Descriptor instead.
func (*ReaderState) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{6}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{8}
}
func (x *ReaderState) GetPrimaryFilerEndpoint() string {
@@ -857,7 +1110,7 @@ type FilerShardCursorList struct {
func (x *FilerShardCursorList) Reset() {
*x = FilerShardCursorList{}
mi := &file_s3_lifecycle_proto_msgTypes[7]
mi := &file_s3_lifecycle_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -869,7 +1122,7 @@ func (x *FilerShardCursorList) String() string {
func (*FilerShardCursorList) ProtoMessage() {}
func (x *FilerShardCursorList) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[7]
mi := &file_s3_lifecycle_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -882,7 +1135,7 @@ func (x *FilerShardCursorList) ProtoReflect() protoreflect.Message {
// Deprecated: Use FilerShardCursorList.ProtoReflect.Descriptor instead.
func (*FilerShardCursorList) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{7}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{9}
}
func (x *FilerShardCursorList) GetCursors() []*FilerShardCursor {
@@ -906,7 +1159,7 @@ type TailDrainedKey struct {
func (x *TailDrainedKey) Reset() {
*x = TailDrainedKey{}
mi := &file_s3_lifecycle_proto_msgTypes[8]
mi := &file_s3_lifecycle_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -918,7 +1171,7 @@ func (x *TailDrainedKey) String() string {
func (*TailDrainedKey) ProtoMessage() {}
func (x *TailDrainedKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[8]
mi := &file_s3_lifecycle_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -931,7 +1184,7 @@ func (x *TailDrainedKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use TailDrainedKey.ProtoReflect.Descriptor instead.
func (*TailDrainedKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{8}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{10}
}
func (x *TailDrainedKey) GetStreamKind() StreamKind {
@@ -988,7 +1241,7 @@ type BlockerRecord struct {
func (x *BlockerRecord) Reset() {
*x = BlockerRecord{}
mi := &file_s3_lifecycle_proto_msgTypes[9]
mi := &file_s3_lifecycle_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1000,7 +1253,7 @@ func (x *BlockerRecord) String() string {
func (*BlockerRecord) ProtoMessage() {}
func (x *BlockerRecord) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[9]
mi := &file_s3_lifecycle_proto_msgTypes[11]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1013,7 +1266,7 @@ func (x *BlockerRecord) ProtoReflect() protoreflect.Message {
// Deprecated: Use BlockerRecord.ProtoReflect.Descriptor instead.
func (*BlockerRecord) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{9}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{11}
}
func (x *BlockerRecord) GetStreamKind() StreamKind {
@@ -1131,7 +1384,7 @@ type StreamKey struct {
func (x *StreamKey) Reset() {
*x = StreamKey{}
mi := &file_s3_lifecycle_proto_msgTypes[10]
mi := &file_s3_lifecycle_proto_msgTypes[12]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1143,7 +1396,7 @@ func (x *StreamKey) String() string {
func (*StreamKey) ProtoMessage() {}
func (x *StreamKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[10]
mi := &file_s3_lifecycle_proto_msgTypes[12]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1156,7 +1409,7 @@ func (x *StreamKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use StreamKey.ProtoReflect.Descriptor instead.
func (*StreamKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{10}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{12}
}
func (x *StreamKey) GetKey() isStreamKey_Key {
@@ -1241,7 +1494,7 @@ type OriginalKey struct {
func (x *OriginalKey) Reset() {
*x = OriginalKey{}
mi := &file_s3_lifecycle_proto_msgTypes[11]
mi := &file_s3_lifecycle_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1253,7 +1506,7 @@ func (x *OriginalKey) String() string {
func (*OriginalKey) ProtoMessage() {}
func (x *OriginalKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[11]
mi := &file_s3_lifecycle_proto_msgTypes[13]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1266,7 +1519,7 @@ func (x *OriginalKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use OriginalKey.ProtoReflect.Descriptor instead.
func (*OriginalKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{11}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{13}
}
func (x *OriginalKey) GetShard() string {
@@ -1300,7 +1553,7 @@ type PredicateKey struct {
func (x *PredicateKey) Reset() {
*x = PredicateKey{}
mi := &file_s3_lifecycle_proto_msgTypes[12]
mi := &file_s3_lifecycle_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1312,7 +1565,7 @@ func (x *PredicateKey) String() string {
func (*PredicateKey) ProtoMessage() {}
func (x *PredicateKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[12]
mi := &file_s3_lifecycle_proto_msgTypes[14]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1325,7 +1578,7 @@ func (x *PredicateKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use PredicateKey.ProtoReflect.Descriptor instead.
func (*PredicateKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{12}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{14}
}
func (x *PredicateKey) GetShard() string {
@@ -1355,7 +1608,7 @@ type BootstrapKey struct {
func (x *BootstrapKey) Reset() {
*x = BootstrapKey{}
mi := &file_s3_lifecycle_proto_msgTypes[13]
mi := &file_s3_lifecycle_proto_msgTypes[15]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1367,7 +1620,7 @@ func (x *BootstrapKey) String() string {
func (*BootstrapKey) ProtoMessage() {}
func (x *BootstrapKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[13]
mi := &file_s3_lifecycle_proto_msgTypes[15]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1380,7 +1633,7 @@ func (x *BootstrapKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use BootstrapKey.ProtoReflect.Descriptor instead.
func (*BootstrapKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{13}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{15}
}
func (x *BootstrapKey) GetBucket() string {
@@ -1431,7 +1684,7 @@ type PendingKey struct {
func (x *PendingKey) Reset() {
*x = PendingKey{}
mi := &file_s3_lifecycle_proto_msgTypes[14]
mi := &file_s3_lifecycle_proto_msgTypes[16]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1443,7 +1696,7 @@ func (x *PendingKey) String() string {
func (*PendingKey) ProtoMessage() {}
func (x *PendingKey) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[14]
mi := &file_s3_lifecycle_proto_msgTypes[16]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1456,7 +1709,7 @@ func (x *PendingKey) ProtoReflect() protoreflect.Message {
// Deprecated: Use PendingKey.ProtoReflect.Descriptor instead.
func (*PendingKey) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{14}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{16}
}
func (x *PendingKey) GetBucket() string {
@@ -1509,7 +1762,7 @@ type RetryBudgetEntry struct {
func (x *RetryBudgetEntry) Reset() {
*x = RetryBudgetEntry{}
mi := &file_s3_lifecycle_proto_msgTypes[15]
mi := &file_s3_lifecycle_proto_msgTypes[17]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1521,7 +1774,7 @@ func (x *RetryBudgetEntry) String() string {
func (*RetryBudgetEntry) ProtoMessage() {}
func (x *RetryBudgetEntry) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[15]
mi := &file_s3_lifecycle_proto_msgTypes[17]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1534,7 +1787,7 @@ func (x *RetryBudgetEntry) ProtoReflect() protoreflect.Message {
// Deprecated: Use RetryBudgetEntry.ProtoReflect.Descriptor instead.
func (*RetryBudgetEntry) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{15}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{17}
}
func (x *RetryBudgetEntry) GetKey() *StreamKey {
@@ -1583,7 +1836,7 @@ type RetryTarget struct {
func (x *RetryTarget) Reset() {
*x = RetryTarget{}
mi := &file_s3_lifecycle_proto_msgTypes[16]
mi := &file_s3_lifecycle_proto_msgTypes[18]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1595,7 +1848,7 @@ func (x *RetryTarget) String() string {
func (*RetryTarget) ProtoMessage() {}
func (x *RetryTarget) ProtoReflect() protoreflect.Message {
mi := &file_s3_lifecycle_proto_msgTypes[16]
mi := &file_s3_lifecycle_proto_msgTypes[18]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1608,7 +1861,7 @@ func (x *RetryTarget) ProtoReflect() protoreflect.Message {
// Deprecated: Use RetryTarget.ProtoReflect.Descriptor instead.
func (*RetryTarget) Descriptor() ([]byte, []int) {
return file_s3_lifecycle_proto_rawDescGZIP(), []int{16}
return file_s3_lifecycle_proto_rawDescGZIP(), []int{18}
}
func (x *RetryTarget) GetKey() *StreamKey {
@@ -1657,7 +1910,26 @@ var File_s3_lifecycle_proto protoreflect.FileDescriptor
const file_s3_lifecycle_proto_rawDesc = "" +
"\n" +
"\x12s3_lifecycle.proto\x12\x0fs3_lifecycle_pb\">\n" +
"\x12s3_lifecycle.proto\x12\x0fs3_lifecycle_pb\"\x84\x04\n" +
"\x16LifecycleDeleteRequest\x12\x16\n" +
"\x06bucket\x18\x01 \x01(\tR\x06bucket\x12\x1f\n" +
"\vobject_path\x18\x02 \x01(\tR\n" +
"objectPath\x12\x1d\n" +
"\n" +
"version_id\x18\x03 \x01(\tR\tversionId\x12\x1b\n" +
"\trule_hash\x18\x04 \x01(\fR\bruleHash\x12<\n" +
"\vaction_kind\x18\x05 \x01(\x0e2\x1b.s3_lifecycle_pb.ActionKindR\n" +
"actionKind\x12!\n" +
"\fstream_shard\x18\n" +
" \x01(\tR\vstreamShard\x120\n" +
"\x14stream_delay_seconds\x18\v \x01(\x03R\x12streamDelaySeconds\x121\n" +
"\x15stream_position_ts_ns\x18\f \x01(\x03R\x12streamPositionTsNs\x124\n" +
"\x16stream_position_offset\x18\r \x01(\x03R\x14streamPositionOffset\x12K\n" +
"\x11expected_identity\x18\x14 \x01(\v2\x1e.s3_lifecycle_pb.EntryIdentityR\x10expectedIdentity\x12,\n" +
"\x12engine_snapshot_id\x18\x15 \x01(\x04R\x10engineSnapshotId\"t\n" +
"\x17LifecycleDeleteResponse\x12A\n" +
"\aoutcome\x18\x01 \x01(\x0e2'.s3_lifecycle_pb.LifecycleDeleteOutcomeR\aoutcome\x12\x16\n" +
"\x06reason\x18\x02 \x01(\tR\x06reason\">\n" +
"\x0fMessagePosition\x12\x13\n" +
"\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x16\n" +
"\x06offset\x18\x02 \x01(\x03R\x06offset\"k\n" +
@@ -1807,14 +2079,23 @@ const file_s3_lifecycle_proto_rawDesc = "" +
"\x0fNONCURRENT_DAYS\x10\x03\x12\x14\n" +
"\x10NEWER_NONCURRENT\x10\x04\x12\r\n" +
"\tABORT_MPU\x10\x05\x12\x19\n" +
"\x15EXPIRED_DELETE_MARKER\x10\x06*b\n" +
"\x15EXPIRED_DELETE_MARKER\x10\x06*\x96\x01\n" +
"\x16LifecycleDeleteOutcome\x12(\n" +
"$LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED\x10\x00\x12\b\n" +
"\x04DONE\x10\x01\x12\x11\n" +
"\rNOOP_RESOLVED\x10\x02\x12\x17\n" +
"\x13SKIPPED_OBJECT_LOCK\x10\x03\x12\x0f\n" +
"\vRETRY_LATER\x10\x04\x12\v\n" +
"\aBLOCKED\x10\x05*b\n" +
"\n" +
"StreamKind\x12\x1b\n" +
"\x17STREAM_KIND_UNSPECIFIED\x10\x00\x12\f\n" +
"\bORIGINAL\x10\x01\x12\r\n" +
"\tPREDICATE\x10\x02\x12\r\n" +
"\tBOOTSTRAP\x10\x03\x12\v\n" +
"\aPENDING\x10\x04B\\\n" +
"\aPENDING\x10\x042\x82\x01\n" +
"\x1aSeaweedS3LifecycleInternal\x12d\n" +
"\x0fLifecycleDelete\x12'.s3_lifecycle_pb.LifecycleDeleteRequest\x1a(.s3_lifecycle_pb.LifecycleDeleteResponseB\\\n" +
"\x10seaweedfs.clientB\x10S3LifecycleProtoZ6github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pbb\x06proto3"
var (
@@ -1829,63 +2110,71 @@ func file_s3_lifecycle_proto_rawDescGZIP() []byte {
return file_s3_lifecycle_proto_rawDescData
}
var file_s3_lifecycle_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
var file_s3_lifecycle_proto_msgTypes = make([]protoimpl.MessageInfo, 18)
var file_s3_lifecycle_proto_enumTypes = make([]protoimpl.EnumInfo, 5)
var file_s3_lifecycle_proto_msgTypes = make([]protoimpl.MessageInfo, 20)
var file_s3_lifecycle_proto_goTypes = []any{
(ActionKind)(0), // 0: s3_lifecycle_pb.ActionKind
(StreamKind)(0), // 1: s3_lifecycle_pb.StreamKind
(LifecycleState_RuleMode)(0), // 2: s3_lifecycle_pb.LifecycleState.RuleMode
(LifecycleState_DegradedReason)(0), // 3: s3_lifecycle_pb.LifecycleState.DegradedReason
(*MessagePosition)(nil), // 4: s3_lifecycle_pb.MessagePosition
(*FilerShardCursor)(nil), // 5: s3_lifecycle_pb.FilerShardCursor
(*EntryIdentity)(nil), // 6: s3_lifecycle_pb.EntryIdentity
(*LifecycleState)(nil), // 7: s3_lifecycle_pb.LifecycleState
(*PendingItem)(nil), // 8: s3_lifecycle_pb.PendingItem
(*BootstrapState)(nil), // 9: s3_lifecycle_pb.BootstrapState
(*ReaderState)(nil), // 10: s3_lifecycle_pb.ReaderState
(*FilerShardCursorList)(nil), // 11: s3_lifecycle_pb.FilerShardCursorList
(*TailDrainedKey)(nil), // 12: s3_lifecycle_pb.TailDrainedKey
(*BlockerRecord)(nil), // 13: s3_lifecycle_pb.BlockerRecord
(*StreamKey)(nil), // 14: s3_lifecycle_pb.StreamKey
(*OriginalKey)(nil), // 15: s3_lifecycle_pb.OriginalKey
(*PredicateKey)(nil), // 16: s3_lifecycle_pb.PredicateKey
(*BootstrapKey)(nil), // 17: s3_lifecycle_pb.BootstrapKey
(*PendingKey)(nil), // 18: s3_lifecycle_pb.PendingKey
(*RetryBudgetEntry)(nil), // 19: s3_lifecycle_pb.RetryBudgetEntry
(*RetryTarget)(nil), // 20: s3_lifecycle_pb.RetryTarget
nil, // 21: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry
(LifecycleDeleteOutcome)(0), // 1: s3_lifecycle_pb.LifecycleDeleteOutcome
(StreamKind)(0), // 2: s3_lifecycle_pb.StreamKind
(LifecycleState_RuleMode)(0), // 3: s3_lifecycle_pb.LifecycleState.RuleMode
(LifecycleState_DegradedReason)(0), // 4: s3_lifecycle_pb.LifecycleState.DegradedReason
(*LifecycleDeleteRequest)(nil), // 5: s3_lifecycle_pb.LifecycleDeleteRequest
(*LifecycleDeleteResponse)(nil), // 6: s3_lifecycle_pb.LifecycleDeleteResponse
(*MessagePosition)(nil), // 7: s3_lifecycle_pb.MessagePosition
(*FilerShardCursor)(nil), // 8: s3_lifecycle_pb.FilerShardCursor
(*EntryIdentity)(nil), // 9: s3_lifecycle_pb.EntryIdentity
(*LifecycleState)(nil), // 10: s3_lifecycle_pb.LifecycleState
(*PendingItem)(nil), // 11: s3_lifecycle_pb.PendingItem
(*BootstrapState)(nil), // 12: s3_lifecycle_pb.BootstrapState
(*ReaderState)(nil), // 13: s3_lifecycle_pb.ReaderState
(*FilerShardCursorList)(nil), // 14: s3_lifecycle_pb.FilerShardCursorList
(*TailDrainedKey)(nil), // 15: s3_lifecycle_pb.TailDrainedKey
(*BlockerRecord)(nil), // 16: s3_lifecycle_pb.BlockerRecord
(*StreamKey)(nil), // 17: s3_lifecycle_pb.StreamKey
(*OriginalKey)(nil), // 18: s3_lifecycle_pb.OriginalKey
(*PredicateKey)(nil), // 19: s3_lifecycle_pb.PredicateKey
(*BootstrapKey)(nil), // 20: s3_lifecycle_pb.BootstrapKey
(*PendingKey)(nil), // 21: s3_lifecycle_pb.PendingKey
(*RetryBudgetEntry)(nil), // 22: s3_lifecycle_pb.RetryBudgetEntry
(*RetryTarget)(nil), // 23: s3_lifecycle_pb.RetryTarget
nil, // 24: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry
}
var file_s3_lifecycle_proto_depIdxs = []int32{
4, // 0: s3_lifecycle_pb.FilerShardCursor.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 1: s3_lifecycle_pb.LifecycleState.action_kind:type_name -> s3_lifecycle_pb.ActionKind
2, // 2: s3_lifecycle_pb.LifecycleState.mode:type_name -> s3_lifecycle_pb.LifecycleState.RuleMode
3, // 3: s3_lifecycle_pb.LifecycleState.degraded_reason:type_name -> s3_lifecycle_pb.LifecycleState.DegradedReason
6, // 4: s3_lifecycle_pb.PendingItem.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity
21, // 5: s3_lifecycle_pb.ReaderState.last_processed_original:type_name -> s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry
5, // 6: s3_lifecycle_pb.ReaderState.last_processed_predicate:type_name -> s3_lifecycle_pb.FilerShardCursor
12, // 7: s3_lifecycle_pb.ReaderState.tail_drained_streams:type_name -> s3_lifecycle_pb.TailDrainedKey
5, // 8: s3_lifecycle_pb.FilerShardCursorList.cursors:type_name -> s3_lifecycle_pb.FilerShardCursor
1, // 9: s3_lifecycle_pb.TailDrainedKey.stream_kind:type_name -> s3_lifecycle_pb.StreamKind
1, // 10: s3_lifecycle_pb.BlockerRecord.stream_kind:type_name -> s3_lifecycle_pb.StreamKind
4, // 11: s3_lifecycle_pb.BlockerRecord.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 12: s3_lifecycle_pb.BlockerRecord.action_kind:type_name -> s3_lifecycle_pb.ActionKind
15, // 13: s3_lifecycle_pb.StreamKey.original:type_name -> s3_lifecycle_pb.OriginalKey
16, // 14: s3_lifecycle_pb.StreamKey.predicate:type_name -> s3_lifecycle_pb.PredicateKey
17, // 15: s3_lifecycle_pb.StreamKey.bootstrap:type_name -> s3_lifecycle_pb.BootstrapKey
18, // 16: s3_lifecycle_pb.StreamKey.pending:type_name -> s3_lifecycle_pb.PendingKey
4, // 17: s3_lifecycle_pb.OriginalKey.position:type_name -> s3_lifecycle_pb.MessagePosition
4, // 18: s3_lifecycle_pb.PredicateKey.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 19: s3_lifecycle_pb.BootstrapKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind
0, // 20: s3_lifecycle_pb.PendingKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind
14, // 21: s3_lifecycle_pb.RetryBudgetEntry.key:type_name -> s3_lifecycle_pb.StreamKey
14, // 22: s3_lifecycle_pb.RetryTarget.key:type_name -> s3_lifecycle_pb.StreamKey
0, // 23: s3_lifecycle_pb.RetryTarget.action_kind:type_name -> s3_lifecycle_pb.ActionKind
11, // 24: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry.value:type_name -> s3_lifecycle_pb.FilerShardCursorList
25, // [25:25] is the sub-list for method output_type
25, // [25:25] is the sub-list for method input_type
25, // [25:25] is the sub-list for extension type_name
25, // [25:25] is the sub-list for extension extendee
0, // [0:25] is the sub-list for field type_name
0, // 0: s3_lifecycle_pb.LifecycleDeleteRequest.action_kind:type_name -> s3_lifecycle_pb.ActionKind
9, // 1: s3_lifecycle_pb.LifecycleDeleteRequest.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity
1, // 2: s3_lifecycle_pb.LifecycleDeleteResponse.outcome:type_name -> s3_lifecycle_pb.LifecycleDeleteOutcome
7, // 3: s3_lifecycle_pb.FilerShardCursor.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 4: s3_lifecycle_pb.LifecycleState.action_kind:type_name -> s3_lifecycle_pb.ActionKind
3, // 5: s3_lifecycle_pb.LifecycleState.mode:type_name -> s3_lifecycle_pb.LifecycleState.RuleMode
4, // 6: s3_lifecycle_pb.LifecycleState.degraded_reason:type_name -> s3_lifecycle_pb.LifecycleState.DegradedReason
9, // 7: s3_lifecycle_pb.PendingItem.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity
24, // 8: s3_lifecycle_pb.ReaderState.last_processed_original:type_name -> s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry
8, // 9: s3_lifecycle_pb.ReaderState.last_processed_predicate:type_name -> s3_lifecycle_pb.FilerShardCursor
15, // 10: s3_lifecycle_pb.ReaderState.tail_drained_streams:type_name -> s3_lifecycle_pb.TailDrainedKey
8, // 11: s3_lifecycle_pb.FilerShardCursorList.cursors:type_name -> s3_lifecycle_pb.FilerShardCursor
2, // 12: s3_lifecycle_pb.TailDrainedKey.stream_kind:type_name -> s3_lifecycle_pb.StreamKind
2, // 13: s3_lifecycle_pb.BlockerRecord.stream_kind:type_name -> s3_lifecycle_pb.StreamKind
7, // 14: s3_lifecycle_pb.BlockerRecord.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 15: s3_lifecycle_pb.BlockerRecord.action_kind:type_name -> s3_lifecycle_pb.ActionKind
18, // 16: s3_lifecycle_pb.StreamKey.original:type_name -> s3_lifecycle_pb.OriginalKey
19, // 17: s3_lifecycle_pb.StreamKey.predicate:type_name -> s3_lifecycle_pb.PredicateKey
20, // 18: s3_lifecycle_pb.StreamKey.bootstrap:type_name -> s3_lifecycle_pb.BootstrapKey
21, // 19: s3_lifecycle_pb.StreamKey.pending:type_name -> s3_lifecycle_pb.PendingKey
7, // 20: s3_lifecycle_pb.OriginalKey.position:type_name -> s3_lifecycle_pb.MessagePosition
7, // 21: s3_lifecycle_pb.PredicateKey.position:type_name -> s3_lifecycle_pb.MessagePosition
0, // 22: s3_lifecycle_pb.BootstrapKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind
0, // 23: s3_lifecycle_pb.PendingKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind
17, // 24: s3_lifecycle_pb.RetryBudgetEntry.key:type_name -> s3_lifecycle_pb.StreamKey
17, // 25: s3_lifecycle_pb.RetryTarget.key:type_name -> s3_lifecycle_pb.StreamKey
0, // 26: s3_lifecycle_pb.RetryTarget.action_kind:type_name -> s3_lifecycle_pb.ActionKind
14, // 27: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry.value:type_name -> s3_lifecycle_pb.FilerShardCursorList
5, // 28: s3_lifecycle_pb.SeaweedS3LifecycleInternal.LifecycleDelete:input_type -> s3_lifecycle_pb.LifecycleDeleteRequest
6, // 29: s3_lifecycle_pb.SeaweedS3LifecycleInternal.LifecycleDelete:output_type -> s3_lifecycle_pb.LifecycleDeleteResponse
29, // [29:30] is the sub-list for method output_type
28, // [28:29] is the sub-list for method input_type
28, // [28:28] is the sub-list for extension type_name
28, // [28:28] is the sub-list for extension extendee
0, // [0:28] is the sub-list for field type_name
}
func init() { file_s3_lifecycle_proto_init() }
@@ -1893,7 +2182,7 @@ func file_s3_lifecycle_proto_init() {
if File_s3_lifecycle_proto != nil {
return
}
file_s3_lifecycle_proto_msgTypes[10].OneofWrappers = []any{
file_s3_lifecycle_proto_msgTypes[12].OneofWrappers = []any{
(*StreamKey_Original)(nil),
(*StreamKey_Predicate)(nil),
(*StreamKey_Bootstrap)(nil),
@@ -1904,10 +2193,10 @@ func file_s3_lifecycle_proto_init() {
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_s3_lifecycle_proto_rawDesc), len(file_s3_lifecycle_proto_rawDesc)),
NumEnums: 4,
NumMessages: 18,
NumEnums: 5,
NumMessages: 20,
NumExtensions: 0,
NumServices: 0,
NumServices: 1,
},
GoTypes: file_s3_lifecycle_proto_goTypes,
DependencyIndexes: file_s3_lifecycle_proto_depIdxs,
@@ -0,0 +1,136 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v6.33.4
// source: s3_lifecycle.proto
package s3_lifecycle_pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName = "/s3_lifecycle_pb.SeaweedS3LifecycleInternal/LifecycleDelete"
)
// SeaweedS3LifecycleInternalClient is the client API for SeaweedS3LifecycleInternal service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the
// actual lifecycle deletion. The lifecycle worker computes the (rule, action)
// verdict locally; the S3 server is the only component allowed to mutate the
// filer state, so it gets the final word: it re-fetches the live entry,
// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches
// to the appropriate internal helper.
type SeaweedS3LifecycleInternalClient interface {
LifecycleDelete(ctx context.Context, in *LifecycleDeleteRequest, opts ...grpc.CallOption) (*LifecycleDeleteResponse, error)
}
type seaweedS3LifecycleInternalClient struct {
cc grpc.ClientConnInterface
}
func NewSeaweedS3LifecycleInternalClient(cc grpc.ClientConnInterface) SeaweedS3LifecycleInternalClient {
return &seaweedS3LifecycleInternalClient{cc}
}
func (c *seaweedS3LifecycleInternalClient) LifecycleDelete(ctx context.Context, in *LifecycleDeleteRequest, opts ...grpc.CallOption) (*LifecycleDeleteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(LifecycleDeleteResponse)
err := c.cc.Invoke(ctx, SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// SeaweedS3LifecycleInternalServer is the server API for SeaweedS3LifecycleInternal service.
// All implementations must embed UnimplementedSeaweedS3LifecycleInternalServer
// for forward compatibility.
//
// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the
// actual lifecycle deletion. The lifecycle worker computes the (rule, action)
// verdict locally; the S3 server is the only component allowed to mutate the
// filer state, so it gets the final word: it re-fetches the live entry,
// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches
// to the appropriate internal helper.
type SeaweedS3LifecycleInternalServer interface {
LifecycleDelete(context.Context, *LifecycleDeleteRequest) (*LifecycleDeleteResponse, error)
mustEmbedUnimplementedSeaweedS3LifecycleInternalServer()
}
// UnimplementedSeaweedS3LifecycleInternalServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedSeaweedS3LifecycleInternalServer struct{}
func (UnimplementedSeaweedS3LifecycleInternalServer) LifecycleDelete(context.Context, *LifecycleDeleteRequest) (*LifecycleDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method LifecycleDelete not implemented")
}
func (UnimplementedSeaweedS3LifecycleInternalServer) mustEmbedUnimplementedSeaweedS3LifecycleInternalServer() {
}
func (UnimplementedSeaweedS3LifecycleInternalServer) testEmbeddedByValue() {}
// UnsafeSeaweedS3LifecycleInternalServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to SeaweedS3LifecycleInternalServer will
// result in compilation errors.
type UnsafeSeaweedS3LifecycleInternalServer interface {
mustEmbedUnimplementedSeaweedS3LifecycleInternalServer()
}
func RegisterSeaweedS3LifecycleInternalServer(s grpc.ServiceRegistrar, srv SeaweedS3LifecycleInternalServer) {
// If the following call pancis, it indicates UnimplementedSeaweedS3LifecycleInternalServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&SeaweedS3LifecycleInternal_ServiceDesc, srv)
}
func _SeaweedS3LifecycleInternal_LifecycleDelete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LifecycleDeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SeaweedS3LifecycleInternalServer).LifecycleDelete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedS3LifecycleInternalServer).LifecycleDelete(ctx, req.(*LifecycleDeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
// SeaweedS3LifecycleInternal_ServiceDesc is the grpc.ServiceDesc for SeaweedS3LifecycleInternal service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var SeaweedS3LifecycleInternal_ServiceDesc = grpc.ServiceDesc{
ServiceName: "s3_lifecycle_pb.SeaweedS3LifecycleInternal",
HandlerType: (*SeaweedS3LifecycleInternalServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "LifecycleDelete",
Handler: _SeaweedS3LifecycleInternal_LifecycleDelete_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "s3_lifecycle.proto",
}
+200
View File
@@ -0,0 +1,200 @@
package s3api
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"sort"
"strconv"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// LifecycleDelete executes one (rule, action) verdict: re-fetch, identity
// CAS, object-lock check, dispatch by kind. Errors surface as outcomes;
// reader cursors and pending state are the worker's concern.
func (s3a *S3ApiServer) LifecycleDelete(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) {
if req == nil || req.Bucket == "" || req.ObjectPath == "" {
return blocked("FATAL_EVENT_ERROR: empty bucket or object_path"), nil
}
// MPU init lives at .uploads/<id>/; not handled by getObjectEntry.
if req.ActionKind == s3_lifecycle_pb.ActionKind_ABORT_MPU {
return s3a.lifecycleAbortMPU(ctx, req)
}
entry, err := s3a.getObjectEntry(req.Bucket, req.ObjectPath, req.VersionId)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) {
return noopResolved("NOT_FOUND"), nil
}
glog.V(1).Infof("lifecycle: live fetch %s/%s@%s: %v", req.Bucket, req.ObjectPath, req.VersionId, err)
return retryLater("TRANSPORT_ERROR: " + err.Error()), nil
}
live := computeEntryIdentity(entry)
if !identityMatches(live, req.ExpectedIdentity) {
return noopResolved("STALE_IDENTITY"), nil
}
// Lifecycle never bypasses governance/compliance; the http.Request is
// only read when bypass is allowed, so nil is safe here.
if err := s3a.enforceObjectLockProtections(nil, req.Bucket, req.ObjectPath, req.VersionId, false); err != nil {
glog.V(2).Infof("lifecycle: SKIPPED_OBJECT_LOCK %s/%s@%s: %v", req.Bucket, req.ObjectPath, req.VersionId, err)
return &s3_lifecycle_pb.LifecycleDeleteResponse{
Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_SKIPPED_OBJECT_LOCK,
Reason: err.Error(),
}, nil
}
return s3a.lifecycleDispatch(ctx, req, entry)
}
func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest, entry *filer_pb.Entry) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) {
switch req.ActionKind {
case s3_lifecycle_pb.ActionKind_EXPIRATION_DAYS, s3_lifecycle_pb.ActionKind_EXPIRATION_DATE:
// Current-version expiration: Enabled -> delete marker; Suspended
// -> delete null + new marker; Off -> remove. Filer errors classify
// as RETRY_LATER; the worker's budget promotes to BLOCKED.
state, vErr := s3a.getVersioningState(req.Bucket)
if vErr != nil {
if errors.Is(vErr, filer_pb.ErrNotFound) {
return noopResolved("BUCKET_NOT_FOUND"), nil
}
return retryLater("TRANSPORT_ERROR: versioning lookup: " + vErr.Error()), nil
}
switch state {
case s3_constants.VersioningEnabled:
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath); err != nil {
return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil
}
return done(), nil
case s3_constants.VersioningSuspended:
// Best-effort null delete; NotFound is benign.
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, "null"); err != nil {
if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrVersionNotFound) {
return retryLater("TRANSPORT_ERROR: deleteNullVersion: " + err.Error()), nil
}
}
if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath); err != nil {
return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil
}
return done(), nil
default:
err := s3a.WithFilerClient(false, func(c filer_pb.SeaweedFilerClient) error {
return s3a.deleteUnversionedObjectWithClient(c, req.Bucket, req.ObjectPath)
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) {
return noopResolved("NOT_FOUND_AT_DELETE"), nil
}
return retryLater("TRANSPORT_ERROR: deleteUnversioned: " + err.Error()), nil
}
return done(), nil
}
case s3_lifecycle_pb.ActionKind_NONCURRENT_DAYS,
s3_lifecycle_pb.ActionKind_NEWER_NONCURRENT,
s3_lifecycle_pb.ActionKind_EXPIRED_DELETE_MARKER:
// EXPIRED_DELETE_MARKER targets the marker version itself.
if req.VersionId == "" {
return blocked("FATAL_EVENT_ERROR: version_id required for noncurrent / delete-marker delete"), nil
}
if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId); err != nil {
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrObjectNotFound) {
return noopResolved("NOT_FOUND_AT_DELETE"), nil
}
return retryLater("TRANSPORT_ERROR: deleteSpecificVersion: " + err.Error()), nil
}
return done(), nil
case s3_lifecycle_pb.ActionKind_ABORT_MPU:
return blocked("FATAL_EVENT_ERROR: ABORT_MPU dispatched after fetch"), nil
default:
return blocked("FATAL_EVENT_ERROR: unknown action_kind " + req.ActionKind.String()), nil
}
}
func (s3a *S3ApiServer) lifecycleAbortMPU(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) {
// TODO(phase-5): plumb to abortMultipartUpload (currently expects an
// *s3.AbortMultipartUploadInput; lifecycle has no HTTP request).
return retryLater("ABORT_MPU not yet wired"), nil
}
// computeEntryIdentity captures (mtime, size, head fid, sorted-Extended hash):
// an overwrite changes mtime/size/fid; a metadata edit changes Extended; a
// snapshot-restore that preserves mtime+size still differs in head_fid.
func computeEntryIdentity(entry *filer_pb.Entry) *s3_lifecycle_pb.EntryIdentity {
if entry == nil {
return nil
}
id := &s3_lifecycle_pb.EntryIdentity{}
if entry.Attributes != nil {
id.MtimeNs = entry.Attributes.Mtime
id.Size = int64(entry.Attributes.FileSize)
}
if len(entry.GetChunks()) > 0 {
id.HeadFid = entry.GetChunks()[0].FileId
}
id.ExtendedHash = hashExtended(entry.Extended)
return id
}
// hashExtended is length-prefixed; a forged tag value can't collide with a
// legitimate multi-tag map.
func hashExtended(ext map[string][]byte) []byte {
if len(ext) == 0 {
return nil
}
keys := make([]string, 0, len(ext))
for k := range ext {
keys = append(keys, k)
}
sort.Strings(keys)
h := sha256.New()
for _, k := range keys {
h.Write([]byte(strconv.Itoa(len(k))))
h.Write([]byte{':'})
h.Write([]byte(k))
v := ext[k]
h.Write([]byte(strconv.Itoa(len(v))))
h.Write([]byte{':'})
h.Write(v)
}
return h.Sum(nil)
}
func identityMatches(live, want *s3_lifecycle_pb.EntryIdentity) bool {
if want == nil {
// No CAS witness (early bootstrap); skip.
return true
}
if live == nil {
return false
}
if live.MtimeNs != want.MtimeNs || live.Size != want.Size {
return false
}
if live.HeadFid != want.HeadFid {
return false
}
return bytes.Equal(live.ExtendedHash, want.ExtendedHash)
}
func done() *s3_lifecycle_pb.LifecycleDeleteResponse {
return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_DONE}
}
func noopResolved(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse {
return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_NOOP_RESOLVED, Reason: reason}
}
func blocked(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse {
return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_BLOCKED, Reason: reason}
}
func retryLater(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse {
return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_RETRY_LATER, Reason: reason}
}
+117
View File
@@ -0,0 +1,117 @@
package s3api
import (
"bytes"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb"
)
func TestComputeEntryIdentity_BasicFields(t *testing.T) {
entry := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{Mtime: 1700000000, FileSize: 4096},
Chunks: []*filer_pb.FileChunk{
{FileId: "1,abc"},
{FileId: "1,def"},
},
}
id := computeEntryIdentity(entry)
if id.MtimeNs != 1700000000 {
t.Fatalf("MtimeNs want 1700000000, got %d", id.MtimeNs)
}
if id.Size != 4096 {
t.Fatalf("Size want 4096, got %d", id.Size)
}
if id.HeadFid != "1,abc" {
t.Fatalf("HeadFid want 1,abc, got %s", id.HeadFid)
}
}
func TestComputeEntryIdentity_NilSafeMissingChunks(t *testing.T) {
if got := computeEntryIdentity(nil); got != nil {
t.Fatalf("nil entry should return nil, got %v", got)
}
id := computeEntryIdentity(&filer_pb.Entry{})
if id == nil {
t.Fatalf("entry with nil Attributes should still produce identity")
}
if id.HeadFid != "" {
t.Fatalf("missing chunks should yield empty HeadFid, got %s", id.HeadFid)
}
}
func TestHashExtended_OrderStable(t *testing.T) {
a := map[string][]byte{"k1": []byte("v1"), "k2": []byte("v2")}
b := map[string][]byte{"k2": []byte("v2"), "k1": []byte("v1")}
if !bytes.Equal(hashExtended(a), hashExtended(b)) {
t.Fatalf("hash should be insensitive to map iteration order")
}
}
func TestHashExtended_DelimiterCollisionResistant(t *testing.T) {
// Naively concatenated: "k1=v1k2v2" could collide with "k1=v1k" / "2v2".
// Length-prefix encoding must keep them apart.
a := map[string][]byte{"k1": []byte("v1"), "k2": []byte("v2")}
b := map[string][]byte{"k1": []byte("v1k2v2")}
if bytes.Equal(hashExtended(a), hashExtended(b)) {
t.Fatalf("delimiter-forged Extended payloads must not collide")
}
}
func TestHashExtended_NilEqualsEmpty(t *testing.T) {
if got := hashExtended(nil); len(got) != 0 {
t.Fatalf("nil should produce zero-length hash, got %d bytes", len(got))
}
if got := hashExtended(map[string][]byte{}); len(got) != 0 {
t.Fatalf("empty map should produce zero-length hash, got %d bytes", len(got))
}
}
func TestIdentityMatches_NilWantTreatedAsMatch(t *testing.T) {
// Bootstrap callers that don't yet have an identity to CAS against
// pass nil expected_identity; the server treats this as "no CAS".
live := &s3_lifecycle_pb.EntryIdentity{MtimeNs: 1, Size: 2}
if !identityMatches(live, nil) {
t.Fatalf("nil want should match")
}
}
func TestIdentityMatches_NilLiveDoesNotMatch(t *testing.T) {
if identityMatches(nil, &s3_lifecycle_pb.EntryIdentity{MtimeNs: 1}) {
t.Fatalf("nil live should not match a populated want")
}
}
func TestIdentityMatches_AllFieldsCompared(t *testing.T) {
base := &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}
cases := []struct {
name string
live *s3_lifecycle_pb.EntryIdentity
want bool
}{
{"identical", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, true},
{"mtime-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 101, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, false},
{"size-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2049, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, false},
{"fid-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,xyz", ExtendedHash: []byte{0x01, 0x02}}, false},
{"extended-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x03, 0x04}}, false},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if got := identityMatches(c.live, base); got != c.want {
t.Fatalf("want %v, got %v", c.want, got)
}
})
}
}
func TestLifecycleDelete_RejectsEmptyRequest(t *testing.T) {
s := &S3ApiServer{}
resp, err := s.LifecycleDelete(nil, &s3_lifecycle_pb.LifecycleDeleteRequest{})
if err != nil {
t.Fatalf("unexpected gRPC error: %v", err)
}
if resp.Outcome != s3_lifecycle_pb.LifecycleDeleteOutcome_BLOCKED {
t.Fatalf("empty request should be BLOCKED, got %v", resp.Outcome)
}
}
+4 -18
View File
@@ -4,17 +4,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
)
// LifecycleToCanonical converts the XML-deserialized Lifecycle into the flat
// s3lifecycle.Rule shape the engine compiles against. One XML <Rule> maps to
// exactly one s3lifecycle.Rule (with potentially multiple action sub-fields
// populated); the engine then expands each into its compiled actions via
// s3lifecycle.RuleActionKinds.
//
// Filter resolution mirrors AWS semantics: the optional <Filter> element may
// contain a single <Prefix> | <Tag> | <And>, or be absent (in which case the
// older top-level <Prefix> applies). When <And> is used, its sub-elements
// (Prefix + multiple Tags + size filters) are flattened into the canonical
// Rule's individual fields.
// LifecycleToCanonical flattens the XML-deserialized Lifecycle into the
// engine's flat Rule shape. The optional <Filter> element may contain
// <Prefix> | <Tag> | <And>, or be absent (in which case the older top-level
// <Prefix> applies).
func LifecycleToCanonical(lc *Lifecycle) []*s3lifecycle.Rule {
if lc == nil {
return nil
@@ -32,7 +25,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule {
Status: string(r.Status),
}
// Prefix: <Filter><Prefix> or <Filter><And><Prefix> or top-level <Prefix>.
prefix, tags, sizeGT, sizeLT := flattenFilter(&r.Filter)
if prefix == "" && r.Prefix.set {
prefix = r.Prefix.val
@@ -44,7 +36,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule {
out.FilterSizeGreaterThan = sizeGT
out.FilterSizeLessThan = sizeLT
// Expiration sub-fields.
if r.Expiration.set {
out.ExpirationDays = r.Expiration.Days
if !r.Expiration.Date.Time.IsZero() {
@@ -55,13 +46,11 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule {
}
}
// Non-current version expiration.
if r.NoncurrentVersionExpiration.set {
out.NoncurrentVersionExpirationDays = r.NoncurrentVersionExpiration.NoncurrentDays
out.NewerNoncurrentVersions = r.NoncurrentVersionExpiration.NewerNoncurrentVersions
}
// Abort multipart.
if r.AbortIncompleteMultipartUpload.set {
out.AbortMPUDaysAfterInitiation = r.AbortIncompleteMultipartUpload.DaysAfterInitiation
}
@@ -69,9 +58,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule {
return out
}
// flattenFilter pulls Prefix / Tags / Size constraints out of the XML Filter
// element. Returns zero values when the field is unset; the caller falls back
// to the rule's top-level Prefix when prefix is "".
func flattenFilter(f *Filter) (prefix string, tags map[string]string, sizeGT, sizeLT int64) {
if !f.set {
return