mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(scheduler): give worker tasks a real per-attempt execution deadline (#9041)
* fix(scheduler): give worker tasks a real per-attempt execution deadline

The plugin scheduler derived the per-attempt execution deadline as DetectionTimeoutSeconds * 2, which capped every worker task at twice the cluster-scan budget regardless of actual work. For volume_balance batches this was 240s — far too short for 20 large volume copies, so every attempt died at "context deadline exceeded" and all in-flight sub-RPCs surfaced as "context canceled". Retries restarted from move 1 and hit the same wall.

Add an explicit ExecutionTimeoutSeconds field to the plugin proto and make each handler declare its own baseline (1800s for vacuum, balance, EC; 3600s for iceberg). Size-aware handlers also emit an estimated_runtime_seconds parameter on each proposal so the scheduler extends the per-attempt deadline based on actual workload:

- volume_balance batch: max(largest single move, total / concurrency) at 5 min/GB, so a skewed batch with one big volume isn't averaged away.
- volume_balance single, vacuum (already), erasure_coding (10 min/GB), ec_balance (5 min/GB): per-volume budgets.

admin_script and iceberg keep the configurable handler default since their workloads are opaque to the detector.

* fix(scheduler): apply descriptor defaults to existing persisted configs

The previous commit added execution_timeout_seconds to the proto and each handler's descriptor defaults, but two paths still left existing deployments broken:

1. deriveSchedulerAdminRuntime returned stored AdminRuntime configs as-is. Persisted configs from older versions have no execution_timeout_seconds, so the scheduler fell back to the 90s default — worse than the prior 240s behavior. Overlay descriptor defaults for any zero numeric fields when loading.
2. The admin form did not round-trip execution_timeout_seconds, so a normal save would clear it back to zero.
Add the input field, the fillAdminSettings/collectAdminSettings hooks, and, as defense in depth, reapply descriptor defaults in UpdatePluginJobTypeConfigAPI before persisting so a stale form can never silently clobber a baseline.

* fix(volume_balance): account for partial scheduling rounds in batch estimate

With N moves and C slots, the busiest slot processes ceil(N/C) moves, not N/C. Dividing total seconds by C underestimates wall-clock time whenever N is not a multiple of C — e.g. 6 moves at concurrency 5 need 2 rounds, not 1.2. Use avg * ceil(N/C) so partial rounds are counted as full ones.

* fix(volume_balance): scale minBudget per wave instead of per move

Orchestration overhead (setup/teardown for the parallel move runner) happens once per wave, not once per move. Use numRounds*60 as the floor instead of len(moves)*60 so the minimum doesn't inflate linearly with batch size when individual moves are tiny.
This commit is contained in:
@@ -452,6 +452,13 @@ func (s *AdminServer) UpdatePluginJobTypeConfigAPI(w http.ResponseWriter, r *htt
|
||||
}
|
||||
config.UpdatedBy = username
|
||||
|
||||
// Reapply descriptor defaults so a save from an older form (or a UI
|
||||
// that omits new fields) cannot silently clear a baseline like
|
||||
// execution_timeout_seconds back to zero.
|
||||
if descriptor, err := s.LoadPluginJobTypeDescriptor(jobType); err == nil && descriptor != nil {
|
||||
applyDescriptorDefaultsToPersistedConfig(config, descriptor)
|
||||
}
|
||||
|
||||
if err := s.SavePluginJobTypeConfig(config); err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
@@ -916,6 +923,9 @@ func applyDescriptorDefaultsToPersistedConfig(
|
||||
if runtime.JobTypeMaxRuntimeSeconds <= 0 {
|
||||
runtime.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds
|
||||
}
|
||||
if runtime.ExecutionTimeoutSeconds <= 0 {
|
||||
runtime.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds
|
||||
}
|
||||
if runtime.RetryBackoffSeconds <= 0 {
|
||||
runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds
|
||||
}
|
||||
|
||||
@@ -1044,6 +1044,7 @@ func (r *Plugin) ensureJobTypeConfigFromDescriptor(jobType string, descriptor *p
|
||||
RetryLimit: defaults.RetryLimit,
|
||||
RetryBackoffSeconds: defaults.RetryBackoffSeconds,
|
||||
JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds,
|
||||
ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -468,7 +468,7 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err
|
||||
policy := schedulerPolicy{
|
||||
DetectionInterval: durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval),
|
||||
DetectionTimeout: durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout),
|
||||
ExecutionTimeout: defaultScheduledExecutionTimeout,
|
||||
ExecutionTimeout: durationFromSeconds(adminRuntime.ExecutionTimeoutSeconds, defaultScheduledExecutionTimeout),
|
||||
JobTypeMaxRuntime: durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime),
|
||||
RetryBackoff: durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff),
|
||||
MaxResults: adminRuntime.MaxJobsPerDetection,
|
||||
@@ -503,12 +503,9 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err
|
||||
policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime
|
||||
}
|
||||
|
||||
// Plugin protocol currently has only detection timeout in admin settings.
|
||||
execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds*2) * time.Second
|
||||
if execTimeout < defaultScheduledExecutionTimeout {
|
||||
execTimeout = defaultScheduledExecutionTimeout
|
||||
if policy.ExecutionTimeout < defaultScheduledExecutionTimeout {
|
||||
policy.ExecutionTimeout = defaultScheduledExecutionTimeout
|
||||
}
|
||||
policy.ExecutionTimeout = execTimeout
|
||||
|
||||
return policy, true, nil
|
||||
}
|
||||
@@ -610,6 +607,37 @@ func deriveSchedulerAdminRuntime(
|
||||
) *plugin_pb.AdminRuntimeConfig {
|
||||
if cfg != nil && cfg.AdminRuntime != nil {
|
||||
adminConfig := *cfg.AdminRuntime
|
||||
// Overlay descriptor defaults for any zero numeric fields. Persisted
|
||||
// configs from older versions have no execution_timeout_seconds, and
|
||||
// without this overlay the scheduler would fall back to the 90s
|
||||
// default instead of the handler's declared baseline.
|
||||
if descriptor != nil && descriptor.AdminRuntimeDefaults != nil {
|
||||
defaults := descriptor.AdminRuntimeDefaults
|
||||
if adminConfig.DetectionIntervalSeconds <= 0 {
|
||||
adminConfig.DetectionIntervalSeconds = defaults.DetectionIntervalSeconds
|
||||
}
|
||||
if adminConfig.DetectionTimeoutSeconds <= 0 {
|
||||
adminConfig.DetectionTimeoutSeconds = defaults.DetectionTimeoutSeconds
|
||||
}
|
||||
if adminConfig.MaxJobsPerDetection <= 0 {
|
||||
adminConfig.MaxJobsPerDetection = defaults.MaxJobsPerDetection
|
||||
}
|
||||
if adminConfig.GlobalExecutionConcurrency <= 0 {
|
||||
adminConfig.GlobalExecutionConcurrency = defaults.GlobalExecutionConcurrency
|
||||
}
|
||||
if adminConfig.PerWorkerExecutionConcurrency <= 0 {
|
||||
adminConfig.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency
|
||||
}
|
||||
if adminConfig.RetryBackoffSeconds <= 0 {
|
||||
adminConfig.RetryBackoffSeconds = defaults.RetryBackoffSeconds
|
||||
}
|
||||
if adminConfig.JobTypeMaxRuntimeSeconds <= 0 {
|
||||
adminConfig.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds
|
||||
}
|
||||
if adminConfig.ExecutionTimeoutSeconds <= 0 {
|
||||
adminConfig.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds
|
||||
}
|
||||
}
|
||||
return &adminConfig
|
||||
}
|
||||
|
||||
@@ -628,6 +656,7 @@ func deriveSchedulerAdminRuntime(
|
||||
RetryLimit: defaults.RetryLimit,
|
||||
RetryBackoffSeconds: defaults.RetryBackoffSeconds,
|
||||
JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds,
|
||||
ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -266,6 +266,11 @@ templ Plugin(page string, initialJob string, lane string) {
|
||||
<label class="form-label" for="plugin-admin-detection-timeout">Detection Timeout (s)</label>
|
||||
<input type="number" class="form-control" id="plugin-admin-detection-timeout" min="0"/>
|
||||
</div>
|
||||
<div class="col-12">
|
||||
<label class="form-label" for="plugin-admin-execution-timeout">Execution Timeout (s)</label>
|
||||
<input type="number" class="form-control" id="plugin-admin-execution-timeout" min="0"/>
|
||||
<div class="form-text">Per-attempt deadline for one job. Size-aware tasks may extend this automatically.</div>
|
||||
</div>
|
||||
<div class="col-12">
|
||||
<label class="form-label" for="plugin-admin-max-runtime">Job Type Max Runtime (s)</label>
|
||||
<input type="number" class="form-control" id="plugin-admin-max-runtime" min="0"/>
|
||||
@@ -2519,6 +2524,7 @@ templ Plugin(page string, initialJob string, lane string) {
|
||||
document.getElementById('plugin-admin-enabled').checked = pickBool('enabled');
|
||||
document.getElementById('plugin-admin-detection-interval').value = String(pickNumber('detection_interval_seconds'));
|
||||
document.getElementById('plugin-admin-detection-timeout').value = String(pickNumber('detection_timeout_seconds'));
|
||||
document.getElementById('plugin-admin-execution-timeout').value = String(pickNumber('execution_timeout_seconds'));
|
||||
document.getElementById('plugin-admin-max-runtime').value = String(pickNumber('job_type_max_runtime_seconds'));
|
||||
document.getElementById('plugin-admin-max-results').value = String(pickNumber('max_jobs_per_detection'));
|
||||
document.getElementById('plugin-admin-global-exec').value = String(pickNumber('global_execution_concurrency'));
|
||||
@@ -2544,6 +2550,7 @@ templ Plugin(page string, initialJob string, lane string) {
|
||||
enabled: !!document.getElementById('plugin-admin-enabled').checked,
|
||||
detection_interval_seconds: getInt('plugin-admin-detection-interval'),
|
||||
detection_timeout_seconds: getInt('plugin-admin-detection-timeout'),
|
||||
execution_timeout_seconds: getInt('plugin-admin-execution-timeout'),
|
||||
job_type_max_runtime_seconds: getInt('plugin-admin-max-runtime'),
|
||||
max_jobs_per_detection: getInt('plugin-admin-max-results'),
|
||||
global_execution_concurrency: getInt('plugin-admin-global-exec'),
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -233,6 +233,10 @@ message AdminRuntimeDefaults {
|
||||
int32 retry_limit = 7;
|
||||
int32 retry_backoff_seconds = 8;
|
||||
int32 job_type_max_runtime_seconds = 9;
|
||||
// Per-attempt execution deadline for one job. Distinct from
|
||||
// detection_timeout (which bounds the detection scan) and
|
||||
// job_type_max_runtime (a lane-wide budget across all attempts).
|
||||
int32 execution_timeout_seconds = 10;
|
||||
}
|
||||
|
||||
message AdminRuntimeConfig {
|
||||
@@ -245,6 +249,7 @@ message AdminRuntimeConfig {
|
||||
int32 retry_limit = 7;
|
||||
int32 retry_backoff_seconds = 8;
|
||||
int32 job_type_max_runtime_seconds = 9;
|
||||
int32 execution_timeout_seconds = 10;
|
||||
}
|
||||
|
||||
message RunDetectionRequest {
|
||||
|
||||
@@ -2493,8 +2493,12 @@ type AdminRuntimeDefaults struct {
|
||||
RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"`
|
||||
RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"`
|
||||
JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
// Per-attempt execution deadline for one job. Distinct from
|
||||
// detection_timeout (which bounds the detection scan) and
|
||||
// job_type_max_runtime (a lane-wide budget across all attempts).
|
||||
ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *AdminRuntimeDefaults) Reset() {
|
||||
@@ -2590,6 +2594,13 @@ func (x *AdminRuntimeDefaults) GetJobTypeMaxRuntimeSeconds() int32 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetExecutionTimeoutSeconds returns the ExecutionTimeoutSeconds field of x,
// or 0 when the receiver is nil, so callers may invoke it without a nil check.
func (x *AdminRuntimeDefaults) GetExecutionTimeoutSeconds() int32 {
|
||||
if x != nil {
|
||||
return x.ExecutionTimeoutSeconds
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type AdminRuntimeConfig struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"`
|
||||
@@ -2601,6 +2612,7 @@ type AdminRuntimeConfig struct {
|
||||
RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"`
|
||||
RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"`
|
||||
JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"`
|
||||
ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -2698,6 +2710,13 @@ func (x *AdminRuntimeConfig) GetJobTypeMaxRuntimeSeconds() int32 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetExecutionTimeoutSeconds returns the ExecutionTimeoutSeconds field of x,
// or 0 when the receiver is nil, so callers may invoke it without a nil check.
func (x *AdminRuntimeConfig) GetExecutionTimeoutSeconds() int32 {
|
||||
if x != nil {
|
||||
return x.ExecutionTimeoutSeconds
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type RunDetectionRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
|
||||
@@ -4091,7 +4110,7 @@ const file_plugin_proto_rawDesc = "" +
|
||||
"\x06fields\x18\x01 \x03(\v2\x1c.plugin.ValueMap.FieldsEntryR\x06fields\x1aN\n" +
|
||||
"\vFieldsEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12)\n" +
|
||||
"\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xff\x03\n" +
|
||||
"\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xbb\x04\n" +
|
||||
"\x14AdminRuntimeDefaults\x12\x18\n" +
|
||||
"\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" +
|
||||
"\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" +
|
||||
@@ -4102,7 +4121,9 @@ const file_plugin_proto_rawDesc = "" +
|
||||
"\vretry_limit\x18\a \x01(\x05R\n" +
|
||||
"retryLimit\x122\n" +
|
||||
"\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" +
|
||||
"\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xfd\x03\n" +
|
||||
"\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" +
|
||||
"\x19execution_timeout_seconds\x18\n" +
|
||||
" \x01(\x05R\x17executionTimeoutSeconds\"\xb9\x04\n" +
|
||||
"\x12AdminRuntimeConfig\x12\x18\n" +
|
||||
"\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" +
|
||||
"\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" +
|
||||
@@ -4113,7 +4134,9 @@ const file_plugin_proto_rawDesc = "" +
|
||||
"\vretry_limit\x18\a \x01(\x05R\n" +
|
||||
"retryLimit\x122\n" +
|
||||
"\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" +
|
||||
"\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xef\x05\n" +
|
||||
"\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" +
|
||||
"\x19execution_timeout_seconds\x18\n" +
|
||||
" \x01(\x05R\x17executionTimeoutSeconds\"\xef\x05\n" +
|
||||
"\x13RunDetectionRequest\x12\x1d\n" +
|
||||
"\n" +
|
||||
"request_id\x18\x01 \x01(\tR\trequestId\x12\x19\n" +
|
||||
|
||||
@@ -123,6 +123,7 @@ func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 0,
|
||||
RetryBackoffSeconds: 30,
|
||||
JobTypeMaxRuntimeSeconds: 1800,
|
||||
ExecutionTimeoutSeconds: 1800,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{},
|
||||
}
|
||||
|
||||
@@ -167,6 +167,7 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 1,
|
||||
RetryBackoffSeconds: 30,
|
||||
JobTypeMaxRuntimeSeconds: 1800,
|
||||
ExecutionTimeoutSeconds: 1800,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
|
||||
@@ -456,6 +457,12 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb
|
||||
summary = fmt.Sprintf("Move EC shard of volume %d: %s → %s", result.VolumeID, sourceNode, targetNode)
|
||||
}
|
||||
|
||||
// EC shard moves only relocate one shard (1/14 of the volume). Budget
|
||||
// 5 min/GB of full volume size, which conservatively covers the shard
|
||||
// transfer plus mount/registration overhead.
|
||||
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
|
||||
estimatedRuntimeSeconds := volumeSizeGB * 5 * 60
|
||||
|
||||
return &plugin_pb.JobProposal{
|
||||
ProposalId: proposalID,
|
||||
DedupeKey: dedupeKey,
|
||||
@@ -479,6 +486,9 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb
|
||||
"collection": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
|
||||
},
|
||||
"estimated_runtime_seconds": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"task_type": "ec_balance",
|
||||
|
||||
@@ -166,6 +166,7 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 1,
|
||||
RetryBackoffSeconds: 30,
|
||||
JobTypeMaxRuntimeSeconds: 1800,
|
||||
ExecutionTimeoutSeconds: 1800,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"quiet_for_seconds": {
|
||||
@@ -634,6 +635,12 @@ func buildErasureCodingProposal(
|
||||
summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
|
||||
}
|
||||
|
||||
// EC encoding reads the full volume, computes shards, and writes 14
|
||||
// shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy)
|
||||
// so the scheduler grants a deadline scaled to volume size.
|
||||
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
|
||||
estimatedRuntimeSeconds := volumeSizeGB * 10 * 60
|
||||
|
||||
return &plugin_pb.JobProposal{
|
||||
ProposalId: proposalID,
|
||||
DedupeKey: dedupeKey,
|
||||
@@ -657,6 +664,9 @@ func buildErasureCodingProposal(
|
||||
"target_count": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
|
||||
},
|
||||
"estimated_runtime_seconds": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"task_type": "erasure_coding",
|
||||
|
||||
@@ -332,6 +332,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 1,
|
||||
RetryBackoffSeconds: 60,
|
||||
JobTypeMaxRuntimeSeconds: 3600, // 1 hour max
|
||||
ExecutionTimeoutSeconds: 3600,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
|
||||
|
||||
@@ -200,6 +200,7 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 1,
|
||||
RetryBackoffSeconds: 10,
|
||||
JobTypeMaxRuntimeSeconds: 1800,
|
||||
ExecutionTimeoutSeconds: 1800,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"garbage_threshold": {
|
||||
|
||||
@@ -246,6 +246,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
||||
RetryLimit: 1,
|
||||
RetryBackoffSeconds: 15,
|
||||
JobTypeMaxRuntimeSeconds: 1800,
|
||||
ExecutionTimeoutSeconds: 1800,
|
||||
},
|
||||
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"imbalance_threshold": {
|
||||
@@ -1160,6 +1161,12 @@ func buildVolumeBalanceProposal(
|
||||
summary = fmt.Sprintf("Move volume %d from %s to %s", result.VolumeID, sourceNode, targetNode)
|
||||
}
|
||||
|
||||
// Estimate runtime at 5 min/GB (matches vacuum) so the scheduler grants
|
||||
// a per-attempt deadline that scales with volume size instead of the
|
||||
// fixed execution_timeout_seconds baseline.
|
||||
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
|
||||
estimatedRuntimeSeconds := volumeSizeGB * 5 * 60
|
||||
|
||||
return &plugin_pb.JobProposal{
|
||||
ProposalId: proposalID,
|
||||
DedupeKey: dedupeKey,
|
||||
@@ -1183,6 +1190,9 @@ func buildVolumeBalanceProposal(
|
||||
"collection": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
|
||||
},
|
||||
"estimated_runtime_seconds": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"task_type": "balance",
|
||||
@@ -1311,6 +1321,36 @@ func buildBatchVolumeBalanceProposals(
|
||||
continue
|
||||
}
|
||||
|
||||
// Estimate runtime so the scheduler grants a per-attempt deadline
|
||||
// large enough for the whole batch. Mirrors vacuum's 5 min/GB budget.
|
||||
// Use max(largest single move, avg * ceil(N / concurrency)) so a skewed batch
|
||||
// with one big move isn't underestimated by the average.
|
||||
var totalSeconds, maxMoveSeconds int64
|
||||
for _, m := range moves {
|
||||
gb := int64(m.VolumeSize/1024/1024/1024) + 1
|
||||
s := gb * 5 * 60
|
||||
totalSeconds += s
|
||||
if s > maxMoveSeconds {
|
||||
maxMoveSeconds = s
|
||||
}
|
||||
}
|
||||
// Round up to whole scheduling rounds: with N moves and C slots,
|
||||
// the busiest slot processes ceil(N/C) moves, not N/C. Using
|
||||
// avg * ceil(N/C) avoids underestimating when N is not a multiple
|
||||
// of C (e.g. 6 moves at concurrency 5 → 2 rounds, not 1.2).
|
||||
avgSeconds := totalSeconds / int64(len(moves))
|
||||
numRounds := (int64(len(moves)) + int64(maxConcurrentMoves) - 1) / int64(maxConcurrentMoves)
|
||||
estimatedRuntimeSeconds := avgSeconds * numRounds
|
||||
if maxMoveSeconds > estimatedRuntimeSeconds {
|
||||
estimatedRuntimeSeconds = maxMoveSeconds
|
||||
}
|
||||
// Floor for orchestration overhead — scales per wave (one round of
|
||||
// concurrent moves) rather than per move, since setup/teardown
|
||||
// happens once per wave, not once per move.
|
||||
if minBudget := numRounds * 60; estimatedRuntimeSeconds < minBudget {
|
||||
estimatedRuntimeSeconds = minBudget
|
||||
}
|
||||
|
||||
proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano())
|
||||
summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ","))
|
||||
if len(summary) > maxProposalStringLength {
|
||||
@@ -1341,6 +1381,9 @@ func buildBatchVolumeBalanceProposals(
|
||||
"batch_size": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))},
|
||||
},
|
||||
"estimated_runtime_seconds": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
|
||||
},
|
||||
},
|
||||
Labels: map[string]string{
|
||||
"task_type": "balance",
|
||||
|
||||
Reference in New Issue
Block a user