diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 816a96c27..53a30508a 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -452,6 +452,13 @@ func (s *AdminServer) UpdatePluginJobTypeConfigAPI(w http.ResponseWriter, r *htt } config.UpdatedBy = username + // Reapply descriptor defaults so a save from an older form (or a UI + // that omits new fields) cannot silently clear a baseline like + // execution_timeout_seconds back to zero. + if descriptor, err := s.LoadPluginJobTypeDescriptor(jobType); err == nil && descriptor != nil { + applyDescriptorDefaultsToPersistedConfig(config, descriptor) + } + if err := s.SavePluginJobTypeConfig(config); err != nil { writeJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -916,6 +923,9 @@ func applyDescriptorDefaultsToPersistedConfig( if runtime.JobTypeMaxRuntimeSeconds <= 0 { runtime.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds } + if runtime.ExecutionTimeoutSeconds <= 0 { + runtime.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds + } if runtime.RetryBackoffSeconds <= 0 { runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds } diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 094a15830..32a472484 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -1044,6 +1044,7 @@ func (r *Plugin) ensureJobTypeConfigFromDescriptor(jobType string, descriptor *p RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, + ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds, } } diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 121074913..6628f444f 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -468,7 +468,7 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err policy := schedulerPolicy{ DetectionInterval: durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval), DetectionTimeout: durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout), - ExecutionTimeout: defaultScheduledExecutionTimeout, + ExecutionTimeout: durationFromSeconds(adminRuntime.ExecutionTimeoutSeconds, defaultScheduledExecutionTimeout), JobTypeMaxRuntime: durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime), RetryBackoff: durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff), MaxResults: adminRuntime.MaxJobsPerDetection, @@ -503,12 +503,9 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime } - // Plugin protocol currently has only detection timeout in admin settings. - execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds*2) * time.Second - if execTimeout < defaultScheduledExecutionTimeout { - execTimeout = defaultScheduledExecutionTimeout + if policy.ExecutionTimeout < defaultScheduledExecutionTimeout { + policy.ExecutionTimeout = defaultScheduledExecutionTimeout } - policy.ExecutionTimeout = execTimeout return policy, true, nil } @@ -610,6 +607,37 @@ func deriveSchedulerAdminRuntime( ) *plugin_pb.AdminRuntimeConfig { if cfg != nil && cfg.AdminRuntime != nil { adminConfig := *cfg.AdminRuntime + // Overlay descriptor defaults for any zero numeric fields. Persisted + // configs from older versions have no execution_timeout_seconds, and + // without this overlay the scheduler would fall back to the 90s + // default instead of the handler's declared baseline. + if descriptor != nil && descriptor.AdminRuntimeDefaults != nil { + defaults := descriptor.AdminRuntimeDefaults + if adminConfig.DetectionIntervalSeconds <= 0 { + adminConfig.DetectionIntervalSeconds = defaults.DetectionIntervalSeconds + } + if adminConfig.DetectionTimeoutSeconds <= 0 { + adminConfig.DetectionTimeoutSeconds = defaults.DetectionTimeoutSeconds + } + if adminConfig.MaxJobsPerDetection <= 0 { + adminConfig.MaxJobsPerDetection = defaults.MaxJobsPerDetection + } + if adminConfig.GlobalExecutionConcurrency <= 0 { + adminConfig.GlobalExecutionConcurrency = defaults.GlobalExecutionConcurrency + } + if adminConfig.PerWorkerExecutionConcurrency <= 0 { + adminConfig.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency + } + if adminConfig.RetryBackoffSeconds <= 0 { + adminConfig.RetryBackoffSeconds = defaults.RetryBackoffSeconds + } + if adminConfig.JobTypeMaxRuntimeSeconds <= 0 { + adminConfig.JobTypeMaxRuntimeSeconds = defaults.JobTypeMaxRuntimeSeconds + } + if adminConfig.ExecutionTimeoutSeconds <= 0 { + adminConfig.ExecutionTimeoutSeconds = defaults.ExecutionTimeoutSeconds + } + } return &adminConfig } @@ -628,6 +656,7 @@ func deriveSchedulerAdminRuntime( RetryLimit: defaults.RetryLimit, RetryBackoffSeconds: defaults.RetryBackoffSeconds, JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds, + ExecutionTimeoutSeconds: defaults.ExecutionTimeoutSeconds, } } diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 25aa883f7..472747fdb 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -266,6 +266,11 @@ templ Plugin(page string, initialJob string, lane string) { +
+ + +
Per-attempt deadline for one job. Size-aware tasks may extend this automatically.
+
@@ -2519,6 +2524,7 @@ templ Plugin(page string, initialJob string, lane string) { document.getElementById('plugin-admin-enabled').checked = pickBool('enabled'); document.getElementById('plugin-admin-detection-interval').value = String(pickNumber('detection_interval_seconds')); document.getElementById('plugin-admin-detection-timeout').value = String(pickNumber('detection_timeout_seconds')); + document.getElementById('plugin-admin-execution-timeout').value = String(pickNumber('execution_timeout_seconds')); document.getElementById('plugin-admin-max-runtime').value = String(pickNumber('job_type_max_runtime_seconds')); document.getElementById('plugin-admin-max-results').value = String(pickNumber('max_jobs_per_detection')); document.getElementById('plugin-admin-global-exec').value = String(pickNumber('global_execution_concurrency')); @@ -2544,6 +2550,7 @@ templ Plugin(page string, initialJob string, lane string) { enabled: !!document.getElementById('plugin-admin-enabled').checked, detection_interval_seconds: getInt('plugin-admin-detection-interval'), detection_timeout_seconds: getInt('plugin-admin-detection-timeout'), + execution_timeout_seconds: getInt('plugin-admin-execution-timeout'), job_type_max_runtime_seconds: getInt('plugin-admin-max-runtime'), max_jobs_per_detection: getInt('plugin-admin-max-results'), global_execution_concurrency: getInt('plugin-admin-global-exec'), diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index fe214514d..1e809f89e 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -46,7 +46,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var2 string templ_7745c5c3_Var2, templ_7745c5c3_Err = templ.JoinStringErrs(currentPage) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 80} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 80} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var2)) if templ_7745c5c3_Err != nil { @@ -59,7 +59,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var3 string templ_7745c5c3_Var3, templ_7745c5c3_Err = templ.JoinStringErrs(initialJob) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 111} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 111} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var3)) if templ_7745c5c3_Err != nil { @@ -72,7 +72,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var4 string templ_7745c5c3_Var4, templ_7745c5c3_Err = templ.JoinStringErrs(currentLane) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 16, Col: 144} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 16, Col: 144} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var4)) if templ_7745c5c3_Err != nil { @@ -85,7 +85,7 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var5 string templ_7745c5c3_Var5, templ_7745c5c3_Err = templ.JoinStringErrs(laneTitle) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 21, Col: 84} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 21, Col: 84} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var5)) if templ_7745c5c3_Err != nil { @@ -98,13 +98,13 @@ func Plugin(page string, initialJob string, lane string) templ.Component { var templ_7745c5c3_Var6 string templ_7745c5c3_Var6, templ_7745c5c3_Err = templ.JoinStringErrs(laneDescription) if templ_7745c5c3_Err != nil { - return templ.Error{Err: templ_7745c5c3_Err, FileName: `view/app/plugin.templ`, Line: 22, Col: 68} + return templ.Error{Err: templ_7745c5c3_Err, FileName: `weed/admin/view/app/plugin.templ`, Line: 22, Col: 68} } _, templ_7745c5c3_Err = templ_7745c5c3_Buffer.WriteString(templ.EscapeString(templ_7745c5c3_Var6)) if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 6, "

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Per-attempt deadline for one job. Size-aware tasks may extend this automatically.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/pb/plugin.proto b/weed/pb/plugin.proto index 6a4bc7a3c..1647a6dcf 100644 --- a/weed/pb/plugin.proto +++ b/weed/pb/plugin.proto @@ -233,6 +233,10 @@ message AdminRuntimeDefaults { int32 retry_limit = 7; int32 retry_backoff_seconds = 8; int32 job_type_max_runtime_seconds = 9; + // Per-attempt execution deadline for one job. Distinct from + // detection_timeout (which bounds the detection scan) and + // job_type_max_runtime (a lane-wide budget across all attempts). + int32 execution_timeout_seconds = 10; } message AdminRuntimeConfig { @@ -245,6 +249,7 @@ message AdminRuntimeConfig { int32 retry_limit = 7; int32 retry_backoff_seconds = 8; int32 job_type_max_runtime_seconds = 9; + int32 execution_timeout_seconds = 10; } message RunDetectionRequest { diff --git a/weed/pb/plugin_pb/plugin.pb.go b/weed/pb/plugin_pb/plugin.pb.go index e89a01084..555837e71 100644 --- a/weed/pb/plugin_pb/plugin.pb.go +++ b/weed/pb/plugin_pb/plugin.pb.go @@ -2493,8 +2493,12 @@ type AdminRuntimeDefaults struct { RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Per-attempt execution deadline for one job. Distinct from + // detection_timeout (which bounds the detection scan) and + // job_type_max_runtime (a lane-wide budget across all attempts). + ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *AdminRuntimeDefaults) Reset() { @@ -2590,6 +2594,13 @@ func (x *AdminRuntimeDefaults) GetJobTypeMaxRuntimeSeconds() int32 { return 0 } +func (x *AdminRuntimeDefaults) GetExecutionTimeoutSeconds() int32 { + if x != nil { + return x.ExecutionTimeoutSeconds + } + return 0 +} + type AdminRuntimeConfig struct { state protoimpl.MessageState `protogen:"open.v1"` Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` @@ -2601,6 +2612,7 @@ type AdminRuntimeConfig struct { RetryLimit int32 `protobuf:"varint,7,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` RetryBackoffSeconds int32 `protobuf:"varint,8,opt,name=retry_backoff_seconds,json=retryBackoffSeconds,proto3" json:"retry_backoff_seconds,omitempty"` JobTypeMaxRuntimeSeconds int32 `protobuf:"varint,9,opt,name=job_type_max_runtime_seconds,json=jobTypeMaxRuntimeSeconds,proto3" json:"job_type_max_runtime_seconds,omitempty"` + ExecutionTimeoutSeconds int32 `protobuf:"varint,10,opt,name=execution_timeout_seconds,json=executionTimeoutSeconds,proto3" json:"execution_timeout_seconds,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2698,6 +2710,13 @@ func (x *AdminRuntimeConfig) GetJobTypeMaxRuntimeSeconds() int32 { return 0 } +func (x *AdminRuntimeConfig) GetExecutionTimeoutSeconds() int32 { + if x != nil { + return x.ExecutionTimeoutSeconds + } + return 0 +} + type RunDetectionRequest struct { state protoimpl.MessageState `protogen:"open.v1"` RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` @@ -4091,7 +4110,7 @@ const file_plugin_proto_rawDesc = "" + "\x06fields\x18\x01 \x03(\v2\x1c.plugin.ValueMap.FieldsEntryR\x06fields\x1aN\n" + "\vFieldsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + - "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xff\x03\n" + + "\x05value\x18\x02 \x01(\v2\x13.plugin.ConfigValueR\x05value:\x028\x01\"\xbb\x04\n" + "\x14AdminRuntimeDefaults\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4102,7 +4121,9 @@ const file_plugin_proto_rawDesc = "" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + - "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xfd\x03\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" + + "\x19execution_timeout_seconds\x18\n" + + " \x01(\x05R\x17executionTimeoutSeconds\"\xb9\x04\n" + "\x12AdminRuntimeConfig\x12\x18\n" + "\aenabled\x18\x01 \x01(\bR\aenabled\x12<\n" + "\x1adetection_interval_seconds\x18\x02 \x01(\x05R\x18detectionIntervalSeconds\x12:\n" + @@ -4113,7 +4134,9 @@ const file_plugin_proto_rawDesc = "" + "\vretry_limit\x18\a \x01(\x05R\n" + "retryLimit\x122\n" + "\x15retry_backoff_seconds\x18\b \x01(\x05R\x13retryBackoffSeconds\x12>\n" + - "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\"\xef\x05\n" + + "\x1cjob_type_max_runtime_seconds\x18\t \x01(\x05R\x18jobTypeMaxRuntimeSeconds\x12:\n" + + "\x19execution_timeout_seconds\x18\n" + + " \x01(\x05R\x17executionTimeoutSeconds\"\xef\x05\n" + "\x13RunDetectionRequest\x12\x1d\n" + "\n" + "request_id\x18\x01 \x01(\tR\trequestId\x12\x19\n" + diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go index 9c2e4c310..26f7c4a3a 100644 --- a/weed/plugin/worker/admin_script_handler.go +++ b/weed/plugin/worker/admin_script_handler.go @@ -123,6 +123,7 @@ func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 0, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{}, } diff --git a/weed/plugin/worker/ec_balance_handler.go b/weed/plugin/worker/ec_balance_handler.go index 99825ce1e..092bf5ac4 100644 --- a/weed/plugin/worker/ec_balance_handler.go +++ b/weed/plugin/worker/ec_balance_handler.go @@ -167,6 +167,7 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}}, @@ -456,6 +457,12 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb summary = fmt.Sprintf("Move EC shard of volume %d: %s → %s", result.VolumeID, sourceNode, targetNode) } + // EC shard moves only relocate one shard (1/14 of the volume). Budget + // 5 min/GB of full volume size, which conservatively covers the shard + // transfer plus mount/registration overhead. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -479,6 +486,9 @@ func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "ec_balance", diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go index 14715c212..abd7fd5ca 100644 --- a/weed/plugin/worker/erasure_coding_handler.go +++ b/weed/plugin/worker/erasure_coding_handler.go @@ -166,6 +166,7 @@ func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 30, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "quiet_for_seconds": { @@ -634,6 +635,12 @@ func buildErasureCodingProposal( summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode) } + // EC encoding reads the full volume, computes shards, and writes 14 + // shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy) + // so the scheduler grants a deadline scaled to volume size. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 10 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -657,6 +664,9 @@ func buildErasureCodingProposal( "target_count": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "erasure_coding", diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index 8dd121895..ea9bde7b2 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -332,6 +332,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 60, JobTypeMaxRuntimeSeconds: 3600, // 1 hour max + ExecutionTimeoutSeconds: 3600, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}}, diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go index 1853d223a..56cb22e73 100644 --- a/weed/plugin/worker/vacuum_handler.go +++ b/weed/plugin/worker/vacuum_handler.go @@ -200,6 +200,7 @@ func (h *VacuumHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 10, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "garbage_threshold": { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index ce8da5e93..b12fb207c 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -246,6 +246,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { RetryLimit: 1, RetryBackoffSeconds: 15, JobTypeMaxRuntimeSeconds: 1800, + ExecutionTimeoutSeconds: 1800, }, WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": { @@ -1160,6 +1161,12 @@ func buildVolumeBalanceProposal( summary = fmt.Sprintf("Move volume %d from %s to %s", result.VolumeID, sourceNode, targetNode) } + // Estimate runtime at 5 min/GB (matches vacuum) so the scheduler grants + // a per-attempt deadline that scales with volume size instead of the + // 2*DetectionTimeout default. + volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1 + estimatedRuntimeSeconds := volumeSizeGB * 5 * 60 + return &plugin_pb.JobProposal{ ProposalId: proposalID, DedupeKey: dedupeKey, @@ -1183,6 +1190,9 @@ func buildVolumeBalanceProposal( "collection": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "balance", @@ -1311,6 +1321,36 @@ func buildBatchVolumeBalanceProposals( continue } + // Estimate runtime so the scheduler grants a per-attempt deadline + // large enough for the whole batch. Mirrors vacuum's 5 min/GB budget. + // Use max(largest single move, total / concurrency) so a skewed batch + // with one big move isn't underestimated by the average. + var totalSeconds, maxMoveSeconds int64 + for _, m := range moves { + gb := int64(m.VolumeSize/1024/1024/1024) + 1 + s := gb * 5 * 60 + totalSeconds += s + if s > maxMoveSeconds { + maxMoveSeconds = s + } + } + // Round up to whole scheduling rounds: with N moves and C slots, + // the busiest slot processes ceil(N/C) moves, not N/C. Using + // avg * ceil(N/C) avoids underestimating when N is not a multiple + // of C (e.g. 6 moves at concurrency 5 → 2 rounds, not 1.2). + avgSeconds := totalSeconds / int64(len(moves)) + numRounds := (int64(len(moves)) + int64(maxConcurrentMoves) - 1) / int64(maxConcurrentMoves) + estimatedRuntimeSeconds := avgSeconds * numRounds + if maxMoveSeconds > estimatedRuntimeSeconds { + estimatedRuntimeSeconds = maxMoveSeconds + } + // Floor for orchestration overhead — scales per wave (one round of + // concurrent moves) rather than per move, since setup/teardown + // happens once per wave, not once per move. + if minBudget := numRounds * 60; estimatedRuntimeSeconds < minBudget { + estimatedRuntimeSeconds = minBudget + } + proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano()) summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ",")) if len(summary) > maxProposalStringLength { @@ -1341,6 +1381,9 @@ func buildBatchVolumeBalanceProposals( "batch_size": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))}, }, + "estimated_runtime_seconds": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds}, + }, }, Labels: map[string]string{ "task_type": "balance",