diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index 83c04f0f4..5db4ddf11 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -186,8 +186,8 @@ func (r *Plugin) runLaneSchedulerIterationLocked(ls *schedulerLaneState, jobType } } - r.pruneSchedulerState(active) - r.pruneDetectorLeases(active) + r.pruneSchedulerState(ls.lane, active) + r.pruneDetectorLeases(ls.lane, active) r.setLaneLoopState(ls, "", "idle") return hadJobs } @@ -213,8 +213,8 @@ func (r *Plugin) runLaneSchedulerIterationConcurrent(ls *schedulerLaneState, job } wg.Wait() - r.pruneSchedulerState(active) - r.pruneDetectorLeases(active) + r.pruneSchedulerState(ls.lane, active) + r.pruneDetectorLeases(ls.lane, active) r.setLaneLoopState(ls, "", "idle") return hadJobs.Load() } @@ -746,11 +746,16 @@ func (r *Plugin) finishDetection(jobType string) { r.schedulerMu.Unlock() } -func (r *Plugin) pruneSchedulerState(activeJobTypes map[string]struct{}) { +// pruneSchedulerState removes next-detection state for inactive job types +// in lane only. Prune must not touch other lanes' entries in the global map. +func (r *Plugin) pruneSchedulerState(lane SchedulerLane, activeJobTypes map[string]struct{}) { r.schedulerMu.Lock() defer r.schedulerMu.Unlock() for jobType := range r.nextDetectionAt { + if JobTypeLane(jobType) != lane { + continue + } if _, ok := activeJobTypes[jobType]; !ok { delete(r.nextDetectionAt, jobType) delete(r.detectionInFlight, jobType) @@ -766,11 +771,15 @@ func (r *Plugin) clearSchedulerJobType(jobType string) { r.clearDetectorLease(jobType, "") } -func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) { +// pruneDetectorLeases removes detector leases for inactive job types in lane only. +func (r *Plugin) pruneDetectorLeases(lane SchedulerLane, activeJobTypes map[string]struct{}) { r.detectorLeaseMu.Lock() defer r.detectorLeaseMu.Unlock() for jobType := range r.detectorLeases { + if JobTypeLane(jobType) != lane { + continue + } if _, ok := activeJobTypes[jobType]; !ok { delete(r.detectorLeases, jobType) } diff --git a/weed/admin/plugin/plugin_scheduler_test.go b/weed/admin/plugin/plugin_scheduler_test.go index cf876f3b4..99b374b77 100644 --- a/weed/admin/plugin/plugin_scheduler_test.go +++ b/weed/admin/plugin/plugin_scheduler_test.go @@ -673,3 +673,118 @@ func TestRunLaneSchedulerIterationLockBehavior(t *testing.T) { }) } } + +// ---------- lane-scoped prune ---------- + +func TestPruneSchedulerState_DefaultLaneKeepsForeignLanesAndPrunesOwnStale(t *testing.T) { + p, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer p.Shutdown() + + now := time.Now().UTC() + p.schedulerMu.Lock() + p.nextDetectionAt["s3_lifecycle"] = now.Add(24 * time.Hour) // lifecycle lane + p.nextDetectionAt["vacuum"] = now.Add(time.Minute) // default lane, active + p.nextDetectionAt["ec_balance"] = now.Add(time.Minute) // default lane, stale + p.detectionInFlight["ec_balance"] = true + p.schedulerMu.Unlock() + + // Default-lane iteration prunes with only its own active job types. + p.pruneSchedulerState(LaneDefault, map[string]struct{}{"vacuum": {}}) + + p.schedulerMu.Lock() + defer p.schedulerMu.Unlock() + if _, ok := p.nextDetectionAt["s3_lifecycle"]; !ok { + t.Fatal("default-lane prune must not delete lifecycle-lane nextDetectionAt[s3_lifecycle]") + } + if _, ok := p.nextDetectionAt["vacuum"]; !ok { + t.Fatal("active default-lane job (vacuum) must be kept") + } + if _, ok := p.nextDetectionAt["ec_balance"]; ok { + t.Fatal("stale default-lane job (ec_balance) must still be pruned within its own lane") + } + if _, ok := p.detectionInFlight["ec_balance"]; ok { + t.Fatal("pruned job must also drop its detectionInFlight entry") + } +} + +func TestPruneSchedulerState_LifecycleLaneLeavesDefaultLane(t *testing.T) { + p, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer p.Shutdown() + + now := time.Now().UTC() + p.schedulerMu.Lock() + p.nextDetectionAt["vacuum"] = now.Add(time.Minute) // default lane + p.nextDetectionAt["s3_lifecycle"] = now.Add(24 * time.Hour) // lifecycle lane, active + p.schedulerMu.Unlock() + + p.pruneSchedulerState(LaneLifecycle, map[string]struct{}{"s3_lifecycle": {}}) + + p.schedulerMu.Lock() + defer p.schedulerMu.Unlock() + if _, ok := p.nextDetectionAt["vacuum"]; !ok { + t.Fatal("lifecycle-lane prune must not delete default-lane nextDetectionAt[vacuum]") + } + if _, ok := p.nextDetectionAt["s3_lifecycle"]; !ok { + t.Fatal("active lifecycle job (s3_lifecycle) must be kept") + } +} + +func TestPruneDetectorLeases_IsLaneScoped(t *testing.T) { + p, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer p.Shutdown() + + p.detectorLeaseMu.Lock() + p.detectorLeases["s3_lifecycle"] = "worker-a" // lifecycle lane + p.detectorLeases["vacuum"] = "worker-b" // default lane, active + p.detectorLeases["ec_balance"] = "worker-c" // default lane, stale + p.detectorLeaseMu.Unlock() + + p.pruneDetectorLeases(LaneDefault, map[string]struct{}{"vacuum": {}}) + + p.detectorLeaseMu.Lock() + defer p.detectorLeaseMu.Unlock() + if _, ok := p.detectorLeases["s3_lifecycle"]; !ok { + t.Fatal("default-lane prune must not delete lifecycle-lane detector lease") + } + if _, ok := p.detectorLeases["vacuum"]; !ok { + t.Fatal("active default-lane detector lease (vacuum) must be kept") + } + if _, ok := p.detectorLeases["ec_balance"]; ok { + t.Fatal("stale default-lane detector lease (ec_balance) must still be pruned within its own lane") + } +} + +func TestLaneStatus_LifecycleNextDetectionSurvivesDefaultLanePrune(t *testing.T) { + p, err := New(Options{}) + if err != nil { + t.Fatalf("New: %v", err) + } + defer p.Shutdown() + + now := time.Now().UTC() + expected := now.Add(24 * time.Hour) + p.schedulerMu.Lock() + p.nextDetectionAt["s3_lifecycle"] = expected + p.nextDetectionAt["vacuum"] = now.Add(time.Minute) + p.schedulerMu.Unlock() + + p.pruneSchedulerState(LaneDefault, map[string]struct{}{"vacuum": {}}) + + status := p.GetLaneSchedulerStatus(LaneLifecycle) + if status.NextDetectionAt == nil { + t.Fatal("lifecycle lane status lost next_detection_at after a default-lane prune") + } + if !status.NextDetectionAt.Equal(expected) { + t.Fatalf("next_detection_at = %v, want %v (must be the 24h schedule, not the idle-sleep fallback)", + status.NextDetectionAt, expected) + } +}