diff --git a/weed/pb/Makefile b/weed/pb/Makefile index f8ead50ac..f6edcef40 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -13,7 +13,7 @@ gen: protoc mount_peer.proto --go_out=./mount_peer_pb --go-grpc_out=./mount_peer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative mkdir -p ./s3_lifecycle_pb - protoc s3_lifecycle.proto --go_out=./s3_lifecycle_pb --go_opt=paths=source_relative + protoc s3_lifecycle.proto --go_out=./s3_lifecycle_pb --go-grpc_out=./s3_lifecycle_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mq_broker.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mq_schema.proto --go_out=./schema_pb --go-grpc_out=./schema_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mq_agent.proto --go_out=./mq_agent_pb --go-grpc_out=./mq_agent_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative diff --git a/weed/pb/s3_lifecycle.proto b/weed/pb/s3_lifecycle.proto index 9a802ba51..77addaf1b 100644 --- a/weed/pb/s3_lifecycle.proto +++ b/weed/pb/s3_lifecycle.proto @@ -33,6 +33,66 @@ enum ActionKind { EXPIRED_DELETE_MARKER = 6; // Expiration.ExpiredObjectDeleteMarker } +// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the +// actual lifecycle deletion. The lifecycle worker computes the (rule, action) +// verdict locally; the S3 server is the only component allowed to mutate the +// filer state, so it gets the final word: it re-fetches the live entry, +// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches +// to the appropriate internal helper. +service SeaweedS3LifecycleInternal { + rpc LifecycleDelete(LifecycleDeleteRequest) returns (LifecycleDeleteResponse); +} + +// LifecycleDeleteOutcome captures every per-event verdict the worker needs. +// Cursor advance / pending mutation rules: +// +// DONE -> advance cursor / drop pending +// NOOP_RESOLVED -> advance cursor / drop pending (object already gone or stale identity) +// SKIPPED_OBJECT_LOCK -> log + counter; advance cursor (per design: object lock is operator concern) +// RETRY_LATER -> hold cursor; feed retry-budget; next batch retries from same position +// BLOCKED -> hold cursor; durable BlockerRecord written; operator must intervene +enum LifecycleDeleteOutcome { + LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED = 0; + DONE = 1; + NOOP_RESOLVED = 2; + SKIPPED_OBJECT_LOCK = 3; + RETRY_LATER = 4; + BLOCKED = 5; +} + +message LifecycleDeleteRequest { + // Routing. + string bucket = 1; + string object_path = 2; // bucket-relative; no leading slash + string version_id = 3; // empty for non-versioned + bytes rule_hash = 4; // 8 bytes; matches the rule's per-rule dir + ActionKind action_kind = 5; // chooses dispatch within the rule + + // Stream context (echoed in BlockerRecord on FATAL outcomes so operators + // can resolve the right (shard, delay) tuple). + string stream_shard = 10; + int64 stream_delay_seconds = 11; + int64 stream_position_ts_ns = 12; + int64 stream_position_offset = 13; + + // CAS witness. The server re-fetches the live entry and aborts as + // NOOP_RESOLVED (STALE_IDENTITY) if any field doesn't match. + EntryIdentity expected_identity = 20; + + // Snapshot id at the time the worker computed this verdict; the server + // verifies the (rule_hash, action_kind) is still in the current policy + // snapshot and aborts as NOOP_RESOLVED (STALE_POLICY) if not. + uint64 engine_snapshot_id = 21; +} + +message LifecycleDeleteResponse { + LifecycleDeleteOutcome outcome = 1; + // Human-readable cause, e.g. "STALE_IDENTITY: mtime drift", + // "FATAL_EVENT_ERROR: malformed entry", or "TRANSPORT_ERROR: filer + // unavailable". Echoed into BlockerRecord.last_error on FATAL. + string reason = 2; +} + // MessagePosition mirrors weed/util/log_buffer.MessagePosition for durable // storage. ts_ns + offset uniquely identifies a position within a per-filer // log buffer; per-shard cursors (see ReaderState) use this shape to skip past diff --git a/weed/pb/s3_lifecycle_pb/s3_lifecycle.pb.go b/weed/pb/s3_lifecycle_pb/s3_lifecycle.pb.go index 07451edd7..dddcaff01 100644 --- a/weed/pb/s3_lifecycle_pb/s3_lifecycle.pb.go +++ b/weed/pb/s3_lifecycle_pb/s3_lifecycle.pb.go @@ -85,6 +85,72 @@ func (ActionKind) EnumDescriptor() ([]byte, []int) { return file_s3_lifecycle_proto_rawDescGZIP(), []int{0} } +// LifecycleDeleteOutcome captures every per-event verdict the worker needs. +// Cursor advance / pending mutation rules: +// +// DONE -> advance cursor / drop pending +// NOOP_RESOLVED -> advance cursor / drop pending (object already gone or stale identity) +// SKIPPED_OBJECT_LOCK -> log + counter; advance cursor (per design: object lock is operator concern) +// RETRY_LATER -> hold cursor; feed retry-budget; next batch retries from same position +// BLOCKED -> hold cursor; durable BlockerRecord written; operator must intervene +type LifecycleDeleteOutcome int32 + +const ( + LifecycleDeleteOutcome_LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED LifecycleDeleteOutcome = 0 + LifecycleDeleteOutcome_DONE LifecycleDeleteOutcome = 1 + LifecycleDeleteOutcome_NOOP_RESOLVED LifecycleDeleteOutcome = 2 + LifecycleDeleteOutcome_SKIPPED_OBJECT_LOCK LifecycleDeleteOutcome = 3 + LifecycleDeleteOutcome_RETRY_LATER LifecycleDeleteOutcome = 4 + LifecycleDeleteOutcome_BLOCKED LifecycleDeleteOutcome = 5 +) + +// Enum value maps for LifecycleDeleteOutcome. +var ( + LifecycleDeleteOutcome_name = map[int32]string{ + 0: "LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED", + 1: "DONE", + 2: "NOOP_RESOLVED", + 3: "SKIPPED_OBJECT_LOCK", + 4: "RETRY_LATER", + 5: "BLOCKED", + } + LifecycleDeleteOutcome_value = map[string]int32{ + "LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED": 0, + "DONE": 1, + "NOOP_RESOLVED": 2, + "SKIPPED_OBJECT_LOCK": 3, + "RETRY_LATER": 4, + "BLOCKED": 5, + } +) + +func (x LifecycleDeleteOutcome) Enum() *LifecycleDeleteOutcome { + p := new(LifecycleDeleteOutcome) + *p = x + return p +} + +func (x LifecycleDeleteOutcome) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (LifecycleDeleteOutcome) Descriptor() protoreflect.EnumDescriptor { + return file_s3_lifecycle_proto_enumTypes[1].Descriptor() +} + +func (LifecycleDeleteOutcome) Type() protoreflect.EnumType { + return &file_s3_lifecycle_proto_enumTypes[1] +} + +func (x LifecycleDeleteOutcome) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use LifecycleDeleteOutcome.Descriptor instead. +func (LifecycleDeleteOutcome) EnumDescriptor() ([]byte, []int) { + return file_s3_lifecycle_proto_rawDescGZIP(), []int{1} +} + // StreamKind classifies the four blockable streams. ORIGINAL/PREDICATE pause // reader cursors; BOOTSTRAP pauses a bucket walker; PENDING pauses a rule's // drain task. Zero is an UNSPECIFIED sentinel — a BlockerRecord whose @@ -128,11 +194,11 @@ func (x StreamKind) String() string { } func (StreamKind) Descriptor() protoreflect.EnumDescriptor { - return file_s3_lifecycle_proto_enumTypes[1].Descriptor() + return file_s3_lifecycle_proto_enumTypes[2].Descriptor() } func (StreamKind) Type() protoreflect.EnumType { - return &file_s3_lifecycle_proto_enumTypes[1] + return &file_s3_lifecycle_proto_enumTypes[2] } func (x StreamKind) Number() protoreflect.EnumNumber { @@ -141,7 +207,7 @@ func (x StreamKind) Number() protoreflect.EnumNumber { // Deprecated: Use StreamKind.Descriptor instead. func (StreamKind) EnumDescriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{1} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{2} } // Proto3 best practice: zero value is an UNSPECIFIED sentinel so that @@ -191,11 +257,11 @@ func (x LifecycleState_RuleMode) String() string { } func (LifecycleState_RuleMode) Descriptor() protoreflect.EnumDescriptor { - return file_s3_lifecycle_proto_enumTypes[2].Descriptor() + return file_s3_lifecycle_proto_enumTypes[3].Descriptor() } func (LifecycleState_RuleMode) Type() protoreflect.EnumType { - return &file_s3_lifecycle_proto_enumTypes[2] + return &file_s3_lifecycle_proto_enumTypes[3] } func (x LifecycleState_RuleMode) Number() protoreflect.EnumNumber { @@ -204,7 +270,7 @@ func (x LifecycleState_RuleMode) Number() protoreflect.EnumNumber { // Deprecated: Use LifecycleState_RuleMode.Descriptor instead. func (LifecycleState_RuleMode) EnumDescriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{3, 0} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{5, 0} } // DEGRADED_REASON_UNSPECIFIED replaces the old NONE sentinel: an unset @@ -255,11 +321,11 @@ func (x LifecycleState_DegradedReason) String() string { } func (LifecycleState_DegradedReason) Descriptor() protoreflect.EnumDescriptor { - return file_s3_lifecycle_proto_enumTypes[3].Descriptor() + return file_s3_lifecycle_proto_enumTypes[4].Descriptor() } func (LifecycleState_DegradedReason) Type() protoreflect.EnumType { - return &file_s3_lifecycle_proto_enumTypes[3] + return &file_s3_lifecycle_proto_enumTypes[4] } func (x LifecycleState_DegradedReason) Number() protoreflect.EnumNumber { @@ -268,7 +334,194 @@ func (x LifecycleState_DegradedReason) Number() protoreflect.EnumNumber { // Deprecated: Use LifecycleState_DegradedReason.Descriptor instead. func (LifecycleState_DegradedReason) EnumDescriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{3, 1} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{5, 1} +} + +type LifecycleDeleteRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Routing. + Bucket string `protobuf:"bytes,1,opt,name=bucket,proto3" json:"bucket,omitempty"` + ObjectPath string `protobuf:"bytes,2,opt,name=object_path,json=objectPath,proto3" json:"object_path,omitempty"` // bucket-relative; no leading slash + VersionId string `protobuf:"bytes,3,opt,name=version_id,json=versionId,proto3" json:"version_id,omitempty"` // empty for non-versioned + RuleHash []byte `protobuf:"bytes,4,opt,name=rule_hash,json=ruleHash,proto3" json:"rule_hash,omitempty"` // 8 bytes; matches the rule's per-rule dir + ActionKind ActionKind `protobuf:"varint,5,opt,name=action_kind,json=actionKind,proto3,enum=s3_lifecycle_pb.ActionKind" json:"action_kind,omitempty"` // chooses dispatch within the rule + // Stream context (echoed in BlockerRecord on FATAL outcomes so operators + // can resolve the right (shard, delay) tuple). + StreamShard string `protobuf:"bytes,10,opt,name=stream_shard,json=streamShard,proto3" json:"stream_shard,omitempty"` + StreamDelaySeconds int64 `protobuf:"varint,11,opt,name=stream_delay_seconds,json=streamDelaySeconds,proto3" json:"stream_delay_seconds,omitempty"` + StreamPositionTsNs int64 `protobuf:"varint,12,opt,name=stream_position_ts_ns,json=streamPositionTsNs,proto3" json:"stream_position_ts_ns,omitempty"` + StreamPositionOffset int64 `protobuf:"varint,13,opt,name=stream_position_offset,json=streamPositionOffset,proto3" json:"stream_position_offset,omitempty"` + // CAS witness. The server re-fetches the live entry and aborts as + // NOOP_RESOLVED (STALE_IDENTITY) if any field doesn't match. + ExpectedIdentity *EntryIdentity `protobuf:"bytes,20,opt,name=expected_identity,json=expectedIdentity,proto3" json:"expected_identity,omitempty"` + // Snapshot id at the time the worker computed this verdict; the server + // verifies the (rule_hash, action_kind) is still in the current policy + // snapshot and aborts as NOOP_RESOLVED (STALE_POLICY) if not. + EngineSnapshotId uint64 `protobuf:"varint,21,opt,name=engine_snapshot_id,json=engineSnapshotId,proto3" json:"engine_snapshot_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LifecycleDeleteRequest) Reset() { + *x = LifecycleDeleteRequest{} + mi := &file_s3_lifecycle_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LifecycleDeleteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LifecycleDeleteRequest) ProtoMessage() {} + +func (x *LifecycleDeleteRequest) ProtoReflect() protoreflect.Message { + mi := &file_s3_lifecycle_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LifecycleDeleteRequest.ProtoReflect.Descriptor instead. +func (*LifecycleDeleteRequest) Descriptor() ([]byte, []int) { + return file_s3_lifecycle_proto_rawDescGZIP(), []int{0} +} + +func (x *LifecycleDeleteRequest) GetBucket() string { + if x != nil { + return x.Bucket + } + return "" +} + +func (x *LifecycleDeleteRequest) GetObjectPath() string { + if x != nil { + return x.ObjectPath + } + return "" +} + +func (x *LifecycleDeleteRequest) GetVersionId() string { + if x != nil { + return x.VersionId + } + return "" +} + +func (x *LifecycleDeleteRequest) GetRuleHash() []byte { + if x != nil { + return x.RuleHash + } + return nil +} + +func (x *LifecycleDeleteRequest) GetActionKind() ActionKind { + if x != nil { + return x.ActionKind + } + return ActionKind_ACTION_KIND_UNSPECIFIED +} + +func (x *LifecycleDeleteRequest) GetStreamShard() string { + if x != nil { + return x.StreamShard + } + return "" +} + +func (x *LifecycleDeleteRequest) GetStreamDelaySeconds() int64 { + if x != nil { + return x.StreamDelaySeconds + } + return 0 +} + +func (x *LifecycleDeleteRequest) GetStreamPositionTsNs() int64 { + if x != nil { + return x.StreamPositionTsNs + } + return 0 +} + +func (x *LifecycleDeleteRequest) GetStreamPositionOffset() int64 { + if x != nil { + return x.StreamPositionOffset + } + return 0 +} + +func (x *LifecycleDeleteRequest) GetExpectedIdentity() *EntryIdentity { + if x != nil { + return x.ExpectedIdentity + } + return nil +} + +func (x *LifecycleDeleteRequest) GetEngineSnapshotId() uint64 { + if x != nil { + return x.EngineSnapshotId + } + return 0 +} + +type LifecycleDeleteResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Outcome LifecycleDeleteOutcome `protobuf:"varint,1,opt,name=outcome,proto3,enum=s3_lifecycle_pb.LifecycleDeleteOutcome" json:"outcome,omitempty"` + // Human-readable cause, e.g. "STALE_IDENTITY: mtime drift", + // "FATAL_EVENT_ERROR: malformed entry", or "TRANSPORT_ERROR: filer + // unavailable". Echoed into BlockerRecord.last_error on FATAL. + Reason string `protobuf:"bytes,2,opt,name=reason,proto3" json:"reason,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LifecycleDeleteResponse) Reset() { + *x = LifecycleDeleteResponse{} + mi := &file_s3_lifecycle_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LifecycleDeleteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LifecycleDeleteResponse) ProtoMessage() {} + +func (x *LifecycleDeleteResponse) ProtoReflect() protoreflect.Message { + mi := &file_s3_lifecycle_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LifecycleDeleteResponse.ProtoReflect.Descriptor instead. +func (*LifecycleDeleteResponse) Descriptor() ([]byte, []int) { + return file_s3_lifecycle_proto_rawDescGZIP(), []int{1} +} + +func (x *LifecycleDeleteResponse) GetOutcome() LifecycleDeleteOutcome { + if x != nil { + return x.Outcome + } + return LifecycleDeleteOutcome_LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED +} + +func (x *LifecycleDeleteResponse) GetReason() string { + if x != nil { + return x.Reason + } + return "" } // MessagePosition mirrors weed/util/log_buffer.MessagePosition for durable @@ -285,7 +538,7 @@ type MessagePosition struct { func (x *MessagePosition) Reset() { *x = MessagePosition{} - mi := &file_s3_lifecycle_proto_msgTypes[0] + mi := &file_s3_lifecycle_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -297,7 +550,7 @@ func (x *MessagePosition) String() string { func (*MessagePosition) ProtoMessage() {} func (x *MessagePosition) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[0] + mi := &file_s3_lifecycle_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -310,7 +563,7 @@ func (x *MessagePosition) ProtoReflect() protoreflect.Message { // Deprecated: Use MessagePosition.ProtoReflect.Descriptor instead. func (*MessagePosition) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{0} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{2} } func (x *MessagePosition) GetTsNs() int64 { @@ -340,7 +593,7 @@ type FilerShardCursor struct { func (x *FilerShardCursor) Reset() { *x = FilerShardCursor{} - mi := &file_s3_lifecycle_proto_msgTypes[1] + mi := &file_s3_lifecycle_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -352,7 +605,7 @@ func (x *FilerShardCursor) String() string { func (*FilerShardCursor) ProtoMessage() {} func (x *FilerShardCursor) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[1] + mi := &file_s3_lifecycle_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -365,7 +618,7 @@ func (x *FilerShardCursor) ProtoReflect() protoreflect.Message { // Deprecated: Use FilerShardCursor.ProtoReflect.Descriptor instead. func (*FilerShardCursor) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{1} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{3} } func (x *FilerShardCursor) GetFilerId() string { @@ -397,7 +650,7 @@ type EntryIdentity struct { func (x *EntryIdentity) Reset() { *x = EntryIdentity{} - mi := &file_s3_lifecycle_proto_msgTypes[2] + mi := &file_s3_lifecycle_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -409,7 +662,7 @@ func (x *EntryIdentity) String() string { func (*EntryIdentity) ProtoMessage() {} func (x *EntryIdentity) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[2] + mi := &file_s3_lifecycle_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -422,7 +675,7 @@ func (x *EntryIdentity) ProtoReflect() protoreflect.Message { // Deprecated: Use EntryIdentity.ProtoReflect.Descriptor instead. func (*EntryIdentity) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{2} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{4} } func (x *EntryIdentity) GetMtimeNs() int64 { @@ -496,7 +749,7 @@ type LifecycleState struct { func (x *LifecycleState) Reset() { *x = LifecycleState{} - mi := &file_s3_lifecycle_proto_msgTypes[3] + mi := &file_s3_lifecycle_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -508,7 +761,7 @@ func (x *LifecycleState) String() string { func (*LifecycleState) ProtoMessage() {} func (x *LifecycleState) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[3] + mi := &file_s3_lifecycle_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -521,7 +774,7 @@ func (x *LifecycleState) ProtoReflect() protoreflect.Message { // Deprecated: Use LifecycleState.ProtoReflect.Descriptor instead. func (*LifecycleState) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{3} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{5} } func (x *LifecycleState) GetRuleHash() []byte { @@ -644,7 +897,7 @@ type PendingItem struct { func (x *PendingItem) Reset() { *x = PendingItem{} - mi := &file_s3_lifecycle_proto_msgTypes[4] + mi := &file_s3_lifecycle_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -656,7 +909,7 @@ func (x *PendingItem) String() string { func (*PendingItem) ProtoMessage() {} func (x *PendingItem) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[4] + mi := &file_s3_lifecycle_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -669,7 +922,7 @@ func (x *PendingItem) ProtoReflect() protoreflect.Message { // Deprecated: Use PendingItem.ProtoReflect.Descriptor instead. func (*PendingItem) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{4} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{6} } func (x *PendingItem) GetObjectPath() string { @@ -714,7 +967,7 @@ type BootstrapState struct { func (x *BootstrapState) Reset() { *x = BootstrapState{} - mi := &file_s3_lifecycle_proto_msgTypes[5] + mi := &file_s3_lifecycle_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -726,7 +979,7 @@ func (x *BootstrapState) String() string { func (*BootstrapState) ProtoMessage() {} func (x *BootstrapState) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[5] + mi := &file_s3_lifecycle_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -739,7 +992,7 @@ func (x *BootstrapState) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapState.ProtoReflect.Descriptor instead. func (*BootstrapState) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{5} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{7} } func (x *BootstrapState) GetLastScannedPath() string { @@ -785,7 +1038,7 @@ type ReaderState struct { func (x *ReaderState) Reset() { *x = ReaderState{} - mi := &file_s3_lifecycle_proto_msgTypes[6] + mi := &file_s3_lifecycle_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -797,7 +1050,7 @@ func (x *ReaderState) String() string { func (*ReaderState) ProtoMessage() {} func (x *ReaderState) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[6] + mi := &file_s3_lifecycle_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -810,7 +1063,7 @@ func (x *ReaderState) ProtoReflect() protoreflect.Message { // Deprecated: Use ReaderState.ProtoReflect.Descriptor instead. func (*ReaderState) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{6} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{8} } func (x *ReaderState) GetPrimaryFilerEndpoint() string { @@ -857,7 +1110,7 @@ type FilerShardCursorList struct { func (x *FilerShardCursorList) Reset() { *x = FilerShardCursorList{} - mi := &file_s3_lifecycle_proto_msgTypes[7] + mi := &file_s3_lifecycle_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -869,7 +1122,7 @@ func (x *FilerShardCursorList) String() string { func (*FilerShardCursorList) ProtoMessage() {} func (x *FilerShardCursorList) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[7] + mi := &file_s3_lifecycle_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -882,7 +1135,7 @@ func (x *FilerShardCursorList) ProtoReflect() protoreflect.Message { // Deprecated: Use FilerShardCursorList.ProtoReflect.Descriptor instead. func (*FilerShardCursorList) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{7} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{9} } func (x *FilerShardCursorList) GetCursors() []*FilerShardCursor { @@ -906,7 +1159,7 @@ type TailDrainedKey struct { func (x *TailDrainedKey) Reset() { *x = TailDrainedKey{} - mi := &file_s3_lifecycle_proto_msgTypes[8] + mi := &file_s3_lifecycle_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -918,7 +1171,7 @@ func (x *TailDrainedKey) String() string { func (*TailDrainedKey) ProtoMessage() {} func (x *TailDrainedKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[8] + mi := &file_s3_lifecycle_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -931,7 +1184,7 @@ func (x *TailDrainedKey) ProtoReflect() protoreflect.Message { // Deprecated: Use TailDrainedKey.ProtoReflect.Descriptor instead. func (*TailDrainedKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{8} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{10} } func (x *TailDrainedKey) GetStreamKind() StreamKind { @@ -988,7 +1241,7 @@ type BlockerRecord struct { func (x *BlockerRecord) Reset() { *x = BlockerRecord{} - mi := &file_s3_lifecycle_proto_msgTypes[9] + mi := &file_s3_lifecycle_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1000,7 +1253,7 @@ func (x *BlockerRecord) String() string { func (*BlockerRecord) ProtoMessage() {} func (x *BlockerRecord) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[9] + mi := &file_s3_lifecycle_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1013,7 +1266,7 @@ func (x *BlockerRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use BlockerRecord.ProtoReflect.Descriptor instead. func (*BlockerRecord) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{9} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{11} } func (x *BlockerRecord) GetStreamKind() StreamKind { @@ -1131,7 +1384,7 @@ type StreamKey struct { func (x *StreamKey) Reset() { *x = StreamKey{} - mi := &file_s3_lifecycle_proto_msgTypes[10] + mi := &file_s3_lifecycle_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1143,7 +1396,7 @@ func (x *StreamKey) String() string { func (*StreamKey) ProtoMessage() {} func (x *StreamKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[10] + mi := &file_s3_lifecycle_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1156,7 +1409,7 @@ func (x *StreamKey) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamKey.ProtoReflect.Descriptor instead. func (*StreamKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{10} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{12} } func (x *StreamKey) GetKey() isStreamKey_Key { @@ -1241,7 +1494,7 @@ type OriginalKey struct { func (x *OriginalKey) Reset() { *x = OriginalKey{} - mi := &file_s3_lifecycle_proto_msgTypes[11] + mi := &file_s3_lifecycle_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1253,7 +1506,7 @@ func (x *OriginalKey) String() string { func (*OriginalKey) ProtoMessage() {} func (x *OriginalKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[11] + mi := &file_s3_lifecycle_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1266,7 +1519,7 @@ func (x *OriginalKey) ProtoReflect() protoreflect.Message { // Deprecated: Use OriginalKey.ProtoReflect.Descriptor instead. func (*OriginalKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{11} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{13} } func (x *OriginalKey) GetShard() string { @@ -1300,7 +1553,7 @@ type PredicateKey struct { func (x *PredicateKey) Reset() { *x = PredicateKey{} - mi := &file_s3_lifecycle_proto_msgTypes[12] + mi := &file_s3_lifecycle_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1312,7 +1565,7 @@ func (x *PredicateKey) String() string { func (*PredicateKey) ProtoMessage() {} func (x *PredicateKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[12] + mi := &file_s3_lifecycle_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1325,7 +1578,7 @@ func (x *PredicateKey) ProtoReflect() protoreflect.Message { // Deprecated: Use PredicateKey.ProtoReflect.Descriptor instead. func (*PredicateKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{12} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{14} } func (x *PredicateKey) GetShard() string { @@ -1355,7 +1608,7 @@ type BootstrapKey struct { func (x *BootstrapKey) Reset() { *x = BootstrapKey{} - mi := &file_s3_lifecycle_proto_msgTypes[13] + mi := &file_s3_lifecycle_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1367,7 +1620,7 @@ func (x *BootstrapKey) String() string { func (*BootstrapKey) ProtoMessage() {} func (x *BootstrapKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[13] + mi := &file_s3_lifecycle_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1380,7 +1633,7 @@ func (x *BootstrapKey) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapKey.ProtoReflect.Descriptor instead. func (*BootstrapKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{13} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{15} } func (x *BootstrapKey) GetBucket() string { @@ -1431,7 +1684,7 @@ type PendingKey struct { func (x *PendingKey) Reset() { *x = PendingKey{} - mi := &file_s3_lifecycle_proto_msgTypes[14] + mi := &file_s3_lifecycle_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1443,7 +1696,7 @@ func (x *PendingKey) String() string { func (*PendingKey) ProtoMessage() {} func (x *PendingKey) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[14] + mi := &file_s3_lifecycle_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1456,7 +1709,7 @@ func (x *PendingKey) ProtoReflect() protoreflect.Message { // Deprecated: Use PendingKey.ProtoReflect.Descriptor instead. func (*PendingKey) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{14} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{16} } func (x *PendingKey) GetBucket() string { @@ -1509,7 +1762,7 @@ type RetryBudgetEntry struct { func (x *RetryBudgetEntry) Reset() { *x = RetryBudgetEntry{} - mi := &file_s3_lifecycle_proto_msgTypes[15] + mi := &file_s3_lifecycle_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1521,7 +1774,7 @@ func (x *RetryBudgetEntry) String() string { func (*RetryBudgetEntry) ProtoMessage() {} func (x *RetryBudgetEntry) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[15] + mi := &file_s3_lifecycle_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1534,7 +1787,7 @@ func (x *RetryBudgetEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use RetryBudgetEntry.ProtoReflect.Descriptor instead. func (*RetryBudgetEntry) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{15} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{17} } func (x *RetryBudgetEntry) GetKey() *StreamKey { @@ -1583,7 +1836,7 @@ type RetryTarget struct { func (x *RetryTarget) Reset() { *x = RetryTarget{} - mi := &file_s3_lifecycle_proto_msgTypes[16] + mi := &file_s3_lifecycle_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1595,7 +1848,7 @@ func (x *RetryTarget) String() string { func (*RetryTarget) ProtoMessage() {} func (x *RetryTarget) ProtoReflect() protoreflect.Message { - mi := &file_s3_lifecycle_proto_msgTypes[16] + mi := &file_s3_lifecycle_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1608,7 +1861,7 @@ func (x *RetryTarget) ProtoReflect() protoreflect.Message { // Deprecated: Use RetryTarget.ProtoReflect.Descriptor instead. func (*RetryTarget) Descriptor() ([]byte, []int) { - return file_s3_lifecycle_proto_rawDescGZIP(), []int{16} + return file_s3_lifecycle_proto_rawDescGZIP(), []int{18} } func (x *RetryTarget) GetKey() *StreamKey { @@ -1657,7 +1910,26 @@ var File_s3_lifecycle_proto protoreflect.FileDescriptor const file_s3_lifecycle_proto_rawDesc = "" + "\n" + - "\x12s3_lifecycle.proto\x12\x0fs3_lifecycle_pb\">\n" + + "\x12s3_lifecycle.proto\x12\x0fs3_lifecycle_pb\"\x84\x04\n" + + "\x16LifecycleDeleteRequest\x12\x16\n" + + "\x06bucket\x18\x01 \x01(\tR\x06bucket\x12\x1f\n" + + "\vobject_path\x18\x02 \x01(\tR\n" + + "objectPath\x12\x1d\n" + + "\n" + + "version_id\x18\x03 \x01(\tR\tversionId\x12\x1b\n" + + "\trule_hash\x18\x04 \x01(\fR\bruleHash\x12<\n" + + "\vaction_kind\x18\x05 \x01(\x0e2\x1b.s3_lifecycle_pb.ActionKindR\n" + + "actionKind\x12!\n" + + "\fstream_shard\x18\n" + + " \x01(\tR\vstreamShard\x120\n" + + "\x14stream_delay_seconds\x18\v \x01(\x03R\x12streamDelaySeconds\x121\n" + + "\x15stream_position_ts_ns\x18\f \x01(\x03R\x12streamPositionTsNs\x124\n" + + "\x16stream_position_offset\x18\r \x01(\x03R\x14streamPositionOffset\x12K\n" + + "\x11expected_identity\x18\x14 \x01(\v2\x1e.s3_lifecycle_pb.EntryIdentityR\x10expectedIdentity\x12,\n" + + "\x12engine_snapshot_id\x18\x15 \x01(\x04R\x10engineSnapshotId\"t\n" + + "\x17LifecycleDeleteResponse\x12A\n" + + "\aoutcome\x18\x01 \x01(\x0e2'.s3_lifecycle_pb.LifecycleDeleteOutcomeR\aoutcome\x12\x16\n" + + "\x06reason\x18\x02 \x01(\tR\x06reason\">\n" + "\x0fMessagePosition\x12\x13\n" + "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x16\n" + "\x06offset\x18\x02 \x01(\x03R\x06offset\"k\n" + @@ -1807,14 +2079,23 @@ const file_s3_lifecycle_proto_rawDesc = "" + "\x0fNONCURRENT_DAYS\x10\x03\x12\x14\n" + "\x10NEWER_NONCURRENT\x10\x04\x12\r\n" + "\tABORT_MPU\x10\x05\x12\x19\n" + - "\x15EXPIRED_DELETE_MARKER\x10\x06*b\n" + + "\x15EXPIRED_DELETE_MARKER\x10\x06*\x96\x01\n" + + "\x16LifecycleDeleteOutcome\x12(\n" + + "$LIFECYCLE_DELETE_OUTCOME_UNSPECIFIED\x10\x00\x12\b\n" + + "\x04DONE\x10\x01\x12\x11\n" + + "\rNOOP_RESOLVED\x10\x02\x12\x17\n" + + "\x13SKIPPED_OBJECT_LOCK\x10\x03\x12\x0f\n" + + "\vRETRY_LATER\x10\x04\x12\v\n" + + "\aBLOCKED\x10\x05*b\n" + "\n" + "StreamKind\x12\x1b\n" + "\x17STREAM_KIND_UNSPECIFIED\x10\x00\x12\f\n" + "\bORIGINAL\x10\x01\x12\r\n" + "\tPREDICATE\x10\x02\x12\r\n" + "\tBOOTSTRAP\x10\x03\x12\v\n" + - "\aPENDING\x10\x04B\\\n" + + "\aPENDING\x10\x042\x82\x01\n" + + "\x1aSeaweedS3LifecycleInternal\x12d\n" + + "\x0fLifecycleDelete\x12'.s3_lifecycle_pb.LifecycleDeleteRequest\x1a(.s3_lifecycle_pb.LifecycleDeleteResponseB\\\n" + "\x10seaweedfs.clientB\x10S3LifecycleProtoZ6github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pbb\x06proto3" var ( @@ -1829,63 +2110,71 @@ func file_s3_lifecycle_proto_rawDescGZIP() []byte { return file_s3_lifecycle_proto_rawDescData } -var file_s3_lifecycle_proto_enumTypes = make([]protoimpl.EnumInfo, 4) -var file_s3_lifecycle_proto_msgTypes = make([]protoimpl.MessageInfo, 18) +var file_s3_lifecycle_proto_enumTypes = make([]protoimpl.EnumInfo, 5) +var file_s3_lifecycle_proto_msgTypes = make([]protoimpl.MessageInfo, 20) var file_s3_lifecycle_proto_goTypes = []any{ (ActionKind)(0), // 0: s3_lifecycle_pb.ActionKind - (StreamKind)(0), // 1: s3_lifecycle_pb.StreamKind - (LifecycleState_RuleMode)(0), // 2: s3_lifecycle_pb.LifecycleState.RuleMode - (LifecycleState_DegradedReason)(0), // 3: s3_lifecycle_pb.LifecycleState.DegradedReason - (*MessagePosition)(nil), // 4: s3_lifecycle_pb.MessagePosition - (*FilerShardCursor)(nil), // 5: s3_lifecycle_pb.FilerShardCursor - (*EntryIdentity)(nil), // 6: s3_lifecycle_pb.EntryIdentity - (*LifecycleState)(nil), // 7: s3_lifecycle_pb.LifecycleState - (*PendingItem)(nil), // 8: s3_lifecycle_pb.PendingItem - (*BootstrapState)(nil), // 9: s3_lifecycle_pb.BootstrapState - (*ReaderState)(nil), // 10: s3_lifecycle_pb.ReaderState - (*FilerShardCursorList)(nil), // 11: s3_lifecycle_pb.FilerShardCursorList - (*TailDrainedKey)(nil), // 12: s3_lifecycle_pb.TailDrainedKey - (*BlockerRecord)(nil), // 13: s3_lifecycle_pb.BlockerRecord - (*StreamKey)(nil), // 14: s3_lifecycle_pb.StreamKey - (*OriginalKey)(nil), // 15: s3_lifecycle_pb.OriginalKey - (*PredicateKey)(nil), // 16: s3_lifecycle_pb.PredicateKey - (*BootstrapKey)(nil), // 17: s3_lifecycle_pb.BootstrapKey - (*PendingKey)(nil), // 18: s3_lifecycle_pb.PendingKey - (*RetryBudgetEntry)(nil), // 19: s3_lifecycle_pb.RetryBudgetEntry - (*RetryTarget)(nil), // 20: s3_lifecycle_pb.RetryTarget - nil, // 21: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry + (LifecycleDeleteOutcome)(0), // 1: s3_lifecycle_pb.LifecycleDeleteOutcome + (StreamKind)(0), // 2: s3_lifecycle_pb.StreamKind + (LifecycleState_RuleMode)(0), // 3: s3_lifecycle_pb.LifecycleState.RuleMode + (LifecycleState_DegradedReason)(0), // 4: s3_lifecycle_pb.LifecycleState.DegradedReason + (*LifecycleDeleteRequest)(nil), // 5: s3_lifecycle_pb.LifecycleDeleteRequest + (*LifecycleDeleteResponse)(nil), // 6: s3_lifecycle_pb.LifecycleDeleteResponse + (*MessagePosition)(nil), // 7: s3_lifecycle_pb.MessagePosition + (*FilerShardCursor)(nil), // 8: s3_lifecycle_pb.FilerShardCursor + (*EntryIdentity)(nil), // 9: s3_lifecycle_pb.EntryIdentity + (*LifecycleState)(nil), // 10: s3_lifecycle_pb.LifecycleState + (*PendingItem)(nil), // 11: s3_lifecycle_pb.PendingItem + (*BootstrapState)(nil), // 12: s3_lifecycle_pb.BootstrapState + (*ReaderState)(nil), // 13: s3_lifecycle_pb.ReaderState + (*FilerShardCursorList)(nil), // 14: s3_lifecycle_pb.FilerShardCursorList + (*TailDrainedKey)(nil), // 15: s3_lifecycle_pb.TailDrainedKey + (*BlockerRecord)(nil), // 16: s3_lifecycle_pb.BlockerRecord + (*StreamKey)(nil), // 17: s3_lifecycle_pb.StreamKey + (*OriginalKey)(nil), // 18: s3_lifecycle_pb.OriginalKey + (*PredicateKey)(nil), // 19: s3_lifecycle_pb.PredicateKey + (*BootstrapKey)(nil), // 20: s3_lifecycle_pb.BootstrapKey + (*PendingKey)(nil), // 21: s3_lifecycle_pb.PendingKey + (*RetryBudgetEntry)(nil), // 22: s3_lifecycle_pb.RetryBudgetEntry + (*RetryTarget)(nil), // 23: s3_lifecycle_pb.RetryTarget + nil, // 24: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry } var file_s3_lifecycle_proto_depIdxs = []int32{ - 4, // 0: s3_lifecycle_pb.FilerShardCursor.position:type_name -> s3_lifecycle_pb.MessagePosition - 0, // 1: s3_lifecycle_pb.LifecycleState.action_kind:type_name -> s3_lifecycle_pb.ActionKind - 2, // 2: s3_lifecycle_pb.LifecycleState.mode:type_name -> s3_lifecycle_pb.LifecycleState.RuleMode - 3, // 3: s3_lifecycle_pb.LifecycleState.degraded_reason:type_name -> s3_lifecycle_pb.LifecycleState.DegradedReason - 6, // 4: s3_lifecycle_pb.PendingItem.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity - 21, // 5: s3_lifecycle_pb.ReaderState.last_processed_original:type_name -> s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry - 5, // 6: s3_lifecycle_pb.ReaderState.last_processed_predicate:type_name -> s3_lifecycle_pb.FilerShardCursor - 12, // 7: s3_lifecycle_pb.ReaderState.tail_drained_streams:type_name -> s3_lifecycle_pb.TailDrainedKey - 5, // 8: s3_lifecycle_pb.FilerShardCursorList.cursors:type_name -> s3_lifecycle_pb.FilerShardCursor - 1, // 9: s3_lifecycle_pb.TailDrainedKey.stream_kind:type_name -> s3_lifecycle_pb.StreamKind - 1, // 10: s3_lifecycle_pb.BlockerRecord.stream_kind:type_name -> s3_lifecycle_pb.StreamKind - 4, // 11: s3_lifecycle_pb.BlockerRecord.position:type_name -> s3_lifecycle_pb.MessagePosition - 0, // 12: s3_lifecycle_pb.BlockerRecord.action_kind:type_name -> s3_lifecycle_pb.ActionKind - 15, // 13: s3_lifecycle_pb.StreamKey.original:type_name -> s3_lifecycle_pb.OriginalKey - 16, // 14: s3_lifecycle_pb.StreamKey.predicate:type_name -> s3_lifecycle_pb.PredicateKey - 17, // 15: s3_lifecycle_pb.StreamKey.bootstrap:type_name -> s3_lifecycle_pb.BootstrapKey - 18, // 16: s3_lifecycle_pb.StreamKey.pending:type_name -> s3_lifecycle_pb.PendingKey - 4, // 17: s3_lifecycle_pb.OriginalKey.position:type_name -> s3_lifecycle_pb.MessagePosition - 4, // 18: s3_lifecycle_pb.PredicateKey.position:type_name -> s3_lifecycle_pb.MessagePosition - 0, // 19: s3_lifecycle_pb.BootstrapKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind - 0, // 20: s3_lifecycle_pb.PendingKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind - 14, // 21: s3_lifecycle_pb.RetryBudgetEntry.key:type_name -> s3_lifecycle_pb.StreamKey - 14, // 22: s3_lifecycle_pb.RetryTarget.key:type_name -> s3_lifecycle_pb.StreamKey - 0, // 23: s3_lifecycle_pb.RetryTarget.action_kind:type_name -> s3_lifecycle_pb.ActionKind - 11, // 24: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry.value:type_name -> s3_lifecycle_pb.FilerShardCursorList - 25, // [25:25] is the sub-list for method output_type - 25, // [25:25] is the sub-list for method input_type - 25, // [25:25] is the sub-list for extension type_name - 25, // [25:25] is the sub-list for extension extendee - 0, // [0:25] is the sub-list for field type_name + 0, // 0: s3_lifecycle_pb.LifecycleDeleteRequest.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 9, // 1: s3_lifecycle_pb.LifecycleDeleteRequest.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity + 1, // 2: s3_lifecycle_pb.LifecycleDeleteResponse.outcome:type_name -> s3_lifecycle_pb.LifecycleDeleteOutcome + 7, // 3: s3_lifecycle_pb.FilerShardCursor.position:type_name -> s3_lifecycle_pb.MessagePosition + 0, // 4: s3_lifecycle_pb.LifecycleState.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 3, // 5: s3_lifecycle_pb.LifecycleState.mode:type_name -> s3_lifecycle_pb.LifecycleState.RuleMode + 4, // 6: s3_lifecycle_pb.LifecycleState.degraded_reason:type_name -> s3_lifecycle_pb.LifecycleState.DegradedReason + 9, // 7: s3_lifecycle_pb.PendingItem.expected_identity:type_name -> s3_lifecycle_pb.EntryIdentity + 24, // 8: s3_lifecycle_pb.ReaderState.last_processed_original:type_name -> s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry + 8, // 9: s3_lifecycle_pb.ReaderState.last_processed_predicate:type_name -> s3_lifecycle_pb.FilerShardCursor + 15, // 10: s3_lifecycle_pb.ReaderState.tail_drained_streams:type_name -> s3_lifecycle_pb.TailDrainedKey + 8, // 11: s3_lifecycle_pb.FilerShardCursorList.cursors:type_name -> s3_lifecycle_pb.FilerShardCursor + 2, // 12: s3_lifecycle_pb.TailDrainedKey.stream_kind:type_name -> s3_lifecycle_pb.StreamKind + 2, // 13: s3_lifecycle_pb.BlockerRecord.stream_kind:type_name -> s3_lifecycle_pb.StreamKind + 7, // 14: s3_lifecycle_pb.BlockerRecord.position:type_name -> s3_lifecycle_pb.MessagePosition + 0, // 15: s3_lifecycle_pb.BlockerRecord.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 18, // 16: s3_lifecycle_pb.StreamKey.original:type_name -> s3_lifecycle_pb.OriginalKey + 19, // 17: s3_lifecycle_pb.StreamKey.predicate:type_name -> s3_lifecycle_pb.PredicateKey + 20, // 18: s3_lifecycle_pb.StreamKey.bootstrap:type_name -> s3_lifecycle_pb.BootstrapKey + 21, // 19: s3_lifecycle_pb.StreamKey.pending:type_name -> s3_lifecycle_pb.PendingKey + 7, // 20: s3_lifecycle_pb.OriginalKey.position:type_name -> s3_lifecycle_pb.MessagePosition + 7, // 21: s3_lifecycle_pb.PredicateKey.position:type_name -> s3_lifecycle_pb.MessagePosition + 0, // 22: s3_lifecycle_pb.BootstrapKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 0, // 23: s3_lifecycle_pb.PendingKey.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 17, // 24: s3_lifecycle_pb.RetryBudgetEntry.key:type_name -> s3_lifecycle_pb.StreamKey + 17, // 25: s3_lifecycle_pb.RetryTarget.key:type_name -> s3_lifecycle_pb.StreamKey + 0, // 26: s3_lifecycle_pb.RetryTarget.action_kind:type_name -> s3_lifecycle_pb.ActionKind + 14, // 27: s3_lifecycle_pb.ReaderState.LastProcessedOriginalEntry.value:type_name -> s3_lifecycle_pb.FilerShardCursorList + 5, // 28: s3_lifecycle_pb.SeaweedS3LifecycleInternal.LifecycleDelete:input_type -> s3_lifecycle_pb.LifecycleDeleteRequest + 6, // 29: s3_lifecycle_pb.SeaweedS3LifecycleInternal.LifecycleDelete:output_type -> s3_lifecycle_pb.LifecycleDeleteResponse + 29, // [29:30] is the sub-list for method output_type + 28, // [28:29] is the sub-list for method input_type + 28, // [28:28] is the sub-list for extension type_name + 28, // [28:28] is the sub-list for extension extendee + 0, // [0:28] is the sub-list for field type_name } func init() { file_s3_lifecycle_proto_init() } @@ -1893,7 +2182,7 @@ func file_s3_lifecycle_proto_init() { if File_s3_lifecycle_proto != nil { return } - file_s3_lifecycle_proto_msgTypes[10].OneofWrappers = []any{ + file_s3_lifecycle_proto_msgTypes[12].OneofWrappers = []any{ (*StreamKey_Original)(nil), (*StreamKey_Predicate)(nil), (*StreamKey_Bootstrap)(nil), @@ -1904,10 +2193,10 @@ func file_s3_lifecycle_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_s3_lifecycle_proto_rawDesc), len(file_s3_lifecycle_proto_rawDesc)), - NumEnums: 4, - NumMessages: 18, + NumEnums: 5, + NumMessages: 20, NumExtensions: 0, - NumServices: 0, + NumServices: 1, }, GoTypes: file_s3_lifecycle_proto_goTypes, DependencyIndexes: file_s3_lifecycle_proto_depIdxs, diff --git a/weed/pb/s3_lifecycle_pb/s3_lifecycle_grpc.pb.go b/weed/pb/s3_lifecycle_pb/s3_lifecycle_grpc.pb.go new file mode 100644 index 000000000..d773deb62 --- /dev/null +++ b/weed/pb/s3_lifecycle_pb/s3_lifecycle_grpc.pb.go @@ -0,0 +1,136 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.33.4 +// source: s3_lifecycle.proto + +package s3_lifecycle_pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName = "/s3_lifecycle_pb.SeaweedS3LifecycleInternal/LifecycleDelete" +) + +// SeaweedS3LifecycleInternalClient is the client API for SeaweedS3LifecycleInternal service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the +// actual lifecycle deletion. The lifecycle worker computes the (rule, action) +// verdict locally; the S3 server is the only component allowed to mutate the +// filer state, so it gets the final word: it re-fetches the live entry, +// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches +// to the appropriate internal helper. +type SeaweedS3LifecycleInternalClient interface { + LifecycleDelete(ctx context.Context, in *LifecycleDeleteRequest, opts ...grpc.CallOption) (*LifecycleDeleteResponse, error) +} + +type seaweedS3LifecycleInternalClient struct { + cc grpc.ClientConnInterface +} + +func NewSeaweedS3LifecycleInternalClient(cc grpc.ClientConnInterface) SeaweedS3LifecycleInternalClient { + return &seaweedS3LifecycleInternalClient{cc} +} + +func (c *seaweedS3LifecycleInternalClient) LifecycleDelete(ctx context.Context, in *LifecycleDeleteRequest, opts ...grpc.CallOption) (*LifecycleDeleteResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(LifecycleDeleteResponse) + err := c.cc.Invoke(ctx, SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SeaweedS3LifecycleInternalServer is the server API for SeaweedS3LifecycleInternal service. +// All implementations must embed UnimplementedSeaweedS3LifecycleInternalServer +// for forward compatibility. +// +// SeaweedS3LifecycleInternal is the worker-to-S3 service that performs the +// actual lifecycle deletion. The lifecycle worker computes the (rule, action) +// verdict locally; the S3 server is the only component allowed to mutate the +// filer state, so it gets the final word: it re-fetches the live entry, +// verifies the EntryIdentity CAS, runs object-lock protections, and dispatches +// to the appropriate internal helper. +type SeaweedS3LifecycleInternalServer interface { + LifecycleDelete(context.Context, *LifecycleDeleteRequest) (*LifecycleDeleteResponse, error) + mustEmbedUnimplementedSeaweedS3LifecycleInternalServer() +} + +// UnimplementedSeaweedS3LifecycleInternalServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSeaweedS3LifecycleInternalServer struct{} + +func (UnimplementedSeaweedS3LifecycleInternalServer) LifecycleDelete(context.Context, *LifecycleDeleteRequest) (*LifecycleDeleteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LifecycleDelete not implemented") +} +func (UnimplementedSeaweedS3LifecycleInternalServer) mustEmbedUnimplementedSeaweedS3LifecycleInternalServer() { +} +func (UnimplementedSeaweedS3LifecycleInternalServer) testEmbeddedByValue() {} + +// UnsafeSeaweedS3LifecycleInternalServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SeaweedS3LifecycleInternalServer will +// result in compilation errors. +type UnsafeSeaweedS3LifecycleInternalServer interface { + mustEmbedUnimplementedSeaweedS3LifecycleInternalServer() +} + +func RegisterSeaweedS3LifecycleInternalServer(s grpc.ServiceRegistrar, srv SeaweedS3LifecycleInternalServer) { + // If the following call pancis, it indicates UnimplementedSeaweedS3LifecycleInternalServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&SeaweedS3LifecycleInternal_ServiceDesc, srv) +} + +func _SeaweedS3LifecycleInternal_LifecycleDelete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LifecycleDeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedS3LifecycleInternalServer).LifecycleDelete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SeaweedS3LifecycleInternal_LifecycleDelete_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedS3LifecycleInternalServer).LifecycleDelete(ctx, req.(*LifecycleDeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SeaweedS3LifecycleInternal_ServiceDesc is the grpc.ServiceDesc for SeaweedS3LifecycleInternal service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SeaweedS3LifecycleInternal_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "s3_lifecycle_pb.SeaweedS3LifecycleInternal", + HandlerType: (*SeaweedS3LifecycleInternalServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "LifecycleDelete", + Handler: _SeaweedS3LifecycleInternal_LifecycleDelete_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "s3_lifecycle.proto", +} diff --git a/weed/s3api/s3api_internal_lifecycle.go b/weed/s3api/s3api_internal_lifecycle.go new file mode 100644 index 000000000..87d94882a --- /dev/null +++ b/weed/s3api/s3api_internal_lifecycle.go @@ -0,0 +1,200 @@ +package s3api + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "sort" + "strconv" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// LifecycleDelete executes one (rule, action) verdict: re-fetch, identity +// CAS, object-lock check, dispatch by kind. Errors surface as outcomes; +// reader cursors and pending state are the worker's concern. +func (s3a *S3ApiServer) LifecycleDelete(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) { + if req == nil || req.Bucket == "" || req.ObjectPath == "" { + return blocked("FATAL_EVENT_ERROR: empty bucket or object_path"), nil + } + + // MPU init lives at .uploads//; not handled by getObjectEntry. + if req.ActionKind == s3_lifecycle_pb.ActionKind_ABORT_MPU { + return s3a.lifecycleAbortMPU(ctx, req) + } + + entry, err := s3a.getObjectEntry(req.Bucket, req.ObjectPath, req.VersionId) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) { + return noopResolved("NOT_FOUND"), nil + } + glog.V(1).Infof("lifecycle: live fetch %s/%s@%s: %v", req.Bucket, req.ObjectPath, req.VersionId, err) + return retryLater("TRANSPORT_ERROR: " + err.Error()), nil + } + + live := computeEntryIdentity(entry) + if !identityMatches(live, req.ExpectedIdentity) { + return noopResolved("STALE_IDENTITY"), nil + } + + // Lifecycle never bypasses governance/compliance; the http.Request is + // only read when bypass is allowed, so nil is safe here. + if err := s3a.enforceObjectLockProtections(nil, req.Bucket, req.ObjectPath, req.VersionId, false); err != nil { + glog.V(2).Infof("lifecycle: SKIPPED_OBJECT_LOCK %s/%s@%s: %v", req.Bucket, req.ObjectPath, req.VersionId, err) + return &s3_lifecycle_pb.LifecycleDeleteResponse{ + Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_SKIPPED_OBJECT_LOCK, + Reason: err.Error(), + }, nil + } + + return s3a.lifecycleDispatch(ctx, req, entry) +} + +func (s3a *S3ApiServer) lifecycleDispatch(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest, entry *filer_pb.Entry) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) { + switch req.ActionKind { + case s3_lifecycle_pb.ActionKind_EXPIRATION_DAYS, s3_lifecycle_pb.ActionKind_EXPIRATION_DATE: + // Current-version expiration: Enabled -> delete marker; Suspended + // -> delete null + new marker; Off -> remove. Filer errors classify + // as RETRY_LATER; the worker's budget promotes to BLOCKED. + state, vErr := s3a.getVersioningState(req.Bucket) + if vErr != nil { + if errors.Is(vErr, filer_pb.ErrNotFound) { + return noopResolved("BUCKET_NOT_FOUND"), nil + } + return retryLater("TRANSPORT_ERROR: versioning lookup: " + vErr.Error()), nil + } + switch state { + case s3_constants.VersioningEnabled: + if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath); err != nil { + return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil + } + return done(), nil + case s3_constants.VersioningSuspended: + // Best-effort null delete; NotFound is benign. + if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, "null"); err != nil { + if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrVersionNotFound) { + return retryLater("TRANSPORT_ERROR: deleteNullVersion: " + err.Error()), nil + } + } + if _, err := s3a.createDeleteMarker(req.Bucket, req.ObjectPath); err != nil { + return retryLater("TRANSPORT_ERROR: createDeleteMarker: " + err.Error()), nil + } + return done(), nil + default: + err := s3a.WithFilerClient(false, func(c filer_pb.SeaweedFilerClient) error { + return s3a.deleteUnversionedObjectWithClient(c, req.Bucket, req.ObjectPath) + }) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) { + return noopResolved("NOT_FOUND_AT_DELETE"), nil + } + return retryLater("TRANSPORT_ERROR: deleteUnversioned: " + err.Error()), nil + } + return done(), nil + } + + case s3_lifecycle_pb.ActionKind_NONCURRENT_DAYS, + s3_lifecycle_pb.ActionKind_NEWER_NONCURRENT, + s3_lifecycle_pb.ActionKind_EXPIRED_DELETE_MARKER: + // EXPIRED_DELETE_MARKER targets the marker version itself. + if req.VersionId == "" { + return blocked("FATAL_EVENT_ERROR: version_id required for noncurrent / delete-marker delete"), nil + } + if err := s3a.deleteSpecificObjectVersion(req.Bucket, req.ObjectPath, req.VersionId); err != nil { + if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrObjectNotFound) { + return noopResolved("NOT_FOUND_AT_DELETE"), nil + } + return retryLater("TRANSPORT_ERROR: deleteSpecificVersion: " + err.Error()), nil + } + return done(), nil + + case s3_lifecycle_pb.ActionKind_ABORT_MPU: + return blocked("FATAL_EVENT_ERROR: ABORT_MPU dispatched after fetch"), nil + + default: + return blocked("FATAL_EVENT_ERROR: unknown action_kind " + req.ActionKind.String()), nil + } +} + +func (s3a *S3ApiServer) lifecycleAbortMPU(ctx context.Context, req *s3_lifecycle_pb.LifecycleDeleteRequest) (*s3_lifecycle_pb.LifecycleDeleteResponse, error) { + // TODO(phase-5): plumb to abortMultipartUpload (currently expects an + // *s3.AbortMultipartUploadInput; lifecycle has no HTTP request). + return retryLater("ABORT_MPU not yet wired"), nil +} + +// computeEntryIdentity captures (mtime, size, head fid, sorted-Extended hash): +// an overwrite changes mtime/size/fid; a metadata edit changes Extended; a +// snapshot-restore that preserves mtime+size still differs in head_fid. +func computeEntryIdentity(entry *filer_pb.Entry) *s3_lifecycle_pb.EntryIdentity { + if entry == nil { + return nil + } + id := &s3_lifecycle_pb.EntryIdentity{} + if entry.Attributes != nil { + id.MtimeNs = entry.Attributes.Mtime + id.Size = int64(entry.Attributes.FileSize) + } + if len(entry.GetChunks()) > 0 { + id.HeadFid = entry.GetChunks()[0].FileId + } + id.ExtendedHash = hashExtended(entry.Extended) + return id +} + +// hashExtended is length-prefixed; a forged tag value can't collide with a +// legitimate multi-tag map. +func hashExtended(ext map[string][]byte) []byte { + if len(ext) == 0 { + return nil + } + keys := make([]string, 0, len(ext)) + for k := range ext { + keys = append(keys, k) + } + sort.Strings(keys) + h := sha256.New() + for _, k := range keys { + h.Write([]byte(strconv.Itoa(len(k)))) + h.Write([]byte{':'}) + h.Write([]byte(k)) + v := ext[k] + h.Write([]byte(strconv.Itoa(len(v)))) + h.Write([]byte{':'}) + h.Write(v) + } + return h.Sum(nil) +} + +func identityMatches(live, want *s3_lifecycle_pb.EntryIdentity) bool { + if want == nil { + // No CAS witness (early bootstrap); skip. + return true + } + if live == nil { + return false + } + if live.MtimeNs != want.MtimeNs || live.Size != want.Size { + return false + } + if live.HeadFid != want.HeadFid { + return false + } + return bytes.Equal(live.ExtendedHash, want.ExtendedHash) +} + +func done() *s3_lifecycle_pb.LifecycleDeleteResponse { + return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_DONE} +} +func noopResolved(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse { + return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_NOOP_RESOLVED, Reason: reason} +} +func blocked(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse { + return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_BLOCKED, Reason: reason} +} +func retryLater(reason string) *s3_lifecycle_pb.LifecycleDeleteResponse { + return &s3_lifecycle_pb.LifecycleDeleteResponse{Outcome: s3_lifecycle_pb.LifecycleDeleteOutcome_RETRY_LATER, Reason: reason} +} diff --git a/weed/s3api/s3api_internal_lifecycle_test.go b/weed/s3api/s3api_internal_lifecycle_test.go new file mode 100644 index 000000000..0b5a3b53a --- /dev/null +++ b/weed/s3api/s3api_internal_lifecycle_test.go @@ -0,0 +1,117 @@ +package s3api + +import ( + "bytes" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_lifecycle_pb" +) + +func TestComputeEntryIdentity_BasicFields(t *testing.T) { + entry := &filer_pb.Entry{ + Attributes: &filer_pb.FuseAttributes{Mtime: 1700000000, FileSize: 4096}, + Chunks: []*filer_pb.FileChunk{ + {FileId: "1,abc"}, + {FileId: "1,def"}, + }, + } + id := computeEntryIdentity(entry) + if id.MtimeNs != 1700000000 { + t.Fatalf("MtimeNs want 1700000000, got %d", id.MtimeNs) + } + if id.Size != 4096 { + t.Fatalf("Size want 4096, got %d", id.Size) + } + if id.HeadFid != "1,abc" { + t.Fatalf("HeadFid want 1,abc, got %s", id.HeadFid) + } +} + +func TestComputeEntryIdentity_NilSafeMissingChunks(t *testing.T) { + if got := computeEntryIdentity(nil); got != nil { + t.Fatalf("nil entry should return nil, got %v", got) + } + id := computeEntryIdentity(&filer_pb.Entry{}) + if id == nil { + t.Fatalf("entry with nil Attributes should still produce identity") + } + if id.HeadFid != "" { + t.Fatalf("missing chunks should yield empty HeadFid, got %s", id.HeadFid) + } +} + +func TestHashExtended_OrderStable(t *testing.T) { + a := map[string][]byte{"k1": []byte("v1"), "k2": []byte("v2")} + b := map[string][]byte{"k2": []byte("v2"), "k1": []byte("v1")} + if !bytes.Equal(hashExtended(a), hashExtended(b)) { + t.Fatalf("hash should be insensitive to map iteration order") + } +} + +func TestHashExtended_DelimiterCollisionResistant(t *testing.T) { + // Naively concatenated: "k1=v1k2v2" could collide with "k1=v1k" / "2v2". + // Length-prefix encoding must keep them apart. + a := map[string][]byte{"k1": []byte("v1"), "k2": []byte("v2")} + b := map[string][]byte{"k1": []byte("v1k2v2")} + if bytes.Equal(hashExtended(a), hashExtended(b)) { + t.Fatalf("delimiter-forged Extended payloads must not collide") + } +} + +func TestHashExtended_NilEqualsEmpty(t *testing.T) { + if got := hashExtended(nil); len(got) != 0 { + t.Fatalf("nil should produce zero-length hash, got %d bytes", len(got)) + } + if got := hashExtended(map[string][]byte{}); len(got) != 0 { + t.Fatalf("empty map should produce zero-length hash, got %d bytes", len(got)) + } +} + +func TestIdentityMatches_NilWantTreatedAsMatch(t *testing.T) { + // Bootstrap callers that don't yet have an identity to CAS against + // pass nil expected_identity; the server treats this as "no CAS". + live := &s3_lifecycle_pb.EntryIdentity{MtimeNs: 1, Size: 2} + if !identityMatches(live, nil) { + t.Fatalf("nil want should match") + } +} + +func TestIdentityMatches_NilLiveDoesNotMatch(t *testing.T) { + if identityMatches(nil, &s3_lifecycle_pb.EntryIdentity{MtimeNs: 1}) { + t.Fatalf("nil live should not match a populated want") + } +} + +func TestIdentityMatches_AllFieldsCompared(t *testing.T) { + base := &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}} + cases := []struct { + name string + live *s3_lifecycle_pb.EntryIdentity + want bool + }{ + {"identical", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, true}, + {"mtime-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 101, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, false}, + {"size-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2049, HeadFid: "1,abc", ExtendedHash: []byte{0x01, 0x02}}, false}, + {"fid-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,xyz", ExtendedHash: []byte{0x01, 0x02}}, false}, + {"extended-drift", &s3_lifecycle_pb.EntryIdentity{MtimeNs: 100, Size: 2048, HeadFid: "1,abc", ExtendedHash: []byte{0x03, 0x04}}, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := identityMatches(c.live, base); got != c.want { + t.Fatalf("want %v, got %v", c.want, got) + } + }) + } +} + +func TestLifecycleDelete_RejectsEmptyRequest(t *testing.T) { + s := &S3ApiServer{} + resp, err := s.LifecycleDelete(nil, &s3_lifecycle_pb.LifecycleDeleteRequest{}) + if err != nil { + t.Fatalf("unexpected gRPC error: %v", err) + } + if resp.Outcome != s3_lifecycle_pb.LifecycleDeleteOutcome_BLOCKED { + t.Fatalf("empty request should be BLOCKED, got %v", resp.Outcome) + } +} diff --git a/weed/s3api/s3api_lifecycle_canonical.go b/weed/s3api/s3api_lifecycle_canonical.go index a0e4ac5a6..adf72fb2b 100644 --- a/weed/s3api/s3api_lifecycle_canonical.go +++ b/weed/s3api/s3api_lifecycle_canonical.go @@ -4,17 +4,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" ) -// LifecycleToCanonical converts the XML-deserialized Lifecycle into the flat -// s3lifecycle.Rule shape the engine compiles against. One XML maps to -// exactly one s3lifecycle.Rule (with potentially multiple action sub-fields -// populated); the engine then expands each into its compiled actions via -// s3lifecycle.RuleActionKinds. -// -// Filter resolution mirrors AWS semantics: the optional element may -// contain a single | | , or be absent (in which case the -// older top-level applies). When is used, its sub-elements -// (Prefix + multiple Tags + size filters) are flattened into the canonical -// Rule's individual fields. +// LifecycleToCanonical flattens the XML-deserialized Lifecycle into the +// engine's flat Rule shape. The optional element may contain +// | | , or be absent (in which case the older top-level +// applies). func LifecycleToCanonical(lc *Lifecycle) []*s3lifecycle.Rule { if lc == nil { return nil @@ -32,7 +25,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule { Status: string(r.Status), } - // Prefix: or or top-level . prefix, tags, sizeGT, sizeLT := flattenFilter(&r.Filter) if prefix == "" && r.Prefix.set { prefix = r.Prefix.val @@ -44,7 +36,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule { out.FilterSizeGreaterThan = sizeGT out.FilterSizeLessThan = sizeLT - // Expiration sub-fields. if r.Expiration.set { out.ExpirationDays = r.Expiration.Days if !r.Expiration.Date.Time.IsZero() { @@ -55,13 +46,11 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule { } } - // Non-current version expiration. if r.NoncurrentVersionExpiration.set { out.NoncurrentVersionExpirationDays = r.NoncurrentVersionExpiration.NoncurrentDays out.NewerNoncurrentVersions = r.NoncurrentVersionExpiration.NewerNoncurrentVersions } - // Abort multipart. if r.AbortIncompleteMultipartUpload.set { out.AbortMPUDaysAfterInitiation = r.AbortIncompleteMultipartUpload.DaysAfterInitiation } @@ -69,9 +58,6 @@ func ruleToCanonical(r *Rule) *s3lifecycle.Rule { return out } -// flattenFilter pulls Prefix / Tags / Size constraints out of the XML Filter -// element. Returns zero values when the field is unset; the caller falls back -// to the rule's top-level Prefix when prefix is "". func flattenFilter(f *Filter) (prefix string, tags map[string]string, sizeGT, sizeLT int64) { if !f.set { return