feat: pass expected_data_size from clients for size-aware assignment (#9032)

* feat: pass expected_data_size from clients for size-aware assignment

Add expected_data_size field to AssignRequest (master proto) and
AssignVolumeRequest (filer proto) so clients can hint how large the
data will be. The master uses this instead of the 1MB default when
tracking pending volume sizes for weighted assignment.

- Add expected_data_size to master.proto AssignRequest
- Add expected_data_size to filer.proto AssignVolumeRequest
- Wire through filer AssignVolume handler
- Wire through HTTP submit handler (uses actual upload size)
- Add ExpectedDataSize to VolumeAssignRequest in operation package
- Topology.PickForWrite accepts an expectedDataSize parameter (0 falls back to the 1MB default)

* fix: guard integer conversions in expected_data_size path

- common.go: clamp OriginalDataSize to non-negative before uint64 cast
- topology.go: cap the pending-bytes estimate (count * per-file size) at math.MaxInt64 before int64 cast

* fix: parse dataSize hint in HTTP /dir/assign and test non-zero expectedDataSize

- HTTP /dir/assign now parses optional "dataSize" query parameter
  and passes it to PickForWrite instead of hardcoded 0
- Add test assertion for PickForWrite with non-zero expectedDataSize
This commit is contained in:
Chris Lu
2026-04-11 11:30:47 -07:00
committed by GitHub
parent e2c79af6ec
commit 10b0bdce02
13 changed files with 83 additions and 33 deletions
@@ -294,6 +294,7 @@ message AssignVolumeRequest {
string rack = 7;
string data_node = 9;
string disk_type = 8;
uint64 expected_data_size = 10; // hint for size-aware volume selection
}
message AssignVolumeResponse {
+1
View File
@@ -231,6 +231,7 @@ message AssignRequest {
uint32 memory_map_max_size_mb = 8;
uint32 writable_volume_count = 9;
string disk_type = 10;
uint64 expected_data_size = 11; // hint for size-aware volume selection
}
message VolumeGrowRequest {
+2
View File
@@ -26,6 +26,7 @@ type VolumeAssignRequest struct {
Rack string
DataNode string
WritableVolumeCount uint32
ExpectedDataSize uint64
}
type AssignResult struct {
@@ -86,6 +87,7 @@ func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialO
Rack: request.Rack,
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
ExpectedDataSize: request.ExpectedDataSize,
}
resp, grpcErr := masterClient.Assign(attemptCtx, req)
if grpcErr != nil {
+1
View File
@@ -297,6 +297,7 @@ message AssignVolumeRequest {
string rack = 7;
string data_node = 9;
string disk_type = 8;
uint64 expected_data_size = 10; // hint for size-aware volume selection
}
message AssignVolumeResponse {
+24 -14
View File
@@ -1879,18 +1879,19 @@ func (x *StreamRenameEntryResponse) GetTsNs() int64 {
}
type AssignVolumeRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Count int32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
TtlSec int32 `protobuf:"varint,4,opt,name=ttl_sec,json=ttlSec,proto3" json:"ttl_sec,omitempty"`
DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"`
Path string `protobuf:"bytes,6,opt,name=path,proto3" json:"path,omitempty"`
Rack string `protobuf:"bytes,7,opt,name=rack,proto3" json:"rack,omitempty"`
DataNode string `protobuf:"bytes,9,opt,name=data_node,json=dataNode,proto3" json:"data_node,omitempty"`
DiskType string `protobuf:"bytes,8,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Count int32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
TtlSec int32 `protobuf:"varint,4,opt,name=ttl_sec,json=ttlSec,proto3" json:"ttl_sec,omitempty"`
DataCenter string `protobuf:"bytes,5,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"`
Path string `protobuf:"bytes,6,opt,name=path,proto3" json:"path,omitempty"`
Rack string `protobuf:"bytes,7,opt,name=rack,proto3" json:"rack,omitempty"`
DataNode string `protobuf:"bytes,9,opt,name=data_node,json=dataNode,proto3" json:"data_node,omitempty"`
DiskType string `protobuf:"bytes,8,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"`
ExpectedDataSize uint64 `protobuf:"varint,10,opt,name=expected_data_size,json=expectedDataSize,proto3" json:"expected_data_size,omitempty"` // hint for size-aware volume selection
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AssignVolumeRequest) Reset() {
@@ -1986,6 +1987,13 @@ func (x *AssignVolumeRequest) GetDiskType() string {
return ""
}
// GetExpectedDataSize returns the request's ExpectedDataSize field.
// It may be called on a nil receiver, in which case it returns 0.
func (x *AssignVolumeRequest) GetExpectedDataSize() uint64 {
	if x == nil {
		return 0
	}
	return x.ExpectedDataSize
}
type AssignVolumeResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
FileId string `protobuf:"bytes,1,opt,name=file_id,json=fileId,proto3" json:"file_id,omitempty"`
@@ -5248,7 +5256,7 @@ const file_filer_proto_rawDesc = "" +
"\x19StreamRenameEntryResponse\x12\x1c\n" +
"\tdirectory\x18\x01 \x01(\tR\tdirectory\x12J\n" +
"\x12event_notification\x18\x02 \x01(\v2\x1b.filer_pb.EventNotificationR\x11eventNotification\x12\x13\n" +
"\x05ts_ns\x18\x03 \x01(\x03R\x04tsNs\"\x89\x02\n" +
"\x05ts_ns\x18\x03 \x01(\x03R\x04tsNs\"\xb7\x02\n" +
"\x13AssignVolumeRequest\x12\x14\n" +
"\x05count\x18\x01 \x01(\x05R\x05count\x12\x1e\n" +
"\n" +
@@ -5261,7 +5269,9 @@ const file_filer_proto_rawDesc = "" +
"\x04path\x18\x06 \x01(\tR\x04path\x12\x12\n" +
"\x04rack\x18\a \x01(\tR\x04rack\x12\x1b\n" +
"\tdata_node\x18\t \x01(\tR\bdataNode\x12\x1b\n" +
"\tdisk_type\x18\b \x01(\tR\bdiskType\"\xe1\x01\n" +
"\tdisk_type\x18\b \x01(\tR\bdiskType\x12,\n" +
"\x12expected_data_size\x18\n" +
" \x01(\x04R\x10expectedDataSize\"\xe1\x01\n" +
"\x14AssignVolumeResponse\x12\x17\n" +
"\afile_id\x18\x01 \x01(\tR\x06fileId\x12\x14\n" +
"\x05count\x18\x04 \x01(\x05R\x05count\x12\x12\n" +
+1
View File
@@ -242,6 +242,7 @@ message AssignRequest {
uint32 memory_map_max_size_mb = 8;
uint32 writable_volume_count = 9;
string disk_type = 10;
uint64 expected_data_size = 11; // hint for size-aware volume selection
}
message VolumeGrowRequest {
+11 -2
View File
@@ -1430,6 +1430,7 @@ type AssignRequest struct {
MemoryMapMaxSizeMb uint32 `protobuf:"varint,8,opt,name=memory_map_max_size_mb,json=memoryMapMaxSizeMb,proto3" json:"memory_map_max_size_mb,omitempty"`
WritableVolumeCount uint32 `protobuf:"varint,9,opt,name=writable_volume_count,json=writableVolumeCount,proto3" json:"writable_volume_count,omitempty"`
DiskType string `protobuf:"bytes,10,opt,name=disk_type,json=diskType,proto3" json:"disk_type,omitempty"`
ExpectedDataSize uint64 `protobuf:"varint,11,opt,name=expected_data_size,json=expectedDataSize,proto3" json:"expected_data_size,omitempty"` // hint for size-aware volume selection
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1534,6 +1535,13 @@ func (x *AssignRequest) GetDiskType() string {
return ""
}
// GetExpectedDataSize returns the request's ExpectedDataSize field.
// It may be called on a nil receiver, in which case it returns 0.
func (x *AssignRequest) GetExpectedDataSize() uint64 {
	if x == nil {
		return 0
	}
	return x.ExpectedDataSize
}
type VolumeGrowRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
WritableVolumeCount uint32 `protobuf:"varint,1,opt,name=writable_volume_count,json=writableVolumeCount,proto3" json:"writable_volume_count,omitempty"`
@@ -4500,7 +4508,7 @@ const file_master_proto_rawDesc = "" +
"public_url\x18\x02 \x01(\tR\tpublicUrl\x12\x1b\n" +
"\tgrpc_port\x18\x03 \x01(\rR\bgrpcPort\x12\x1f\n" +
"\vdata_center\x18\x04 \x01(\tR\n" +
"dataCenter\"\xd0\x02\n" +
"dataCenter\"\xfe\x02\n" +
"\rAssignRequest\x12\x14\n" +
"\x05count\x18\x01 \x01(\x04R\x05count\x12 \n" +
"\vreplication\x18\x02 \x01(\tR\vreplication\x12\x1e\n" +
@@ -4515,7 +4523,8 @@ const file_master_proto_rawDesc = "" +
"\x16memory_map_max_size_mb\x18\b \x01(\rR\x12memoryMapMaxSizeMb\x122\n" +
"\x15writable_volume_count\x18\t \x01(\rR\x13writableVolumeCount\x12\x1b\n" +
"\tdisk_type\x18\n" +
" \x01(\tR\bdiskType\"\xbe\x02\n" +
" \x01(\tR\bdiskType\x12,\n" +
"\x12expected_data_size\x18\v \x01(\x04R\x10expectedDataSize\"\xbe\x02\n" +
"\x11VolumeGrowRequest\x122\n" +
"\x15writable_volume_count\x18\x01 \x01(\rR\x13writableVolumeCount\x12 \n" +
"\vreplication\x18\x02 \x01(\tR\vreplication\x12\x1e\n" +
+8 -7
View File
@@ -154,13 +154,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
}
ar := &operation.VolumeAssignRequest{
Count: count,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
Count: count,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
ExpectedDataSize: uint64(max(int64(0), int64(pu.OriginalDataSize))),
}
assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar)
if ae != nil {
+4
View File
@@ -395,6 +395,10 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
assignRequest.ExpectedDataSize = req.ExpectedDataSize
if altRequest != nil {
altRequest.ExpectedDataSize = req.ExpectedDataSize
}
assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
+1 -1
View File
@@ -111,7 +111,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
)
for time.Now().Sub(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl)
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl, req.ExpectedDataSize)
if shouldGrow && !vl.HasGrowRequest() && !ms.option.VolumeGrowthDisabled {
if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String())
+6 -1
View File
@@ -144,6 +144,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
writableVolumeCount = 0
}
expectedDataSize, e := strconv.ParseUint(r.FormValue("dataSize"), 10, 64)
if e != nil {
expectedDataSize = 0
}
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -166,7 +171,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
for time.Since(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl)
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl, expectedDataSize)
if shouldGrow && !vl.HasGrowRequest() && !ms.option.VolumeGrowthDisabled {
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
+10 -7
View File
@@ -320,12 +320,11 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
return next, nil
}
// EstimatedNeedleSizeBytes is the assumed size per assigned file ID, used to
// estimate pending bytes between heartbeats. Intentionally coarse — it only
// needs to spread load, not be precise.
const EstimatedNeedleSizeBytes = 1024 * 1024 // 1 MB
// DefaultNeedleSizeEstimate is the fallback per-file-ID size estimate when
// the client does not provide an expected data size.
const DefaultNeedleSizeEstimate uint64 = 1024 * 1024 // 1 MB
func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) {
func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout, expectedDataSize uint64) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) {
var vid needle.VolumeId
vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option)
if err != nil {
@@ -335,8 +334,12 @@ func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption,
return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", NoWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
// Track estimated assigned bytes to spread load between heartbeats.
// Compute in uint64 and cap to avoid overflow on the int64 cast.
pendingBytes := min(uint64(count)*EstimatedNeedleSizeBytes, uint64(math.MaxInt64))
// Use the client hint if provided, otherwise fall back to 1MB estimate.
sizePerFile := DefaultNeedleSizeEstimate
if expectedDataSize > 0 {
sizePerFile = expectedDataSize
}
pendingBytes := min(uint64(count)*sizePerFile, uint64(math.MaxInt64))
volumeLayout.RecordAssign(vid, int64(pendingBytes))
nextFileId := t.Sequence.NextFileId(requestedCount)
fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()
+13 -1
View File
@@ -433,7 +433,7 @@ func TestPickForWrite(t *testing.T) {
continue
}
volumeGrowOption.DataNode = dn
fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl, 0)
if dc == "dc0" {
if err == nil || count != 0 || !shouldGrow {
fmt.Println(dc, r, dn, "pick for write should be with error")
@@ -452,6 +452,18 @@ func TestPickForWrite(t *testing.T) {
fmt.Println(dc, r, dn, "pick for write error : not should grow")
t.Fail()
}
// Also verify with a non-zero expectedDataSize hint
if dc != "dc0" {
fileId2, count2, _, shouldGrow2, err2 := topo.PickForWrite(1, volumeGrowOption, vl, 1024)
if err2 != nil {
fmt.Println(dc, r, dn, "pick for write with size hint error:", err2)
t.Fail()
} else if count2 == 0 || len(fileId2) == 0 || shouldGrow2 {
fmt.Println(dc, r, dn, "pick for write with size hint unexpected result")
t.Fail()
}
}
}
}
}