From 80951934c37416bc4f6c1472a5d3f8d204a637d9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 10 Mar 2026 12:56:57 -0700 Subject: [PATCH] add dynamic timeouts to plugin worker vacuum gRPC calls All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup. Fixes #8591 --- weed/worker/tasks/vacuum/vacuum_task.go | 36 +++++++++++++++++++------ 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 4b890fada..b6c032136 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -25,6 +25,7 @@ type VacuumTask struct { garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption + volumeSize uint64 } // NewVacuumTask creates a new unified vacuum task instance @@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.garbageThreshold = vacuumParams.GarbageThreshold + t.volumeSize = params.VolumeSize t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, @@ -146,6 +148,14 @@ func (t *VacuumTask) GetProgress() float64 { return t.progress } +// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the +// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for +// check, 3 minutes for compact). +func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { + sizeGB := int64(t.volumeSize/1024/1024/1024) + 1 + return base * time.Duration(sizeGB) +} + // Helper methods for real vacuum operations // checkVacuumEligibility checks if the volume meets vacuum criteria @@ -154,7 +164,9 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + ctx, cancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -181,9 +193,11 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) { func (t *VacuumTask) performVacuum() error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume + // Step 1: Compact the volume (3 min per GB, matching topology vacuum) t.GetLogger().Info("Compacting volume") - stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{ + compactCtx, compactCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(3*time.Minute)) + defer compactCancel() + stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -202,18 +216,22 @@ func (t *VacuumTask) performVacuum() error { glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) } - // Step 2: Commit the vacuum + // Step 2: Commit the vacuum (1 min per GB) t.GetLogger().Info("Committing vacuum operation") - _, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ + commitCtx, commitCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute)) + defer commitCancel() + _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { return fmt.Errorf("vacuum commit failed: %v", err) } - // Step 3: Cleanup old files + // Step 3: Cleanup old files (1 min per GB) t.GetLogger().Info("Cleaning up vacuum files") - _, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute)) + defer cleanupCancel() + _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: t.volumeID, }) if err != nil { @@ -229,7 +247,9 @@ func (t *VacuumTask) performVacuum() error { func (t *VacuumTask) verifyVacuumResults() error { return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ + ctx, cancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: t.volumeID, }) if err != nil {