diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index 2269c563c..4c9eaefe7 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -86,6 +86,13 @@ service SeaweedFiler { } rpc ReplicateLock(ReplicateLockRequest) returns (ReplicateLockResponse) { } + + // Peer chunk sharing — tier 1: mount-server registry. + // See design-weed-mount-peer-chunk-sharing.md for details. + rpc MountRegister (MountRegisterRequest) returns (MountRegisterResponse) { + } + rpc MountList (MountListRequest) returns (MountListResponse) { + } } ////////////////////////////////////////////////// @@ -197,6 +204,10 @@ message FuseAttributes { bytes md5 = 14; uint32 rdev = 16; uint64 inode = 17; + int64 ctime = 18; // unix time in seconds, inode change time + int32 mtime_ns = 19; // nanosecond component of mtime (0-999999999) + int32 ctime_ns = 20; // nanosecond component of ctime (0-999999999) + int32 crtime_ns = 21; // nanosecond component of crtime (0-999999999) } message CreateEntryRequest { @@ -597,3 +608,31 @@ message StreamMutateEntryResponse { string error = 7; // human-readable error message when the operation failed int32 errno = 8; // POSIX errno (e.g. ENOENT=2, ENOTEMPTY=66) for direct FUSE status mapping } + +////////////////////////////////////////////////// +// Peer chunk sharing — mount-server registry +////////////////////////////////////////////////// + +message MountRegisterRequest { + string peer_addr = 1; // host:port where this mount serves peer chunk requests + string rack = 2; // locality label (rack); used for peer ranking + int32 ttl_seconds = 3; // how long the filer should keep this entry without a heartbeat + string data_center = 4; // locality label (data center); coarser than rack +} + +message MountRegisterResponse { +} + +message MountListRequest { +} + +message MountListResponse { + repeated MountInfo mounts = 1; +} + +message MountInfo { + string peer_addr = 1; + string rack = 2; + int64 last_seen_ns = 3; + string data_center = 4; +} diff --git a/weed/pb/Makefile b/weed/pb/Makefile index 94f5f668d..7b0a9dd6b 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -9,6 +9,8 @@ gen: protoc remote.proto --go_out=./remote_pb --go-grpc_out=./remote_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative + mkdir -p ./mount_peer_pb + protoc mount_peer.proto --go_out=./mount_peer_pb --go-grpc_out=./mount_peer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mq_broker.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mq_schema.proto --go_out=./schema_pb --go-grpc_out=./schema_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 7dd8c6ab2..4c9eaefe7 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -86,6 +86,13 @@ service SeaweedFiler { } rpc ReplicateLock(ReplicateLockRequest) returns (ReplicateLockResponse) { } + + // Peer chunk sharing — tier 1: mount-server registry. + // See design-weed-mount-peer-chunk-sharing.md for details. + rpc MountRegister (MountRegisterRequest) returns (MountRegisterResponse) { + } + rpc MountList (MountListRequest) returns (MountListResponse) { + } } ////////////////////////////////////////////////// @@ -200,6 +207,7 @@ message FuseAttributes { int64 ctime = 18; // unix time in seconds, inode change time int32 mtime_ns = 19; // nanosecond component of mtime (0-999999999) int32 ctime_ns = 20; // nanosecond component of ctime (0-999999999) + int32 crtime_ns = 21; // nanosecond component of crtime (0-999999999) } message CreateEntryRequest { @@ -600,3 +608,31 @@ message StreamMutateEntryResponse { string error = 7; // human-readable error message when the operation failed int32 errno = 8; // POSIX errno (e.g. ENOENT=2, ENOTEMPTY=66) for direct FUSE status mapping } + +////////////////////////////////////////////////// +// Peer chunk sharing — mount-server registry +////////////////////////////////////////////////// + +message MountRegisterRequest { + string peer_addr = 1; // host:port where this mount serves peer chunk requests + string rack = 2; // locality label (rack); used for peer ranking + int32 ttl_seconds = 3; // how long the filer should keep this entry without a heartbeat + string data_center = 4; // locality label (data center); coarser than rack +} + +message MountRegisterResponse { +} + +message MountListRequest { +} + +message MountListResponse { + repeated MountInfo mounts = 1; +} + +message MountInfo { + string peer_addr = 1; + string rack = 2; + int64 last_seen_ns = 3; + string data_center = 4; +} diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index e82c1b865..3ca11482e 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -961,9 +961,10 @@ type FuseAttributes struct { Md5 []byte `protobuf:"bytes,14,opt,name=md5,proto3" json:"md5,omitempty"` Rdev uint32 `protobuf:"varint,16,opt,name=rdev,proto3" json:"rdev,omitempty"` Inode uint64 `protobuf:"varint,17,opt,name=inode,proto3" json:"inode,omitempty"` - Ctime int64 `protobuf:"varint,18,opt,name=ctime,proto3" json:"ctime,omitempty"` // unix time in seconds, inode change time - MtimeNs int32 `protobuf:"varint,19,opt,name=mtime_ns,json=mtimeNs,proto3" json:"mtime_ns,omitempty"` // nanosecond component of mtime (0-999999999) - CtimeNs int32 `protobuf:"varint,20,opt,name=ctime_ns,json=ctimeNs,proto3" json:"ctime_ns,omitempty"` // nanosecond component of ctime (0-999999999) + Ctime int64 `protobuf:"varint,18,opt,name=ctime,proto3" json:"ctime,omitempty"` // unix time in seconds, inode change time + MtimeNs int32 `protobuf:"varint,19,opt,name=mtime_ns,json=mtimeNs,proto3" json:"mtime_ns,omitempty"` // nanosecond component of mtime (0-999999999) + CtimeNs int32 `protobuf:"varint,20,opt,name=ctime_ns,json=ctimeNs,proto3" json:"ctime_ns,omitempty"` // nanosecond component of ctime (0-999999999) + CrtimeNs int32 `protobuf:"varint,21,opt,name=crtime_ns,json=crtimeNs,proto3" json:"crtime_ns,omitempty"` // nanosecond component of crtime (0-999999999) unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1117,6 +1118,13 @@ func (x *FuseAttributes) GetCtimeNs() int32 { return 0 } +func (x *FuseAttributes) GetCrtimeNs() int32 { + if x != nil { + return x.CrtimeNs + } + return 0 +} + type CreateEntryRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` @@ -4869,6 +4877,258 @@ func (*StreamMutateEntryResponse_DeleteResponse) isStreamMutateEntryResponse_Res func (*StreamMutateEntryResponse_RenameResponse) isStreamMutateEntryResponse_Response() {} +type MountRegisterRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + PeerAddr string `protobuf:"bytes,1,opt,name=peer_addr,json=peerAddr,proto3" json:"peer_addr,omitempty"` // host:port where this mount serves peer chunk requests + Rack string `protobuf:"bytes,2,opt,name=rack,proto3" json:"rack,omitempty"` // locality label (rack); used for peer ranking + TtlSeconds int32 `protobuf:"varint,3,opt,name=ttl_seconds,json=ttlSeconds,proto3" json:"ttl_seconds,omitempty"` // how long the filer should keep this entry without a heartbeat + DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` // locality label (data center); coarser than rack + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MountRegisterRequest) Reset() { + *x = MountRegisterRequest{} + mi := &file_filer_proto_msgTypes[71] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MountRegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MountRegisterRequest) ProtoMessage() {} + +func (x *MountRegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[71] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MountRegisterRequest.ProtoReflect.Descriptor instead. +func (*MountRegisterRequest) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{71} +} + +func (x *MountRegisterRequest) GetPeerAddr() string { + if x != nil { + return x.PeerAddr + } + return "" +} + +func (x *MountRegisterRequest) GetRack() string { + if x != nil { + return x.Rack + } + return "" +} + +func (x *MountRegisterRequest) GetTtlSeconds() int32 { + if x != nil { + return x.TtlSeconds + } + return 0 +} + +func (x *MountRegisterRequest) GetDataCenter() string { + if x != nil { + return x.DataCenter + } + return "" +} + +type MountRegisterResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MountRegisterResponse) Reset() { + *x = MountRegisterResponse{} + mi := &file_filer_proto_msgTypes[72] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MountRegisterResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MountRegisterResponse) ProtoMessage() {} + +func (x *MountRegisterResponse) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[72] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MountRegisterResponse.ProtoReflect.Descriptor instead. +func (*MountRegisterResponse) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{72} +} + +type MountListRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MountListRequest) Reset() { + *x = MountListRequest{} + mi := &file_filer_proto_msgTypes[73] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MountListRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MountListRequest) ProtoMessage() {} + +func (x *MountListRequest) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[73] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MountListRequest.ProtoReflect.Descriptor instead. +func (*MountListRequest) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{73} +} + +type MountListResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Mounts []*MountInfo `protobuf:"bytes,1,rep,name=mounts,proto3" json:"mounts,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MountListResponse) Reset() { + *x = MountListResponse{} + mi := &file_filer_proto_msgTypes[74] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MountListResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MountListResponse) ProtoMessage() {} + +func (x *MountListResponse) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[74] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MountListResponse.ProtoReflect.Descriptor instead. +func (*MountListResponse) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{74} +} + +func (x *MountListResponse) GetMounts() []*MountInfo { + if x != nil { + return x.Mounts + } + return nil +} + +type MountInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + PeerAddr string `protobuf:"bytes,1,opt,name=peer_addr,json=peerAddr,proto3" json:"peer_addr,omitempty"` + Rack string `protobuf:"bytes,2,opt,name=rack,proto3" json:"rack,omitempty"` + LastSeenNs int64 `protobuf:"varint,3,opt,name=last_seen_ns,json=lastSeenNs,proto3" json:"last_seen_ns,omitempty"` + DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MountInfo) Reset() { + *x = MountInfo{} + mi := &file_filer_proto_msgTypes[75] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MountInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MountInfo) ProtoMessage() {} + +func (x *MountInfo) ProtoReflect() protoreflect.Message { + mi := &file_filer_proto_msgTypes[75] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MountInfo.ProtoReflect.Descriptor instead. +func (*MountInfo) Descriptor() ([]byte, []int) { + return file_filer_proto_rawDescGZIP(), []int{75} +} + +func (x *MountInfo) GetPeerAddr() string { + if x != nil { + return x.PeerAddr + } + return "" +} + +func (x *MountInfo) GetRack() string { + if x != nil { + return x.Rack + } + return "" +} + +func (x *MountInfo) GetLastSeenNs() int64 { + if x != nil { + return x.LastSeenNs + } + return 0 +} + +func (x *MountInfo) GetDataCenter() string { + if x != nil { + return x.DataCenter + } + return "" +} + // if found, send the exact address // if not found, send the full list of existing brokers type LocateBrokerResponse_Resource struct { @@ -4881,7 +5141,7 @@ type LocateBrokerResponse_Resource struct { func (x *LocateBrokerResponse_Resource) Reset() { *x = LocateBrokerResponse_Resource{} - mi := &file_filer_proto_msgTypes[74] + mi := &file_filer_proto_msgTypes[79] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4893,7 +5153,7 @@ func (x *LocateBrokerResponse_Resource) String() string { func (*LocateBrokerResponse_Resource) ProtoMessage() {} func (x *LocateBrokerResponse_Resource) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[74] + mi := &file_filer_proto_msgTypes[79] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4947,7 +5207,7 @@ type FilerConf_PathConf struct { func (x *FilerConf_PathConf) Reset() { *x = FilerConf_PathConf{} - mi := &file_filer_proto_msgTypes[75] + mi := &file_filer_proto_msgTypes[80] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4959,7 +5219,7 @@ func (x *FilerConf_PathConf) String() string { func (*FilerConf_PathConf) ProtoMessage() {} func (x *FilerConf_PathConf) ProtoReflect() protoreflect.Message { - mi := &file_filer_proto_msgTypes[75] + mi := &file_filer_proto_msgTypes[80] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -5168,7 +5428,7 @@ const file_filer_proto_rawDesc = "" + "\x06FileId\x12\x1b\n" + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" + "\bfile_key\x18\x02 \x01(\x04R\afileKey\x12\x16\n" + - "\x06cookie\x18\x03 \x01(\aR\x06cookie\"\xb4\x03\n" + + "\x06cookie\x18\x03 \x01(\aR\x06cookie\"\xd1\x03\n" + "\x0eFuseAttributes\x12\x1b\n" + "\tfile_size\x18\x01 \x01(\x04R\bfileSize\x12\x14\n" + "\x05mtime\x18\x02 \x01(\x03R\x05mtime\x12\x1b\n" + @@ -5188,7 +5448,8 @@ const file_filer_proto_rawDesc = "" + "\x05inode\x18\x11 \x01(\x04R\x05inode\x12\x14\n" + "\x05ctime\x18\x12 \x01(\x03R\x05ctime\x12\x19\n" + "\bmtime_ns\x18\x13 \x01(\x05R\amtimeNs\x12\x19\n" + - "\bctime_ns\x18\x14 \x01(\x05R\actimeNs\"\x82\x02\n" + + "\bctime_ns\x18\x14 \x01(\x05R\actimeNs\x12\x1b\n" + + "\tcrtime_ns\x18\x15 \x01(\x05R\bcrtimeNs\"\x82\x02\n" + "\x12CreateEntryRequest\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12%\n" + "\x05entry\x18\x02 \x01(\v2\x0f.filer_pb.EntryR\x05entry\x12\x15\n" + @@ -5524,7 +5785,25 @@ const file_filer_proto_rawDesc = "" + "\x05error\x18\a \x01(\tR\x05error\x12\x14\n" + "\x05errno\x18\b \x01(\x05R\x05errnoB\n" + "\n" + - "\bresponse*7\n" + + "\bresponse\"\x89\x01\n" + + "\x14MountRegisterRequest\x12\x1b\n" + + "\tpeer_addr\x18\x01 \x01(\tR\bpeerAddr\x12\x12\n" + + "\x04rack\x18\x02 \x01(\tR\x04rack\x12\x1f\n" + + "\vttl_seconds\x18\x03 \x01(\x05R\n" + + "ttlSeconds\x12\x1f\n" + + "\vdata_center\x18\x04 \x01(\tR\n" + + "dataCenter\"\x17\n" + + "\x15MountRegisterResponse\"\x12\n" + + "\x10MountListRequest\"@\n" + + "\x11MountListResponse\x12+\n" + + "\x06mounts\x18\x01 \x03(\v2\x13.filer_pb.MountInfoR\x06mounts\"\x7f\n" + + "\tMountInfo\x12\x1b\n" + + "\tpeer_addr\x18\x01 \x01(\tR\bpeerAddr\x12\x12\n" + + "\x04rack\x18\x02 \x01(\tR\x04rack\x12 \n" + + "\flast_seen_ns\x18\x03 \x01(\x03R\n" + + "lastSeenNs\x12\x1f\n" + + "\vdata_center\x18\x04 \x01(\tR\n" + + "dataCenter*7\n" + "\aSSEType\x12\b\n" + "\x04NONE\x10\x00\x12\t\n" + "\x05SSE_C\x10\x01\x12\v\n" + @@ -5538,7 +5817,7 @@ const file_filer_proto_rawDesc = "" + "\x0ePARENT_IS_FILE\x10\x02\x12\x19\n" + "\x15EXISTING_IS_DIRECTORY\x10\x03\x12\x14\n" + "\x10EXISTING_IS_FILE\x10\x04\x12\x18\n" + - "\x14ENTRY_ALREADY_EXISTS\x10\x052\xaf\x12\n" + + "\x14ENTRY_ALREADY_EXISTS\x10\x052\xcb\x13\n" + "\fSeaweedFiler\x12g\n" + "\x14LookupDirectoryEntry\x12%.filer_pb.LookupDirectoryEntryRequest\x1a&.filer_pb.LookupDirectoryEntryResponse\"\x00\x12N\n" + "\vListEntries\x12\x1c.filer_pb.ListEntriesRequest\x1a\x1d.filer_pb.ListEntriesResponse\"\x000\x01\x12L\n" + @@ -5567,7 +5846,9 @@ const file_filer_proto_rawDesc = "" + "\x11DistributedUnlock\x12\x17.filer_pb.UnlockRequest\x1a\x18.filer_pb.UnlockResponse\"\x00\x12R\n" + "\rFindLockOwner\x12\x1e.filer_pb.FindLockOwnerRequest\x1a\x1f.filer_pb.FindLockOwnerResponse\"\x00\x12R\n" + "\rTransferLocks\x12\x1e.filer_pb.TransferLocksRequest\x1a\x1f.filer_pb.TransferLocksResponse\"\x00\x12R\n" + - "\rReplicateLock\x12\x1e.filer_pb.ReplicateLockRequest\x1a\x1f.filer_pb.ReplicateLockResponse\"\x00BO\n" + + "\rReplicateLock\x12\x1e.filer_pb.ReplicateLockRequest\x1a\x1f.filer_pb.ReplicateLockResponse\"\x00\x12R\n" + + "\rMountRegister\x12\x1e.filer_pb.MountRegisterRequest\x1a\x1f.filer_pb.MountRegisterResponse\"\x00\x12F\n" + + "\tMountList\x12\x1a.filer_pb.MountListRequest\x1a\x1b.filer_pb.MountListResponse\"\x00BO\n" + "\x10seaweedfs.clientB\n" + "FilerProtoZ/github.com/seaweedfs/seaweedfs/weed/pb/filer_pbb\x06proto3" @@ -5584,7 +5865,7 @@ func file_filer_proto_rawDescGZIP() []byte { } var file_filer_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 76) +var file_filer_proto_msgTypes = make([]protoimpl.MessageInfo, 81) var file_filer_proto_goTypes = []any{ (SSEType)(0), // 0: filer_pb.SSEType (FilerError)(0), // 1: filer_pb.FilerError @@ -5659,18 +5940,23 @@ var file_filer_proto_goTypes = []any{ (*ReplicateLockResponse)(nil), // 70: filer_pb.ReplicateLockResponse (*StreamMutateEntryRequest)(nil), // 71: filer_pb.StreamMutateEntryRequest (*StreamMutateEntryResponse)(nil), // 72: filer_pb.StreamMutateEntryResponse - nil, // 73: filer_pb.Entry.ExtendedEntry - nil, // 74: filer_pb.UpdateEntryRequest.ExpectedExtendedEntry - nil, // 75: filer_pb.LookupVolumeResponse.LocationsMapEntry - (*LocateBrokerResponse_Resource)(nil), // 76: filer_pb.LocateBrokerResponse.Resource - (*FilerConf_PathConf)(nil), // 77: filer_pb.FilerConf.PathConf + (*MountRegisterRequest)(nil), // 73: filer_pb.MountRegisterRequest + (*MountRegisterResponse)(nil), // 74: filer_pb.MountRegisterResponse + (*MountListRequest)(nil), // 75: filer_pb.MountListRequest + (*MountListResponse)(nil), // 76: filer_pb.MountListResponse + (*MountInfo)(nil), // 77: filer_pb.MountInfo + nil, // 78: filer_pb.Entry.ExtendedEntry + nil, // 79: filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + nil, // 80: filer_pb.LookupVolumeResponse.LocationsMapEntry + (*LocateBrokerResponse_Resource)(nil), // 81: filer_pb.LocateBrokerResponse.Resource + (*FilerConf_PathConf)(nil), // 82: filer_pb.FilerConf.PathConf } var file_filer_proto_depIdxs = []int32{ 7, // 0: filer_pb.LookupDirectoryEntryResponse.entry:type_name -> filer_pb.Entry 7, // 1: filer_pb.ListEntriesResponse.entry:type_name -> filer_pb.Entry 10, // 2: filer_pb.Entry.chunks:type_name -> filer_pb.FileChunk 13, // 3: filer_pb.Entry.attributes:type_name -> filer_pb.FuseAttributes - 73, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry + 78, // 4: filer_pb.Entry.extended:type_name -> filer_pb.Entry.ExtendedEntry 6, // 5: filer_pb.Entry.remote_entry:type_name -> filer_pb.RemoteEntry 7, // 6: filer_pb.FullEntry.entry:type_name -> filer_pb.Entry 7, // 7: filer_pb.EventNotification.old_entry:type_name -> filer_pb.Entry @@ -5683,22 +5969,22 @@ var file_filer_proto_depIdxs = []int32{ 44, // 14: filer_pb.CreateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 1, // 15: filer_pb.CreateEntryResponse.error_code:type_name -> filer_pb.FilerError 7, // 16: filer_pb.UpdateEntryRequest.entry:type_name -> filer_pb.Entry - 74, // 17: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry + 79, // 17: filer_pb.UpdateEntryRequest.expected_extended:type_name -> filer_pb.UpdateEntryRequest.ExpectedExtendedEntry 44, // 18: filer_pb.UpdateEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 10, // 19: filer_pb.AppendToEntryRequest.chunks:type_name -> filer_pb.FileChunk 44, // 20: filer_pb.DeleteEntryResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 9, // 21: filer_pb.StreamRenameEntryResponse.event_notification:type_name -> filer_pb.EventNotification 30, // 22: filer_pb.AssignVolumeResponse.location:type_name -> filer_pb.Location 30, // 23: filer_pb.Locations.locations:type_name -> filer_pb.Location - 75, // 24: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry + 80, // 24: filer_pb.LookupVolumeResponse.locations_map:type_name -> filer_pb.LookupVolumeResponse.LocationsMapEntry 32, // 25: filer_pb.CollectionListResponse.collections:type_name -> filer_pb.Collection 9, // 26: filer_pb.SubscribeMetadataResponse.event_notification:type_name -> filer_pb.EventNotification 44, // 27: filer_pb.SubscribeMetadataResponse.events:type_name -> filer_pb.SubscribeMetadataResponse 45, // 28: filer_pb.SubscribeMetadataResponse.log_file_refs:type_name -> filer_pb.LogFileChunkRef 10, // 29: filer_pb.LogFileChunkRef.chunks:type_name -> filer_pb.FileChunk 7, // 30: filer_pb.TraverseBfsMetadataResponse.entry:type_name -> filer_pb.Entry - 76, // 31: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource - 77, // 32: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf + 81, // 31: filer_pb.LocateBrokerResponse.resources:type_name -> filer_pb.LocateBrokerResponse.Resource + 82, // 32: filer_pb.FilerConf.locations:type_name -> filer_pb.FilerConf.PathConf 7, // 33: filer_pb.CacheRemoteObjectToLocalClusterResponse.entry:type_name -> filer_pb.Entry 44, // 34: filer_pb.CacheRemoteObjectToLocalClusterResponse.metadata_event:type_name -> filer_pb.SubscribeMetadataResponse 66, // 35: filer_pb.TransferLocksRequest.locks:type_name -> filer_pb.Lock @@ -5710,66 +5996,71 @@ var file_filer_proto_depIdxs = []int32{ 17, // 41: filer_pb.StreamMutateEntryResponse.update_response:type_name -> filer_pb.UpdateEntryResponse 21, // 42: filer_pb.StreamMutateEntryResponse.delete_response:type_name -> filer_pb.DeleteEntryResponse 25, // 43: filer_pb.StreamMutateEntryResponse.rename_response:type_name -> filer_pb.StreamRenameEntryResponse - 29, // 44: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations - 2, // 45: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest - 4, // 46: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest - 14, // 47: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest - 16, // 48: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest - 18, // 49: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest - 20, // 50: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest - 22, // 51: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest - 24, // 52: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest - 71, // 53: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest - 26, // 54: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest - 28, // 55: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest - 33, // 56: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest - 35, // 57: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest - 37, // 58: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest - 39, // 59: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest - 41, // 60: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest - 46, // 61: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest - 43, // 62: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 43, // 63: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest - 53, // 64: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest - 55, // 65: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest - 58, // 66: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest - 60, // 67: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest - 62, // 68: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest - 64, // 69: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest - 67, // 70: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest - 69, // 71: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest - 3, // 72: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse - 5, // 73: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse - 15, // 74: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse - 17, // 75: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse - 19, // 76: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse - 21, // 77: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse - 23, // 78: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse - 25, // 79: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse - 72, // 80: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse - 27, // 81: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse - 31, // 82: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse - 34, // 83: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse - 36, // 84: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse - 38, // 85: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse - 40, // 86: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse - 42, // 87: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse - 47, // 88: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse - 44, // 89: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 44, // 90: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse - 54, // 91: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse - 56, // 92: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse - 59, // 93: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse - 61, // 94: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse - 63, // 95: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse - 65, // 96: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse - 68, // 97: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse - 70, // 98: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse - 72, // [72:99] is the sub-list for method output_type - 45, // [45:72] is the sub-list for method input_type - 45, // [45:45] is the sub-list for extension type_name - 45, // [45:45] is the sub-list for extension extendee - 0, // [0:45] is the sub-list for field type_name + 77, // 44: filer_pb.MountListResponse.mounts:type_name -> filer_pb.MountInfo + 29, // 45: filer_pb.LookupVolumeResponse.LocationsMapEntry.value:type_name -> filer_pb.Locations + 2, // 46: filer_pb.SeaweedFiler.LookupDirectoryEntry:input_type -> filer_pb.LookupDirectoryEntryRequest + 4, // 47: filer_pb.SeaweedFiler.ListEntries:input_type -> filer_pb.ListEntriesRequest + 14, // 48: filer_pb.SeaweedFiler.CreateEntry:input_type -> filer_pb.CreateEntryRequest + 16, // 49: filer_pb.SeaweedFiler.UpdateEntry:input_type -> filer_pb.UpdateEntryRequest + 18, // 50: filer_pb.SeaweedFiler.AppendToEntry:input_type -> filer_pb.AppendToEntryRequest + 20, // 51: filer_pb.SeaweedFiler.DeleteEntry:input_type -> filer_pb.DeleteEntryRequest + 22, // 52: filer_pb.SeaweedFiler.AtomicRenameEntry:input_type -> filer_pb.AtomicRenameEntryRequest + 24, // 53: filer_pb.SeaweedFiler.StreamRenameEntry:input_type -> filer_pb.StreamRenameEntryRequest + 71, // 54: filer_pb.SeaweedFiler.StreamMutateEntry:input_type -> filer_pb.StreamMutateEntryRequest + 26, // 55: filer_pb.SeaweedFiler.AssignVolume:input_type -> filer_pb.AssignVolumeRequest + 28, // 56: filer_pb.SeaweedFiler.LookupVolume:input_type -> filer_pb.LookupVolumeRequest + 33, // 57: filer_pb.SeaweedFiler.CollectionList:input_type -> filer_pb.CollectionListRequest + 35, // 58: filer_pb.SeaweedFiler.DeleteCollection:input_type -> filer_pb.DeleteCollectionRequest + 37, // 59: filer_pb.SeaweedFiler.Statistics:input_type -> filer_pb.StatisticsRequest + 39, // 60: filer_pb.SeaweedFiler.Ping:input_type -> filer_pb.PingRequest + 41, // 61: filer_pb.SeaweedFiler.GetFilerConfiguration:input_type -> filer_pb.GetFilerConfigurationRequest + 46, // 62: filer_pb.SeaweedFiler.TraverseBfsMetadata:input_type -> filer_pb.TraverseBfsMetadataRequest + 43, // 63: filer_pb.SeaweedFiler.SubscribeMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 43, // 64: filer_pb.SeaweedFiler.SubscribeLocalMetadata:input_type -> filer_pb.SubscribeMetadataRequest + 53, // 65: filer_pb.SeaweedFiler.KvGet:input_type -> filer_pb.KvGetRequest + 55, // 66: filer_pb.SeaweedFiler.KvPut:input_type -> filer_pb.KvPutRequest + 58, // 67: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:input_type -> filer_pb.CacheRemoteObjectToLocalClusterRequest + 60, // 68: filer_pb.SeaweedFiler.DistributedLock:input_type -> filer_pb.LockRequest + 62, // 69: filer_pb.SeaweedFiler.DistributedUnlock:input_type -> filer_pb.UnlockRequest + 64, // 70: filer_pb.SeaweedFiler.FindLockOwner:input_type -> filer_pb.FindLockOwnerRequest + 67, // 71: filer_pb.SeaweedFiler.TransferLocks:input_type -> filer_pb.TransferLocksRequest + 69, // 72: filer_pb.SeaweedFiler.ReplicateLock:input_type -> filer_pb.ReplicateLockRequest + 73, // 73: filer_pb.SeaweedFiler.MountRegister:input_type -> filer_pb.MountRegisterRequest + 75, // 74: filer_pb.SeaweedFiler.MountList:input_type -> filer_pb.MountListRequest + 3, // 75: filer_pb.SeaweedFiler.LookupDirectoryEntry:output_type -> filer_pb.LookupDirectoryEntryResponse + 5, // 76: filer_pb.SeaweedFiler.ListEntries:output_type -> filer_pb.ListEntriesResponse + 15, // 77: filer_pb.SeaweedFiler.CreateEntry:output_type -> filer_pb.CreateEntryResponse + 17, // 78: filer_pb.SeaweedFiler.UpdateEntry:output_type -> filer_pb.UpdateEntryResponse + 19, // 79: filer_pb.SeaweedFiler.AppendToEntry:output_type -> filer_pb.AppendToEntryResponse + 21, // 80: filer_pb.SeaweedFiler.DeleteEntry:output_type -> filer_pb.DeleteEntryResponse + 23, // 81: filer_pb.SeaweedFiler.AtomicRenameEntry:output_type -> filer_pb.AtomicRenameEntryResponse + 25, // 82: filer_pb.SeaweedFiler.StreamRenameEntry:output_type -> filer_pb.StreamRenameEntryResponse + 72, // 83: filer_pb.SeaweedFiler.StreamMutateEntry:output_type -> filer_pb.StreamMutateEntryResponse + 27, // 84: filer_pb.SeaweedFiler.AssignVolume:output_type -> filer_pb.AssignVolumeResponse + 31, // 85: filer_pb.SeaweedFiler.LookupVolume:output_type -> filer_pb.LookupVolumeResponse + 34, // 86: filer_pb.SeaweedFiler.CollectionList:output_type -> filer_pb.CollectionListResponse + 36, // 87: filer_pb.SeaweedFiler.DeleteCollection:output_type -> filer_pb.DeleteCollectionResponse + 38, // 88: filer_pb.SeaweedFiler.Statistics:output_type -> filer_pb.StatisticsResponse + 40, // 89: filer_pb.SeaweedFiler.Ping:output_type -> filer_pb.PingResponse + 42, // 90: filer_pb.SeaweedFiler.GetFilerConfiguration:output_type -> filer_pb.GetFilerConfigurationResponse + 47, // 91: filer_pb.SeaweedFiler.TraverseBfsMetadata:output_type -> filer_pb.TraverseBfsMetadataResponse + 44, // 92: filer_pb.SeaweedFiler.SubscribeMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 44, // 93: filer_pb.SeaweedFiler.SubscribeLocalMetadata:output_type -> filer_pb.SubscribeMetadataResponse + 54, // 94: filer_pb.SeaweedFiler.KvGet:output_type -> filer_pb.KvGetResponse + 56, // 95: filer_pb.SeaweedFiler.KvPut:output_type -> filer_pb.KvPutResponse + 59, // 96: filer_pb.SeaweedFiler.CacheRemoteObjectToLocalCluster:output_type -> filer_pb.CacheRemoteObjectToLocalClusterResponse + 61, // 97: filer_pb.SeaweedFiler.DistributedLock:output_type -> filer_pb.LockResponse + 63, // 98: filer_pb.SeaweedFiler.DistributedUnlock:output_type -> filer_pb.UnlockResponse + 65, // 99: filer_pb.SeaweedFiler.FindLockOwner:output_type -> filer_pb.FindLockOwnerResponse + 68, // 100: filer_pb.SeaweedFiler.TransferLocks:output_type -> filer_pb.TransferLocksResponse + 70, // 101: filer_pb.SeaweedFiler.ReplicateLock:output_type -> filer_pb.ReplicateLockResponse + 74, // 102: filer_pb.SeaweedFiler.MountRegister:output_type -> filer_pb.MountRegisterResponse + 76, // 103: filer_pb.SeaweedFiler.MountList:output_type -> filer_pb.MountListResponse + 75, // [75:104] is the sub-list for method output_type + 46, // [46:75] is the sub-list for method input_type + 46, // [46:46] is the sub-list for extension type_name + 46, // [46:46] is the sub-list for extension extendee + 0, // [0:46] is the sub-list for field type_name } func init() { file_filer_proto_init() } @@ -5795,7 +6086,7 @@ func file_filer_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_filer_proto_rawDesc), len(file_filer_proto_rawDesc)), NumEnums: 2, - NumMessages: 76, + NumMessages: 81, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/filer_pb/filer_grpc.pb.go b/weed/pb/filer_pb/filer_grpc.pb.go index e7498280d..4712c3e5b 100644 --- a/weed/pb/filer_pb/filer_grpc.pb.go +++ b/weed/pb/filer_pb/filer_grpc.pb.go @@ -46,6 +46,8 @@ const ( SeaweedFiler_FindLockOwner_FullMethodName = "/filer_pb.SeaweedFiler/FindLockOwner" SeaweedFiler_TransferLocks_FullMethodName = "/filer_pb.SeaweedFiler/TransferLocks" SeaweedFiler_ReplicateLock_FullMethodName = "/filer_pb.SeaweedFiler/ReplicateLock" + SeaweedFiler_MountRegister_FullMethodName = "/filer_pb.SeaweedFiler/MountRegister" + SeaweedFiler_MountList_FullMethodName = "/filer_pb.SeaweedFiler/MountList" ) // SeaweedFilerClient is the client API for SeaweedFiler service. @@ -80,6 +82,10 @@ type SeaweedFilerClient interface { // distributed lock management internal use only TransferLocks(ctx context.Context, in *TransferLocksRequest, opts ...grpc.CallOption) (*TransferLocksResponse, error) ReplicateLock(ctx context.Context, in *ReplicateLockRequest, opts ...grpc.CallOption) (*ReplicateLockResponse, error) + // Peer chunk sharing — tier 1: mount-server registry. + // See design-weed-mount-peer-chunk-sharing.md for details. + MountRegister(ctx context.Context, in *MountRegisterRequest, opts ...grpc.CallOption) (*MountRegisterResponse, error) + MountList(ctx context.Context, in *MountListRequest, opts ...grpc.CallOption) (*MountListResponse, error) } type seaweedFilerClient struct { @@ -408,6 +414,26 @@ func (c *seaweedFilerClient) ReplicateLock(ctx context.Context, in *ReplicateLoc return out, nil } +func (c *seaweedFilerClient) MountRegister(ctx context.Context, in *MountRegisterRequest, opts ...grpc.CallOption) (*MountRegisterResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(MountRegisterResponse) + err := c.cc.Invoke(ctx, SeaweedFiler_MountRegister_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *seaweedFilerClient) MountList(ctx context.Context, in *MountListRequest, opts ...grpc.CallOption) (*MountListResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(MountListResponse) + err := c.cc.Invoke(ctx, SeaweedFiler_MountList_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // SeaweedFilerServer is the server API for SeaweedFiler service. // All implementations must embed UnimplementedSeaweedFilerServer // for forward compatibility. @@ -440,6 +466,10 @@ type SeaweedFilerServer interface { // distributed lock management internal use only TransferLocks(context.Context, *TransferLocksRequest) (*TransferLocksResponse, error) ReplicateLock(context.Context, *ReplicateLockRequest) (*ReplicateLockResponse, error) + // Peer chunk sharing — tier 1: mount-server registry. + // See design-weed-mount-peer-chunk-sharing.md for details. + MountRegister(context.Context, *MountRegisterRequest) (*MountRegisterResponse, error) + MountList(context.Context, *MountListRequest) (*MountListResponse, error) mustEmbedUnimplementedSeaweedFilerServer() } @@ -531,6 +561,12 @@ func (UnimplementedSeaweedFilerServer) TransferLocks(context.Context, *TransferL func (UnimplementedSeaweedFilerServer) ReplicateLock(context.Context, *ReplicateLockRequest) (*ReplicateLockResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ReplicateLock not implemented") } +func (UnimplementedSeaweedFilerServer) MountRegister(context.Context, *MountRegisterRequest) (*MountRegisterResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method MountRegister not implemented") +} +func (UnimplementedSeaweedFilerServer) MountList(context.Context, *MountListRequest) (*MountListResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method MountList not implemented") +} func (UnimplementedSeaweedFilerServer) mustEmbedUnimplementedSeaweedFilerServer() {} func (UnimplementedSeaweedFilerServer) testEmbeddedByValue() {} @@ -992,6 +1028,42 @@ func _SeaweedFiler_ReplicateLock_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _SeaweedFiler_MountRegister_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MountRegisterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedFilerServer).MountRegister(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SeaweedFiler_MountRegister_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedFilerServer).MountRegister(ctx, req.(*MountRegisterRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _SeaweedFiler_MountList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MountListRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedFilerServer).MountList(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SeaweedFiler_MountList_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedFilerServer).MountList(ctx, req.(*MountListRequest)) + } + return interceptor(ctx, in, info, handler) +} + // SeaweedFiler_ServiceDesc is the grpc.ServiceDesc for SeaweedFiler service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -1083,6 +1155,14 @@ var SeaweedFiler_ServiceDesc = grpc.ServiceDesc{ MethodName: "ReplicateLock", Handler: _SeaweedFiler_ReplicateLock_Handler, }, + { + MethodName: "MountRegister", + Handler: _SeaweedFiler_MountRegister_Handler, + }, + { + MethodName: "MountList", + Handler: _SeaweedFiler_MountList_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/weed/pb/mount_peer.proto b/weed/pb/mount_peer.proto new file mode 100644 index 000000000..b696f987c --- /dev/null +++ b/weed/pb/mount_peer.proto @@ -0,0 +1,95 @@ +syntax = "proto3"; + +package mount_peer_pb; + +option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pb"; + +////////////////////////////////////////////////// +// Peer chunk sharing — mount-to-mount chunk directory +// +// Each weed mount exposes this service on its -peer.listen address. +// Ownership of fid -> holders tracking is sharded across the mount fleet +// via rendezvous (HRW) hashing on the registered mount list. See +// design-weed-mount-peer-chunk-sharing.md for protocol details. +////////////////////////////////////////////////// + +service MountPeer { + + // ChunkAnnounce: the caller asserts it currently holds the listed fids in + // its local chunk cache and is willing to serve them to peers. The + // receiver accepts only fids for which it is the HRW-assigned owner on + // its current seed view; others are returned in rejected_file_ids. + rpc ChunkAnnounce (ChunkAnnounceRequest) returns (ChunkAnnounceResponse) { + } + + // ChunkLookup: asks the receiver for known holders of each requested fid. + // The receiver responds only for fids it owns; others are listed in + // not_owner_file_ids so the caller can retry against the correct owner + // after refreshing its own seed view. + rpc ChunkLookup (ChunkLookupRequest) returns (ChunkLookupResponse) { + } + + // FetchChunk: server-streams the bytes of a cached chunk to a peer. + // Streaming avoids the default gRPC 4 MiB message cap for typical + // 16 MiB chunks and lets the receiver assemble into a preallocated + // buffer. Not-cached / cache-miss returns a gRPC NOT_FOUND status. + // The fetcher re-verifies MD5 against expected_etag end-to-end after + // the stream completes. + rpc FetchChunk (FetchChunkRequest) returns (stream FetchChunkResponse) { + } +} + +message ChunkAnnounceRequest { + repeated string file_ids = 1; + string peer_addr = 2; + string rack = 3; + int32 ttl_seconds = 4; + string data_center = 5; +} + +message ChunkAnnounceResponse { + repeated string rejected_file_ids = 1; // receiver is not the owner of these fids +} + +message ChunkLookupRequest { + repeated string file_ids = 1; +} + +message ChunkLookupResponse { + map peers_by_fid = 1; + repeated string not_owner_file_ids = 2; +} + +message PeerSet { + repeated PeerInfo peers = 1; +} + +message PeerInfo { + string peer_addr = 1; + string rack = 2; + string data_center = 3; +} + +message FetchChunkRequest { + string file_id = 1; + string expected_etag = 2; // caller's expected MD5 over the FULL chunk. + // Only meaningful when offset=0 and length=0 + // (a whole-chunk fetch); partial reads can't + // be verified against a whole-chunk MD5. + // Fetcher re-verifies end-to-end. + uint64 expected_size = 3; // filer-reported chunk byte count; server sizes + // its cache-read buffer to exactly this so a + // sub-max-part-size chunk doesn't trigger the + // cache wrapper's all-or-nothing miss path. + uint64 offset = 4; // optional: byte offset within the chunk to + // start reading from. 0 (default) means a + // whole-chunk transfer starting at byte zero. + uint64 length = 5; // optional: number of bytes to return from + // offset. 0 (default) means "until end of + // chunk". Range is capped server-side by + // maxFetchChunkBytes regardless. +} + +message FetchChunkResponse { + bytes data = 1; // next frame of chunk bytes; concatenate across stream +} diff --git a/weed/pb/mount_peer_pb/mount_peer.pb.go b/weed/pb/mount_peer_pb/mount_peer.pb.go new file mode 100644 index 000000000..32be93f7a --- /dev/null +++ b/weed/pb/mount_peer_pb/mount_peer.pb.go @@ -0,0 +1,581 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v6.33.4 +// source: mount_peer.proto + +package mount_peer_pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ChunkAnnounceRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"` + PeerAddr string `protobuf:"bytes,2,opt,name=peer_addr,json=peerAddr,proto3" json:"peer_addr,omitempty"` + Rack string `protobuf:"bytes,3,opt,name=rack,proto3" json:"rack,omitempty"` + TtlSeconds int32 `protobuf:"varint,4,opt,name=ttl_seconds,json=ttlSeconds,proto3" json:"ttl_seconds,omitempty"` + DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChunkAnnounceRequest) Reset() { + *x = ChunkAnnounceRequest{} + mi := &file_mount_peer_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChunkAnnounceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChunkAnnounceRequest) ProtoMessage() {} + +func (x *ChunkAnnounceRequest) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChunkAnnounceRequest.ProtoReflect.Descriptor instead. +func (*ChunkAnnounceRequest) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{0} +} + +func (x *ChunkAnnounceRequest) GetFileIds() []string { + if x != nil { + return x.FileIds + } + return nil +} + +func (x *ChunkAnnounceRequest) GetPeerAddr() string { + if x != nil { + return x.PeerAddr + } + return "" +} + +func (x *ChunkAnnounceRequest) GetRack() string { + if x != nil { + return x.Rack + } + return "" +} + +func (x *ChunkAnnounceRequest) GetTtlSeconds() int32 { + if x != nil { + return x.TtlSeconds + } + return 0 +} + +func (x *ChunkAnnounceRequest) GetDataCenter() string { + if x != nil { + return x.DataCenter + } + return "" +} + +type ChunkAnnounceResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + RejectedFileIds []string `protobuf:"bytes,1,rep,name=rejected_file_ids,json=rejectedFileIds,proto3" json:"rejected_file_ids,omitempty"` // receiver is not the owner of these fids + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChunkAnnounceResponse) Reset() { + *x = ChunkAnnounceResponse{} + mi := &file_mount_peer_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChunkAnnounceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChunkAnnounceResponse) ProtoMessage() {} + +func (x *ChunkAnnounceResponse) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChunkAnnounceResponse.ProtoReflect.Descriptor instead. +func (*ChunkAnnounceResponse) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{1} +} + +func (x *ChunkAnnounceResponse) GetRejectedFileIds() []string { + if x != nil { + return x.RejectedFileIds + } + return nil +} + +type ChunkLookupRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChunkLookupRequest) Reset() { + *x = ChunkLookupRequest{} + mi := &file_mount_peer_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChunkLookupRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChunkLookupRequest) ProtoMessage() {} + +func (x *ChunkLookupRequest) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChunkLookupRequest.ProtoReflect.Descriptor instead. +func (*ChunkLookupRequest) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{2} +} + +func (x *ChunkLookupRequest) GetFileIds() []string { + if x != nil { + return x.FileIds + } + return nil +} + +type ChunkLookupResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + PeersByFid map[string]*PeerSet `protobuf:"bytes,1,rep,name=peers_by_fid,json=peersByFid,proto3" json:"peers_by_fid,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + NotOwnerFileIds []string `protobuf:"bytes,2,rep,name=not_owner_file_ids,json=notOwnerFileIds,proto3" json:"not_owner_file_ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ChunkLookupResponse) Reset() { + *x = ChunkLookupResponse{} + mi := &file_mount_peer_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ChunkLookupResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChunkLookupResponse) ProtoMessage() {} + +func (x *ChunkLookupResponse) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChunkLookupResponse.ProtoReflect.Descriptor instead. +func (*ChunkLookupResponse) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{3} +} + +func (x *ChunkLookupResponse) GetPeersByFid() map[string]*PeerSet { + if x != nil { + return x.PeersByFid + } + return nil +} + +func (x *ChunkLookupResponse) GetNotOwnerFileIds() []string { + if x != nil { + return x.NotOwnerFileIds + } + return nil +} + +type PeerSet struct { + state protoimpl.MessageState `protogen:"open.v1"` + Peers []*PeerInfo `protobuf:"bytes,1,rep,name=peers,proto3" json:"peers,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PeerSet) Reset() { + *x = PeerSet{} + mi := &file_mount_peer_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PeerSet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerSet) ProtoMessage() {} + +func (x *PeerSet) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerSet.ProtoReflect.Descriptor instead. +func (*PeerSet) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{4} +} + +func (x *PeerSet) GetPeers() []*PeerInfo { + if x != nil { + return x.Peers + } + return nil +} + +type PeerInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + PeerAddr string `protobuf:"bytes,1,opt,name=peer_addr,json=peerAddr,proto3" json:"peer_addr,omitempty"` + Rack string `protobuf:"bytes,2,opt,name=rack,proto3" json:"rack,omitempty"` + DataCenter string `protobuf:"bytes,3,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PeerInfo) Reset() { + *x = PeerInfo{} + mi := &file_mount_peer_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PeerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PeerInfo) ProtoMessage() {} + +func (x *PeerInfo) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PeerInfo.ProtoReflect.Descriptor instead. +func (*PeerInfo) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{5} +} + +func (x *PeerInfo) GetPeerAddr() string { + if x != nil { + return x.PeerAddr + } + return "" +} + +func (x *PeerInfo) GetRack() string { + if x != nil { + return x.Rack + } + return "" +} + +func (x *PeerInfo) GetDataCenter() string { + if x != nil { + return x.DataCenter + } + return "" +} + +type FetchChunkRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + FileId string `protobuf:"bytes,1,opt,name=file_id,json=fileId,proto3" json:"file_id,omitempty"` + ExpectedEtag string `protobuf:"bytes,2,opt,name=expected_etag,json=expectedEtag,proto3" json:"expected_etag,omitempty"` // caller's expected MD5 over the FULL chunk. + // Only meaningful when offset=0 and length=0 + // (a whole-chunk fetch); partial reads can't + // be verified against a whole-chunk MD5. + // Fetcher re-verifies end-to-end. + ExpectedSize uint64 `protobuf:"varint,3,opt,name=expected_size,json=expectedSize,proto3" json:"expected_size,omitempty"` // filer-reported chunk byte count; server sizes + // its cache-read buffer to exactly this so a + // sub-max-part-size chunk doesn't trigger the + // cache wrapper's all-or-nothing miss path. + Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` // optional: byte offset within the chunk to + // start reading from. 0 (default) means a + // whole-chunk transfer starting at byte zero. + Length uint64 `protobuf:"varint,5,opt,name=length,proto3" json:"length,omitempty"` // optional: number of bytes to return from + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FetchChunkRequest) Reset() { + *x = FetchChunkRequest{} + mi := &file_mount_peer_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FetchChunkRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FetchChunkRequest) ProtoMessage() {} + +func (x *FetchChunkRequest) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FetchChunkRequest.ProtoReflect.Descriptor instead. +func (*FetchChunkRequest) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{6} +} + +func (x *FetchChunkRequest) GetFileId() string { + if x != nil { + return x.FileId + } + return "" +} + +func (x *FetchChunkRequest) GetExpectedEtag() string { + if x != nil { + return x.ExpectedEtag + } + return "" +} + +func (x *FetchChunkRequest) GetExpectedSize() uint64 { + if x != nil { + return x.ExpectedSize + } + return 0 +} + +func (x *FetchChunkRequest) GetOffset() uint64 { + if x != nil { + return x.Offset + } + return 0 +} + +func (x *FetchChunkRequest) GetLength() uint64 { + if x != nil { + return x.Length + } + return 0 +} + +type FetchChunkResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` // next frame of chunk bytes; concatenate across stream + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FetchChunkResponse) Reset() { + *x = FetchChunkResponse{} + mi := &file_mount_peer_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FetchChunkResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FetchChunkResponse) ProtoMessage() {} + +func (x *FetchChunkResponse) ProtoReflect() protoreflect.Message { + mi := &file_mount_peer_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FetchChunkResponse.ProtoReflect.Descriptor instead. +func (*FetchChunkResponse) Descriptor() ([]byte, []int) { + return file_mount_peer_proto_rawDescGZIP(), []int{7} +} + +func (x *FetchChunkResponse) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_mount_peer_proto protoreflect.FileDescriptor + +const file_mount_peer_proto_rawDesc = "" + + "\n" + + "\x10mount_peer.proto\x12\rmount_peer_pb\"\xa4\x01\n" + + "\x14ChunkAnnounceRequest\x12\x19\n" + + "\bfile_ids\x18\x01 \x03(\tR\afileIds\x12\x1b\n" + + "\tpeer_addr\x18\x02 \x01(\tR\bpeerAddr\x12\x12\n" + + "\x04rack\x18\x03 \x01(\tR\x04rack\x12\x1f\n" + + "\vttl_seconds\x18\x04 \x01(\x05R\n" + + "ttlSeconds\x12\x1f\n" + + "\vdata_center\x18\x05 \x01(\tR\n" + + "dataCenter\"C\n" + + "\x15ChunkAnnounceResponse\x12*\n" + + "\x11rejected_file_ids\x18\x01 \x03(\tR\x0frejectedFileIds\"/\n" + + "\x12ChunkLookupRequest\x12\x19\n" + + "\bfile_ids\x18\x01 \x03(\tR\afileIds\"\xef\x01\n" + + "\x13ChunkLookupResponse\x12T\n" + + "\fpeers_by_fid\x18\x01 \x03(\v22.mount_peer_pb.ChunkLookupResponse.PeersByFidEntryR\n" + + "peersByFid\x12+\n" + + "\x12not_owner_file_ids\x18\x02 \x03(\tR\x0fnotOwnerFileIds\x1aU\n" + + "\x0fPeersByFidEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12,\n" + + "\x05value\x18\x02 \x01(\v2\x16.mount_peer_pb.PeerSetR\x05value:\x028\x01\"8\n" + + "\aPeerSet\x12-\n" + + "\x05peers\x18\x01 \x03(\v2\x17.mount_peer_pb.PeerInfoR\x05peers\"\\\n" + + "\bPeerInfo\x12\x1b\n" + + "\tpeer_addr\x18\x01 \x01(\tR\bpeerAddr\x12\x12\n" + + "\x04rack\x18\x02 \x01(\tR\x04rack\x12\x1f\n" + + "\vdata_center\x18\x03 \x01(\tR\n" + + "dataCenter\"\xa6\x01\n" + + "\x11FetchChunkRequest\x12\x17\n" + + "\afile_id\x18\x01 \x01(\tR\x06fileId\x12#\n" + + "\rexpected_etag\x18\x02 \x01(\tR\fexpectedEtag\x12#\n" + + "\rexpected_size\x18\x03 \x01(\x04R\fexpectedSize\x12\x16\n" + + "\x06offset\x18\x04 \x01(\x04R\x06offset\x12\x16\n" + + "\x06length\x18\x05 \x01(\x04R\x06length\"(\n" + + "\x12FetchChunkResponse\x12\x12\n" + + "\x04data\x18\x01 \x01(\fR\x04data2\x98\x02\n" + + "\tMountPeer\x12\\\n" + + "\rChunkAnnounce\x12#.mount_peer_pb.ChunkAnnounceRequest\x1a$.mount_peer_pb.ChunkAnnounceResponse\"\x00\x12V\n" + + "\vChunkLookup\x12!.mount_peer_pb.ChunkLookupRequest\x1a\".mount_peer_pb.ChunkLookupResponse\"\x00\x12U\n" + + "\n" + + "FetchChunk\x12 .mount_peer_pb.FetchChunkRequest\x1a!.mount_peer_pb.FetchChunkResponse\"\x000\x01B6Z4github.com/seaweedfs/seaweedfs/weed/pb/mount_peer_pbb\x06proto3" + +var ( + file_mount_peer_proto_rawDescOnce sync.Once + file_mount_peer_proto_rawDescData []byte +) + +func file_mount_peer_proto_rawDescGZIP() []byte { + file_mount_peer_proto_rawDescOnce.Do(func() { + file_mount_peer_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mount_peer_proto_rawDesc), len(file_mount_peer_proto_rawDesc))) + }) + return file_mount_peer_proto_rawDescData +} + +var file_mount_peer_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_mount_peer_proto_goTypes = []any{ + (*ChunkAnnounceRequest)(nil), // 0: mount_peer_pb.ChunkAnnounceRequest + (*ChunkAnnounceResponse)(nil), // 1: mount_peer_pb.ChunkAnnounceResponse + (*ChunkLookupRequest)(nil), // 2: mount_peer_pb.ChunkLookupRequest + (*ChunkLookupResponse)(nil), // 3: mount_peer_pb.ChunkLookupResponse + (*PeerSet)(nil), // 4: mount_peer_pb.PeerSet + (*PeerInfo)(nil), // 5: mount_peer_pb.PeerInfo + (*FetchChunkRequest)(nil), // 6: mount_peer_pb.FetchChunkRequest + (*FetchChunkResponse)(nil), // 7: mount_peer_pb.FetchChunkResponse + nil, // 8: mount_peer_pb.ChunkLookupResponse.PeersByFidEntry +} +var file_mount_peer_proto_depIdxs = []int32{ + 8, // 0: mount_peer_pb.ChunkLookupResponse.peers_by_fid:type_name -> mount_peer_pb.ChunkLookupResponse.PeersByFidEntry + 5, // 1: mount_peer_pb.PeerSet.peers:type_name -> mount_peer_pb.PeerInfo + 4, // 2: mount_peer_pb.ChunkLookupResponse.PeersByFidEntry.value:type_name -> mount_peer_pb.PeerSet + 0, // 3: mount_peer_pb.MountPeer.ChunkAnnounce:input_type -> mount_peer_pb.ChunkAnnounceRequest + 2, // 4: mount_peer_pb.MountPeer.ChunkLookup:input_type -> mount_peer_pb.ChunkLookupRequest + 6, // 5: mount_peer_pb.MountPeer.FetchChunk:input_type -> mount_peer_pb.FetchChunkRequest + 1, // 6: mount_peer_pb.MountPeer.ChunkAnnounce:output_type -> mount_peer_pb.ChunkAnnounceResponse + 3, // 7: mount_peer_pb.MountPeer.ChunkLookup:output_type -> mount_peer_pb.ChunkLookupResponse + 7, // 8: mount_peer_pb.MountPeer.FetchChunk:output_type -> mount_peer_pb.FetchChunkResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_mount_peer_proto_init() } +func file_mount_peer_proto_init() { + if File_mount_peer_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mount_peer_proto_rawDesc), len(file_mount_peer_proto_rawDesc)), + NumEnums: 0, + NumMessages: 9, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_mount_peer_proto_goTypes, + DependencyIndexes: file_mount_peer_proto_depIdxs, + MessageInfos: file_mount_peer_proto_msgTypes, + }.Build() + File_mount_peer_proto = out.File + file_mount_peer_proto_goTypes = nil + file_mount_peer_proto_depIdxs = nil +} diff --git a/weed/pb/mount_peer_pb/mount_peer_grpc.pb.go b/weed/pb/mount_peer_pb/mount_peer_grpc.pb.go new file mode 100644 index 000000000..aea891f88 --- /dev/null +++ b/weed/pb/mount_peer_pb/mount_peer_grpc.pb.go @@ -0,0 +1,229 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.33.4 +// source: mount_peer.proto + +package mount_peer_pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + MountPeer_ChunkAnnounce_FullMethodName = "/mount_peer_pb.MountPeer/ChunkAnnounce" + MountPeer_ChunkLookup_FullMethodName = "/mount_peer_pb.MountPeer/ChunkLookup" + MountPeer_FetchChunk_FullMethodName = "/mount_peer_pb.MountPeer/FetchChunk" +) + +// MountPeerClient is the client API for MountPeer service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MountPeerClient interface { + // ChunkAnnounce: the caller asserts it currently holds the listed fids in + // its local chunk cache and is willing to serve them to peers. The + // receiver accepts only fids for which it is the HRW-assigned owner on + // its current seed view; others are returned in rejected_file_ids. + ChunkAnnounce(ctx context.Context, in *ChunkAnnounceRequest, opts ...grpc.CallOption) (*ChunkAnnounceResponse, error) + // ChunkLookup: asks the receiver for known holders of each requested fid. + // The receiver responds only for fids it owns; others are listed in + // not_owner_file_ids so the caller can retry against the correct owner + // after refreshing its own seed view. + ChunkLookup(ctx context.Context, in *ChunkLookupRequest, opts ...grpc.CallOption) (*ChunkLookupResponse, error) + // FetchChunk: server-streams the bytes of a cached chunk to a peer. + // Streaming avoids the default gRPC 4 MiB message cap for typical + // 16 MiB chunks and lets the receiver assemble into a preallocated + // buffer. Not-cached / cache-miss returns a gRPC NOT_FOUND status. + // The fetcher re-verifies MD5 against expected_etag end-to-end after + // the stream completes. + FetchChunk(ctx context.Context, in *FetchChunkRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[FetchChunkResponse], error) +} + +type mountPeerClient struct { + cc grpc.ClientConnInterface +} + +func NewMountPeerClient(cc grpc.ClientConnInterface) MountPeerClient { + return &mountPeerClient{cc} +} + +func (c *mountPeerClient) ChunkAnnounce(ctx context.Context, in *ChunkAnnounceRequest, opts ...grpc.CallOption) (*ChunkAnnounceResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ChunkAnnounceResponse) + err := c.cc.Invoke(ctx, MountPeer_ChunkAnnounce_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *mountPeerClient) ChunkLookup(ctx context.Context, in *ChunkLookupRequest, opts ...grpc.CallOption) (*ChunkLookupResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ChunkLookupResponse) + err := c.cc.Invoke(ctx, MountPeer_ChunkLookup_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *mountPeerClient) FetchChunk(ctx context.Context, in *FetchChunkRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[FetchChunkResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &MountPeer_ServiceDesc.Streams[0], MountPeer_FetchChunk_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[FetchChunkRequest, FetchChunkResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type MountPeer_FetchChunkClient = grpc.ServerStreamingClient[FetchChunkResponse] + +// MountPeerServer is the server API for MountPeer service. +// All implementations must embed UnimplementedMountPeerServer +// for forward compatibility. +type MountPeerServer interface { + // ChunkAnnounce: the caller asserts it currently holds the listed fids in + // its local chunk cache and is willing to serve them to peers. The + // receiver accepts only fids for which it is the HRW-assigned owner on + // its current seed view; others are returned in rejected_file_ids. + ChunkAnnounce(context.Context, *ChunkAnnounceRequest) (*ChunkAnnounceResponse, error) + // ChunkLookup: asks the receiver for known holders of each requested fid. + // The receiver responds only for fids it owns; others are listed in + // not_owner_file_ids so the caller can retry against the correct owner + // after refreshing its own seed view. + ChunkLookup(context.Context, *ChunkLookupRequest) (*ChunkLookupResponse, error) + // FetchChunk: server-streams the bytes of a cached chunk to a peer. + // Streaming avoids the default gRPC 4 MiB message cap for typical + // 16 MiB chunks and lets the receiver assemble into a preallocated + // buffer. Not-cached / cache-miss returns a gRPC NOT_FOUND status. + // The fetcher re-verifies MD5 against expected_etag end-to-end after + // the stream completes. + FetchChunk(*FetchChunkRequest, grpc.ServerStreamingServer[FetchChunkResponse]) error + mustEmbedUnimplementedMountPeerServer() +} + +// UnimplementedMountPeerServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedMountPeerServer struct{} + +func (UnimplementedMountPeerServer) ChunkAnnounce(context.Context, *ChunkAnnounceRequest) (*ChunkAnnounceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ChunkAnnounce not implemented") +} +func (UnimplementedMountPeerServer) ChunkLookup(context.Context, *ChunkLookupRequest) (*ChunkLookupResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ChunkLookup not implemented") +} +func (UnimplementedMountPeerServer) FetchChunk(*FetchChunkRequest, grpc.ServerStreamingServer[FetchChunkResponse]) error { + return status.Errorf(codes.Unimplemented, "method FetchChunk not implemented") +} +func (UnimplementedMountPeerServer) mustEmbedUnimplementedMountPeerServer() {} +func (UnimplementedMountPeerServer) testEmbeddedByValue() {} + +// UnsafeMountPeerServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MountPeerServer will +// result in compilation errors. +type UnsafeMountPeerServer interface { + mustEmbedUnimplementedMountPeerServer() +} + +func RegisterMountPeerServer(s grpc.ServiceRegistrar, srv MountPeerServer) { + // If the following call pancis, it indicates UnimplementedMountPeerServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&MountPeer_ServiceDesc, srv) +} + +func _MountPeer_ChunkAnnounce_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ChunkAnnounceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MountPeerServer).ChunkAnnounce(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: MountPeer_ChunkAnnounce_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MountPeerServer).ChunkAnnounce(ctx, req.(*ChunkAnnounceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _MountPeer_ChunkLookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ChunkLookupRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MountPeerServer).ChunkLookup(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: MountPeer_ChunkLookup_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MountPeerServer).ChunkLookup(ctx, req.(*ChunkLookupRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _MountPeer_FetchChunk_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(FetchChunkRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MountPeerServer).FetchChunk(m, &grpc.GenericServerStream[FetchChunkRequest, FetchChunkResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type MountPeer_FetchChunkServer = grpc.ServerStreamingServer[FetchChunkResponse] + +// MountPeer_ServiceDesc is the grpc.ServiceDesc for MountPeer service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var MountPeer_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "mount_peer_pb.MountPeer", + HandlerType: (*MountPeerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ChunkAnnounce", + Handler: _MountPeer_ChunkAnnounce_Handler, + }, + { + MethodName: "ChunkLookup", + Handler: _MountPeer_ChunkLookup_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "FetchChunk", + Handler: _MountPeer_FetchChunk_Handler, + ServerStreams: true, + }, + }, + Metadata: "mount_peer.proto", +}