From ae08e7797935e08ee48b3bdf72f8234cd2e96cd9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 13 Apr 2026 01:15:53 -0700 Subject: [PATCH] fix(scheduler): give worker tasks a real per-attempt execution deadline (#9041) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(scheduler): give worker tasks a real per-attempt execution deadline The plugin scheduler derived the per-attempt execution deadline as DetectionTimeoutSeconds * 2, which capped every worker task at twice the cluster-scan budget regardless of actual work. For volume_balance batches this was 240s — far too short for 20 large volume copies, so every attempt died at "context deadline exceeded" and all in-flight sub-RPCs surfaced as "context canceled". Retries restarted from move 1 and hit the same wall. Add an explicit ExecutionTimeoutSeconds field to the plugin proto and make each handler declare its own baseline (1800s for vacuum, balance, EC; 3600s for iceberg). Size-aware handlers also emit an estimated_runtime_seconds parameter on each proposal so the scheduler extends the per-attempt deadline based on actual workload: - volume_balance batch: max(largest single move, total / concurrency) at 5 min/GB, so a skewed batch with one big volume isn't averaged away. - volume_balance single, vacuum (already), erasure_coding (10 min/GB), ec_balance (5 min/GB): per-volume budgets. admin_script and iceberg keep the configurable handler default since their workloads are opaque to the detector. * fix(scheduler): apply descriptor defaults to existing persisted configs The previous commit added execution_timeout_seconds to the proto and each handler's descriptor defaults, but two paths still left existing deployments broken: 1. deriveSchedulerAdminRuntime returned stored AdminRuntime configs as-is. Persisted configs from older versions have no execution_timeout_seconds, so the scheduler fell back to the 90s default — worse than the prior 240s behavior. Overlay descriptor defaults for any zero numeric fields when loading. 2. The admin form did not round-trip execution_timeout_seconds, so a normal save would clear it back to zero. Add the input field, the fillAdminSettings/collectAdminSettings hooks, and as defense in depth reapply descriptor defaults in UpdatePluginJobTypeConfigAPI before persisting so a stale form can never silently clobber a baseline. * fix(volume_balance): account for partial scheduling rounds in batch estimate With N moves and C slots, the busiest slot processes ceil(N/C) moves, not N/C. Dividing total seconds by C underestimates wall-clock time whenever N is not a multiple of C — e.g. 6 moves at concurrency 5 needs 2 rounds, not 1.2. Use avg * ceil(N/C) so partial rounds are counted as full ones. * fix(volume_balance): scale minBudget per wave instead of per move Orchestration overhead (setup/teardown for the parallel move runner) happens once per wave, not once per move. Use numRounds*60 as the floor instead of len(moves)*60 so the minimum doesn't inflate linearly with batch size when individual moves are tiny. --- weed/admin/dash/plugin_api.go | 10 +++++ weed/admin/plugin/plugin.go | 1 + weed/admin/plugin/plugin_scheduler.go | 41 ++++++++++++++++--- weed/admin/view/app/plugin.templ | 7 ++++ weed/admin/view/app/plugin_templ.go | 12 +++--- weed/pb/plugin.proto | 5 +++ weed/pb/plugin_pb/plugin.pb.go | 33 ++++++++++++--- weed/plugin/worker/admin_script_handler.go | 1 + weed/plugin/worker/ec_balance_handler.go | 10 +++++ weed/plugin/worker/erasure_coding_handler.go | 10 +++++ weed/plugin/worker/iceberg/handler.go | 1 + weed/plugin/worker/vacuum_handler.go | 1 + weed/plugin/worker/volume_balance_handler.go | 43 ++++++++++++++++++++ 13 files changed, 158 insertions(+), 17 deletions(-) diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 816a96c27..53a30508a 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -452,6 +452,13 @@ func (s *AdminServer) UpdatePluginJobTypeConfigAPI(w http.ResponseWriter, r *htt } config.UpdatedBy = username + // Reapply descriptor defaults so a save from an older form (or a UI + // that omits new fields) cannot silently clear a baseline like + // execution_timeout_seconds back to zero. + if descriptor, err := s.LoadPluginJobTypeDescriptor(jobType); err == nil && descriptor != nil { + applyDescriptorDefaultsToPersistedConfig(config, descriptor) + } + if err := s.SavePluginJobTypeConfig(config); err != nil { writeJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -916,6 +923,9 @@ func applyDescriptorDefaultsToPersistedConfig( if runtime.JobTypeMaxRuntimeSeconds <= 0 { runtime.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds } + if runtime.ExecutionTimeoutSeconds <= 0 { + runtime.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds + } if runtime.RetryBackoffSeconds <= 0 { runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds } diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 094a15830..32a472484 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -1044,6 +1044,7 @@ func (r *Plugin) ensureJobTypeConfigFromDescriptor(jobType string, descriptor *p RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, + ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds, } } diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 121074913..6628f444f 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -468,7 +468,7 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err policy := schedulerPolicy{ DetectionInterval: durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval), DetectionTimeout: durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout), - ExecutionTimeout: defaultScheduledExecutionTimeout, + ExecutionTimeout: durationFromSeconds(adminRuntime.ExecutionTimeoutSeconds, defaultScheduledExecutionTimeout), JobTypeMaxRuntime: durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime), RetryBackoff: durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff), MaxResults: adminRuntime.MaxJobsPerDetection, @@ -503,12 +503,9 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime } - // Plugin protocol currently has only detection timeout in admin settings. - execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds*2) * time.Second - if execTimeout < defaultScheduledExecutionTimeout { - execTimeout = defaultScheduledExecutionTimeout + if policy.ExecutionTimeout < defaultScheduledExecutionTimeout { + policy.ExecutionTimeout = defaultScheduledExecutionTimeout } - policy.ExecutionTimeout = execTimeout return policy, true, nil } @@ -610,6 +607,37 @@ func deriveSchedulerAdminRuntime( ) *plugin_pb.AdminRuntimeConfig { if cfg != nil && cfg.AdminRuntime != nil { adminConfig := *cfg.AdminRuntime + // Overlay descriptor defaults for any zero numeric fields. Persisted + // configs from older versions have no execution_timeout_seconds, and + // without this overlay the scheduler would fall back to the 90s + // default instead of the handler's declared baseline. + if descriptor != nil && descriptor.AdminRuntimeDefaults != nil { + defaults := descriptor.AdminRuntimeDefaults + if adminConfig.DetectionIntervalSeconds <= 0 { + adminConfig.DetectionIntervalSeconds = defaults.DetectionIntervalSeconds + } + if adminConfig.DetectionTimeoutSeconds <= 0 { + adminConfig.DetectionTimeoutSeconds = defaults.DetectionTimeoutSeconds + } + if adminConfig.MaxJobsPerDetection <= 0 { + adminConfig.MaxJobsPerDetection = defaults.MaxJobsPerDetection + } + if adminConfig.GlobalExecutionConcurrency <= 0 { + adminConfig.GlobalExecutionConcurrency = defaults.GlobalExecutionConcurrency + } + if adminConfig.PerWorkerExecutionConcurrency <= 0 { + adminConfig.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency + } + if adminConfig.RetryBackoffSeconds <= 0 { + adminConfig.RetryBackoffSeconds = defaults.RetryBackoffSeconds + } + if adminConfig.JobTypeMaxRuntimeSeconds <= 0 { + adminConfig.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds + } + if adminConfig.ExecutionTimeoutSeconds <= 0 { + adminConfig.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds + } + } return &adminConfig } @@ -628,6 +656,7 @@ func deriveSchedulerAdminRuntime( RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, + ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds, } } diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 25aa883f7..472747fdb 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -266,6 +266,11 @@ templ Plugin(page string, initialJob string, lane string) { +
+ + +
Per-attempt deadline for one job. Size-aware tasks may extend this automatically.
+
@@ -2519,6 +2524,7 @@ templ Plugin(page string, initialJob string, lane string) { document.getElementById('plugin-admin-enabled').checked = pickBool('enabled'); document.getElementById('plugin-admin-detection-interval').value = String(pickNumber('detection_interval_seconds')); document.getElementById('plugin-admin-detection-timeout').value = String(pickNumber('detection_timeout_seconds')); + document.getElementById('plugin-admin-execution-timeout').value = String(pickNumber('execution_timeout_seconds')); document.getElementById('plugin-admin-max-runtime').value = String(pickNumber('job_type_max_runtime_seconds')); document.getElementById('plugin-admin-max-results').value = String(pickNumber('max_jobs_per_detection')); document.getElementById('plugin-admin-global-exec').value = String(pickNumber('global_execution_concurrency')); @@ -2544,6 +2550,7 @@ templ Plugin(page string, initialJob string, lane string) { enabled: !!document.getElementById('plugin-admin-enabled').checked, detection_interval_seconds: getInt('plugin-admin-detection-interval'), detection_timeout_seconds: getInt('plugin-admin-detection-timeout'), + execution_timeout_seconds: getInt('plugin-admin-execution-timeout'), job_type_max_runtime_seconds: getInt('plugin-admin-max-runtime'), max_jobs_per_detection: getInt('plugin-admin-max-results'), global_execution_concurrency: getInt('plugin-admin-global-exec'), diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index fe214514d..1e809f89e 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -46,7 +46,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var2 string templ_7745c5c3_Var2, templ_7745c5c3_Err = templ.JoinStringErrs(currentPage) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 80} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 80} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var2)) if templ_7745c5c3_Err != nil { @@ -59,7 +59,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var3 string templ_7745c5c3_Var3, templ_7745c5c3_Err = templ.JoinStringErrs(initialJob) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 111} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 111} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var3)) if templ_7745c5c3_Err != nil { @@ -72,7 +72,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var4 string templ_7745c5c3_Var4, templ_7745c5c3_Err = templ.JoinStringErrs(currentLane) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 144} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 144} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var4)) if templ_7745c5c3_Err != nil { @@ -85,7 +85,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var5 string templ_7745c5c3_Var5, templ_7745c5c3_Err = templ.JoinStringErrs(laneTitle) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 21, Col: 84} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 21, Col: 84} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var5)) if templ_7745c5c3_Err != nil { @@ -98,13 +98,13 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var6 string templ_7745c5c3_Var6, templ_7745c5c3_Err = templ.JoinStringErrs(laneDescription) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 22, Col: 68} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 22, Col: 68} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var6)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Per-attempt deadline for one job. Size-aware tasks may extend this automatically.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/pb/plugin.proto b/weed/pb/plugin.proto index 6a4bc7a3c..1647a6dcf 100644 --- a/weed/pb/plugin.proto +++ b/weed/pb/plugin.proto @@ -233,6 +233,10 @@ message AdminRuntimeDefaults { int32 retry_limit = 7; int32 retry_backoff_seconds = 8; int32 job_type_max_runtime_seconds = 9; + // Per-attempt execution deadline for one job. Distinct from + // detection_timeout (which bounds the detection scan) and + // job_type_max_runtime (a lane-wide budget across all attempts). + int32 execution_timeout_seconds = 10; } message AdminRuntimeConfig { @@ -245,6 +249,7 @@ message AdminRuntimeConfig { int32 retry_limit = 7; int32 retry_backoff_seconds = 8; int32 job_type_max_runtime_seconds = 9; + int32 execution_timeout_seconds = 10; } message RunDetectionRequest { diff --git a/weed/pb/plugin_pb/plugin.pb.go b/weed/pb/plugin_pb/plugin.pb.go index e89a01084..555837e71 100644 --- a/weed/pb/plugin_pb/plugin.pb.go +++ b/weed/pb/plugin_pb/plugin.pb.go @@ -2493,8 +2493,12 @@ type AdminRuntimeDefaults struct { RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Per-attempt execution deadline for one job. Distinct from + // detection_timeout (which bounds the detection scan) and + // job_type_max_runtime (a lane-wide budget across all attempts). + ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *AdminRuntimeDefaults) Reset() { @@ -2590,6 +2594,13 @@ func (x *AdminRuntimeDefaults) GetJobTypeMaxRuntimeSeconds() int32 { return 0 } +func (x *AdminRuntimeDefaults) GetExecutionTimeoutSeconds() int32 { + if x != nil { + return x.ExecutionTimeoutSeconds + } + return 0 +} + type AdminRuntimeConfig struct { state protoimpl.MessageState `protogen:"open.v1"` Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` @@ -2601,6 +2612,7 @@ type AdminRuntimeConfig struct { RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` + ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2698,6 +2710,13 @@ func (x *AdminRuntimeConfig) GetJobTypeMaxRuntimeSeconds() int32 { return 0 } +func (x *AdminRuntimeConfig) GetExecutionTimeoutSeconds() int32 { + if x != nil { + return x.ExecutionTimeoutSeconds + } + return 0 +} + type RunDetectionRequest struct { state protoimpl.MessageState `protogen:"open.v1"` RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` @@ -4091,7 +4110,7 @@ const file_plugin_proto_rawDesc = "" + "\x06fields\x18\x01 \x03(\v2\x1c.plugin.ValueMap.FieldsEntryR\x06fields\x1aN\n" + "\vFieldsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xff\x03\n" + + "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xbb\x04\n" + "\x14AdminRuntimeDefaults\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4102,7 +4121,9 @@ const file_plugin_proto_rawDesc = "" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + - "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xfd\x03\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" + + "\x19execution_timeout_seconds\x18\n" + + " \x01(\x05R\x17executionTimeoutSeconds\"\xb9\x04\n" + "\x12AdminRuntimeConfig\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4113,7 +4134,9 @@ const file_plugin_proto_rawDesc = "" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + - "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xef\x05\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" + + "\x19execution_timeout_seconds\x18\n" + + " \x01(\x05R\x17executionTimeoutSeconds\"\xef\x05\n" + "\x13RunDetectionRequest\x12\x1d\n" + "\n" + "request_id\x18\x01 \x01(\tR\trequestId\x12\x19\n" + diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go index 9c2e4c310..26f7c4a3a 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ -123,6 +123,7 @@ func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 0, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{}, } diff --git a/weed/plugin/worker/ec_balance_handler.go b/weed/plugin/worker/ec_balance_handler.go index 99825ce1e..092bf5ac4 100644 --- a/weed/plugin/worker/ec_balance_handler.go +++ b/weed/plugin/worker/ec_balance_handler.go @@ -167,6 +167,7 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, @@ -456,6 +457,12 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb summary = fmt.Sprintf("Move EC shard of volume %d: %s → %s", result.VolumeID, sourceNode, targetNode) } + // EC shard moves only relocate one shard (1/14 of the volume). Budget + // 5 min/GB of full volume size, which conservatively covers the shard + // transfer plus mount/registration overhead. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -479,6 +486,9 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "ec_balance", diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 14715c212..abd7fd5ca 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -166,6 +166,7 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "quiet_for_seconds": { @@ -634,6 +635,12 @@ func buildErasureCodingProposal( summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode) } + // EC encoding reads the full volume, computes shards, and writes 14 + // shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy) + // so the scheduler grants a deadline scaled to volume size. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 10 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -657,6 +664,9 @@ func buildErasureCodingProposal( "target_count": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "erasure_coding", diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index 8dd121895..ea9bde7b2 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -332,6 +332,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 60, JobTypeMaxRuntimeSeconds: 3600, // 1 hour max + ExecutionTimeoutSeconds: 3600, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 1853d223a..56cb22e73 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -200,6 +200,7 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 10, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "garbage_threshold": { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index ce8da5e93..b12fb207c 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -246,6 +246,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 15, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": { @@ -1160,6 +1161,12 @@ func buildVolumeBalanceProposal( summary = fmt.Sprintf("Move volume %d from %s to %s", result.VolumeID, sourceNode, targetNode) } + // Estimate runtime at 5 min/GB (matches vacuum) so the scheduler grants + // a per-attempt deadline that scales with volume size instead of the + // 2*DetectionTimeout default. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -1183,6 +1190,9 @@ func buildVolumeBalanceProposal( "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "balance", @@ -1311,6 +1321,36 @@ func buildBatchVolumeBalanceProposals( continue } + // Estimate runtime so the scheduler grants a per-attempt deadline + // large enough for the whole batch. Mirrors vacuum's 5 min/GB budget. + // Use max(largest single move, total / concurrency) so a skewed batch + // with one big move isn't underestimated by the average. + var totalSeconds, maxMoveSeconds int64 + for _, m := range moves { + gb := int64(m.VolumeSize/1024/1024/1024) + 1 + s := gb * 5 * 60 + totalSeconds += s + if s > maxMoveSeconds { + maxMoveSeconds = s + } + } + // Round up to whole scheduling rounds: with N moves and C slots, + // the busiest slot processes ceil(N/C) moves, not N/C. Using + // avg * ceil(N/C) avoids underestimating when N is not a multiple + // of C (e.g. 6 moves at concurrency 5 → 2 rounds, not 1.2). + avgSeconds := totalSeconds / int64(len(moves)) + numRounds := (int64(len(moves)) + int64(maxConcurrentMoves) - 1) / int64(maxConcurrentMoves) + estimatedRuntimeSeconds := avgSeconds * numRounds + if maxMoveSeconds > estimatedRuntimeSeconds { + estimatedRuntimeSeconds = maxMoveSeconds + } + // Floor for orchestration overhead — scales per wave (one round of + // concurrent moves) rather than per move, since setup/teardown + // happens once per wave, not once per move. + if minBudget := numRounds * 60; estimatedRuntimeSeconds < minBudget { + estimatedRuntimeSeconds = minBudget + } + proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano()) summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ",")) if len(summary) > maxProposalStringLength { @@ -1341,6 +1381,9 @@ func buildBatchVolumeBalanceProposals( "batch_size": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "balance",