mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
add dynamic timeouts to plugin worker vacuum gRPC calls
All vacuum gRPC calls used context.Background() with no deadline, so the plugin scheduler's execution timeout could kill a job while a large volume compact was still in progress. Use volume-size-scaled timeouts matching the topology vacuum approach: 3 min/GB for compact, 1 min/GB for check, commit, and cleanup. Fixes #8591
This commit is contained in:
@@ -25,6 +25,7 @@ type VacuumTask struct {
|
||||
garbageThreshold float64
|
||||
progress float64
|
||||
grpcDialOption grpc.DialOption
|
||||
volumeSize uint64
|
||||
}
|
||||
|
||||
// NewVacuumTask creates a new unified vacuum task instance
|
||||
@@ -51,6 +52,7 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
|
||||
}
|
||||
|
||||
t.garbageThreshold = vacuumParams.GarbageThreshold
|
||||
t.volumeSize = params.VolumeSize
|
||||
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
@@ -146,6 +148,14 @@ func (t *VacuumTask) GetProgress() float64 {
|
||||
return t.progress
|
||||
}
|
||||
|
||||
// vacuumTimeout returns a dynamic timeout scaled by volume size, matching the
|
||||
// topology vacuum approach. base is the per-GB multiplier (e.g. 1 minute for
|
||||
// check, 3 minutes for compact).
|
||||
func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration {
|
||||
sizeGB := int64(t.volumeSize/1024/1024/1024) + 1
|
||||
return base * time.Duration(sizeGB)
|
||||
}
|
||||
|
||||
// Helper methods for real vacuum operations
|
||||
|
||||
// checkVacuumEligibility checks if the volume meets vacuum criteria
|
||||
@@ -154,7 +164,9 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
ctx, cancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute))
|
||||
defer cancel()
|
||||
resp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -181,9 +193,11 @@ func (t *VacuumTask) checkVacuumEligibility() (bool, float64, error) {
|
||||
func (t *VacuumTask) performVacuum() error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
// Step 1: Compact the volume
|
||||
// Step 1: Compact the volume (3 min per GB, matching topology vacuum)
|
||||
t.GetLogger().Info("Compacting volume")
|
||||
stream, err := client.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
|
||||
compactCtx, compactCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(3*time.Minute))
|
||||
defer compactCancel()
|
||||
stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -202,18 +216,22 @@ func (t *VacuumTask) performVacuum() error {
|
||||
glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes)
|
||||
}
|
||||
|
||||
// Step 2: Commit the vacuum
|
||||
// Step 2: Commit the vacuum (1 min per GB)
|
||||
t.GetLogger().Info("Committing vacuum operation")
|
||||
_, err = client.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
|
||||
commitCtx, commitCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute))
|
||||
defer commitCancel()
|
||||
_, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("vacuum commit failed: %v", err)
|
||||
}
|
||||
|
||||
// Step 3: Cleanup old files
|
||||
// Step 3: Cleanup old files (1 min per GB)
|
||||
t.GetLogger().Info("Cleaning up vacuum files")
|
||||
_, err = client.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
|
||||
cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute))
|
||||
defer cleanupCancel()
|
||||
_, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -229,7 +247,9 @@ func (t *VacuumTask) performVacuum() error {
|
||||
func (t *VacuumTask) verifyVacuumResults() error {
|
||||
return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
resp, err := client.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
ctx, cancel := context.WithTimeout(context.Background(), t.vacuumTimeout(time.Minute))
|
||||
defer cancel()
|
||||
resp, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{
|
||||
VolumeId: t.volumeID,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user