seaweedfs/test/plugin_workers/vacuum/execution_test.go
Jaehoon Kim 4b23204023 fix(vacuum): writable volume re-notification after worker VACUUM (#9732)
* fix(vacuum): notify master writable after worker vacuum commit

Add Phase 3 (markWritableOne) that walks vacuumTargets and calls
VolumeMarkWritable on each replica's volume server, mirroring
batchVacuumVolumeCommit's per-replica SetVolumeAvailable. Failures are
logged at WARN; the task does not fail because the vacuum itself
already succeeded. See upstream seaweedfs#9685.

* fix(vacuum): delay Phase 3 to let post-commit heartbeats settle

Phase 3's VolumeMarkWritable can race with the volume server's first
post-commit heartbeat. SetVolumeWritable adds the vid to writables,
but a racing heartbeat whose ReadOnly value changed re-runs
EnsureCorrectWritables against the master's per-replica cache, and any
replica still cached as ReadOnly=true silently removes the vid again
— with no further heartbeat change to trigger another recovery.

Sleep 30s after Phase 2 (Commit) so every replica's post-vacuum
heartbeat has reached the master before Phase 3 fires. Cancel cleanly
on ctx.Done so a shutdown during the wait still exits.
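
The cancellable wait described above can be sketched as follows. This is an illustration, not the worker's actual code: `waitForSettle` and `demo` are hypothetical names, and the durations are shortened so the example finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForSettle blocks for d, or returns ctx.Err() early if ctx is done first.
func waitForSettle(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs one uninterrupted wait and one wait on an already-cancelled context.
func demo() (completed, cancelled error) {
	completed = waitForSettle(context.Background(), 10*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a shutdown before the wait begins
	cancelled = waitForSettle(ctx, time.Hour)
	return completed, cancelled
}

func main() {
	completed, cancelled := demo()
	fmt.Println("uninterrupted wait:", completed)
	fmt.Println("cancelled wait:", cancelled)
}
```

Using a timer inside `select` instead of `time.Sleep` is what lets a shutdown interrupt the wait.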

* fix(vacuum): reduce post-commit settle from 30s to 10s

VolumePulsePeriod is 5s, so 10s (2x) is enough margin for every
replica's post-commit heartbeat to reach the master before Phase 3
fires. 30s was overly conservative and made TestVacuumExecutionIntegration
hit its 30s context deadline.

* fix(vacuum): use flat 1m timeout for VolumeMarkWritable RPC

VolumeMarkWritable on the volume server is a metadata operation
(reopen idx + flags + master ReadOnly=false heartbeat), independent
of volume size. Scaling via vacuumTimeout(time.Minute) gave it tens
of minutes, or hours on TB volumes, so a single unresponsive
replica could stall Phase 3 for that long. Use a flat 1m cap.
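
A rough sketch of a flat per-call cap, assuming the RPC honors its context deadline. `callWithCap`, `hungReplica`, and `demoTimedOut` are hypothetical helpers; only the one-minute value comes from the text above, and the demo uses a much shorter limit so it returns quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// markWritableTimeout is a flat cap that does not depend on volume size.
const markWritableTimeout = time.Minute

// callWithCap runs call under a deadline of at most limit.
func callWithCap(parent context.Context, limit time.Duration, call func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, limit)
	defer cancel()
	return call(ctx)
}

// hungReplica simulates an unresponsive replica: it returns only when its
// context ends.
func hungReplica(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoTimedOut reports whether a hung call is cut off by the cap.
func demoTimedOut() bool {
	err := callWithCap(context.Background(), 20*time.Millisecond, hungReplica)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("production cap:", markWritableTimeout)
	fmt.Println("hung replica cut off by cap:", demoTimedOut())
}
```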

* fix(vacuum): gate post-vacuum mark-writable on commit read-only state

Phase 3 force-called VolumeMarkWritable on every replica unconditionally,
clearing the read-only flag and persisting ReadOnly=false even for a
replica left read-only by an operator, an EIO quarantine, or low disk.
That overrode states the master deliberately keeps out of writables;
master built-in vacuum gates the same step on the commit's IsReadOnly via
SetVolumeAvailable.

Capture the VacuumVolumeCommit response and skip Phase 3 when any replica
came back read-only, letting it recover on its own ReadOnly=false
heartbeat. Drop the 10s post-commit settle sleep: the heartbeat race it
guarded needed a replica cached read-only at the master, which the gate
now excludes.
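
The gate can be pictured with the sketch below. The `commitResult` type and `shouldMarkWritable` function are hypothetical; the real worker reads the read-only state from the VacuumVolumeCommit response. Treating an empty result list as "do not mark" is an assumption of this sketch, not something the commit states.

```go
package main

import "fmt"

// commitResult is a hypothetical summary of one replica's commit response.
type commitResult struct {
	Server     string
	IsReadOnly bool
}

// shouldMarkWritable reports whether Phase 3 should run: only when at least
// one replica committed and none of them came back read-only.
func shouldMarkWritable(results []commitResult) bool {
	for _, r := range results {
		if r.IsReadOnly {
			return false
		}
	}
	return len(results) > 0
}

func main() {
	healthy := []commitResult{{Server: "a:8080"}, {Server: "b:8080"}}
	quarantined := []commitResult{{Server: "a:8080"}, {Server: "b:8080", IsReadOnly: true}}
	fmt.Println("all replicas writable:", shouldMarkWritable(healthy))
	fmt.Println("one replica read-only:", shouldMarkWritable(quarantined))
}
```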

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
2026-05-29 23:43:24 -07:00


package vacuum_test

import (
	"context"
	"fmt"
	"testing"
	"time"

	pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func vacuumJobSpec(volumeID uint32, server string) *plugin_pb.JobSpec {
	return &plugin_pb.JobSpec{
		JobId:   fmt.Sprintf("vacuum-job-%d", volumeID),
		JobType: "vacuum",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)},
			},
			"server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: server},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "vac-test"},
			},
		},
	}
}

func TestVacuumExecutionIntegration(t *testing.T) {
	volumeID := uint32(202)

	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
	handler := vacuum.NewVacuumHandler(dialOption, 1)
	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
		WorkerOptions: pluginworker.WorkerOptions{
			GrpcDialOption: dialOption,
		},
		Handlers: []pluginworker.JobHandler{handler},
	})
	harness.WaitForJobType("vacuum")

	source := pluginworkers.NewVolumeServer(t, "")
	source.SetVacuumGarbageRatio(0.6)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	result, err := harness.Plugin().ExecuteJob(ctx, vacuumJobSpec(volumeID, source.Address()), nil, 1)
	require.NoError(t, err)
	require.NotNil(t, result)
	require.True(t, result.Success)

	checkCalls, compactCalls, commitCalls, cleanupCalls := source.VacuumStats()
	require.GreaterOrEqual(t, checkCalls, 2)
	require.GreaterOrEqual(t, compactCalls, 1)
	require.GreaterOrEqual(t, commitCalls, 1)
	// Cleanup runs only when Phase 1 (Compact) fails, to roll back the
	// .cpd/.cpx/.cpldb temp files. On the success path Commit consumes
	// them (rename .cpd → .dat, .cpx → .idx, .cpldb → .ldb via the
	// leveldb needle map), so no Cleanup call is needed. This matches
	// topology.vacuumOneVolumeId, which calls batchVacuumVolumeCleanup
	// only on the Compact-failure branch.
	require.Equal(t, 0, cleanupCalls)
	// Phase 3 marks each replica writable so the master returns it to
	// the writables layout. See upstream seaweedfs#9685.
	require.GreaterOrEqual(t, source.MarkWritableCount(), 1)
}

// A replica that commits still read-only (operator-set, EIO-quarantined,
// disk-space-low) must not be force-marked writable: master built-in vacuum
// skips it via SetVolumeAvailable, and it recovers on its own ReadOnly=false
// heartbeat.
func TestVacuumExecutionSkipsMarkWritableWhenReadOnly(t *testing.T) {
	volumeID := uint32(203)

	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
	handler := vacuum.NewVacuumHandler(dialOption, 1)
	harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
		WorkerOptions: pluginworker.WorkerOptions{
			GrpcDialOption: dialOption,
		},
		Handlers: []pluginworker.JobHandler{handler},
	})
	harness.WaitForJobType("vacuum")

	source := pluginworkers.NewVolumeServer(t, "")
	source.SetVacuumGarbageRatio(0.6)
	source.SetVacuumCommitReadOnly(true)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	result, err := harness.Plugin().ExecuteJob(ctx, vacuumJobSpec(volumeID, source.Address()), nil, 1)
	require.NoError(t, err)
	require.NotNil(t, result)
	require.True(t, result.Success)

	_, _, commitCalls, _ := source.VacuumStats()
	require.GreaterOrEqual(t, commitCalls, 1)
	require.Equal(t, 0, source.MarkWritableCount())
}