fix(admin): make scheduler pruning lane-aware (#9790)

This commit is contained in:
ahalaun
2026-06-02 20:17:52 +02:00
committed by GitHub
parent e3e02d3364
commit bcd2c958e1
2 changed files with 130 additions and 6 deletions
+15 -6
View File
@@ -186,8 +186,8 @@ func (r *Plugin) runLaneSchedulerIterationLocked(ls *schedulerLaneState, jobType
}
}
r.pruneSchedulerState(active)
r.pruneDetectorLeases(active)
r.pruneSchedulerState(ls.lane, active)
r.pruneDetectorLeases(ls.lane, active)
r.setLaneLoopState(ls, "", "idle")
return hadJobs
}
@@ -213,8 +213,8 @@ func (r *Plugin) runLaneSchedulerIterationConcurrent(ls *schedulerLaneState, job
}
wg.Wait()
r.pruneSchedulerState(active)
r.pruneDetectorLeases(active)
r.pruneSchedulerState(ls.lane, active)
r.pruneDetectorLeases(ls.lane, active)
r.setLaneLoopState(ls, "", "idle")
return hadJobs.Load()
}
@@ -746,11 +746,16 @@ func (r *Plugin) finishDetection(jobType string) {
r.schedulerMu.Unlock()
}
func (r *Plugin) pruneSchedulerState(activeJobTypes map[string]struct{}) {
// pruneSchedulerState removes next-detection state for inactive job types
// in lane only. Prune must not touch other lanes' entries in the global map.
func (r *Plugin) pruneSchedulerState(lane SchedulerLane, activeJobTypes map[string]struct{}) {
r.schedulerMu.Lock()
defer r.schedulerMu.Unlock()
for jobType := range r.nextDetectionAt {
if JobTypeLane(jobType) != lane {
continue
}
if _, ok := activeJobTypes[jobType]; !ok {
delete(r.nextDetectionAt, jobType)
delete(r.detectionInFlight, jobType)
@@ -766,11 +771,15 @@ func (r *Plugin) clearSchedulerJobType(jobType string) {
r.clearDetectorLease(jobType, "")
}
func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) {
// pruneDetectorLeases removes detector leases for inactive job types in lane only.
func (r *Plugin) pruneDetectorLeases(lane SchedulerLane, activeJobTypes map[string]struct{}) {
r.detectorLeaseMu.Lock()
defer r.detectorLeaseMu.Unlock()
for jobType := range r.detectorLeases {
if JobTypeLane(jobType) != lane {
continue
}
if _, ok := activeJobTypes[jobType]; !ok {
delete(r.detectorLeases, jobType)
}
+115
View File
@@ -673,3 +673,118 @@ func TestRunLaneSchedulerIterationLockBehavior(t *testing.T) {
})
}
}
// ---------- lane-scoped prune ----------
// TestPruneSchedulerState_DefaultLaneKeepsForeignLanesAndPrunesOwnStale seeds
// next-detection state for three job types, prunes with LaneDefault and an
// active set containing only "vacuum", and then checks which entries remain:
// the lifecycle-lane entry and the active default-lane entry must survive,
// while the stale default-lane entry (and its in-flight flag) must be gone.
func TestPruneSchedulerState_DefaultLaneKeepsForeignLanesAndPrunesOwnStale(t *testing.T) {
	plugin, err := New(Options{})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	defer plugin.Shutdown()

	base := time.Now().UTC()
	soon := base.Add(time.Minute)

	// Seed the scheduler maps under the scheduler mutex.
	plugin.schedulerMu.Lock()
	plugin.nextDetectionAt["s3_lifecycle"] = base.Add(24 * time.Hour) // lifecycle lane
	plugin.nextDetectionAt["vacuum"] = soon                           // default lane, active
	plugin.nextDetectionAt["ec_balance"] = soon                       // default lane, stale
	plugin.detectionInFlight["ec_balance"] = true
	plugin.schedulerMu.Unlock()

	// A default-lane iteration only knows about its own active job types.
	activeDefault := map[string]struct{}{"vacuum": struct{}{}}
	plugin.pruneSchedulerState(LaneDefault, activeDefault)

	plugin.schedulerMu.Lock()
	defer plugin.schedulerMu.Unlock()

	// Capture presence of every entry first, then report the first violation
	// in a fixed order.
	_, lifecycleKept := plugin.nextDetectionAt["s3_lifecycle"]
	_, vacuumKept := plugin.nextDetectionAt["vacuum"]
	_, staleKept := plugin.nextDetectionAt["ec_balance"]
	_, staleInFlight := plugin.detectionInFlight["ec_balance"]

	switch {
	case !lifecycleKept:
		t.Fatal("default-lane prune must not delete lifecycle-lane nextDetectionAt[s3_lifecycle]")
	case !vacuumKept:
		t.Fatal("active default-lane job (vacuum) must be kept")
	case staleKept:
		t.Fatal("stale default-lane job (ec_balance) must still be pruned within its own lane")
	case staleInFlight:
		t.Fatal("pruned job must also drop its detectionInFlight entry")
	}
}
// TestPruneSchedulerState_LifecycleLaneLeavesDefaultLane seeds one default-lane
// and one lifecycle-lane next-detection entry, prunes with LaneLifecycle and an
// active set containing only "s3_lifecycle", and checks that both entries are
// still present afterwards.
func TestPruneSchedulerState_LifecycleLaneLeavesDefaultLane(t *testing.T) {
	plugin, err := New(Options{})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	defer plugin.Shutdown()

	base := time.Now().UTC()

	// Seed the scheduler map under the scheduler mutex.
	plugin.schedulerMu.Lock()
	plugin.nextDetectionAt["vacuum"] = base.Add(time.Minute)          // default lane
	plugin.nextDetectionAt["s3_lifecycle"] = base.Add(24 * time.Hour) // lifecycle lane, active
	plugin.schedulerMu.Unlock()

	activeLifecycle := map[string]struct{}{"s3_lifecycle": struct{}{}}
	plugin.pruneSchedulerState(LaneLifecycle, activeLifecycle)

	plugin.schedulerMu.Lock()
	defer plugin.schedulerMu.Unlock()

	_, vacuumKept := plugin.nextDetectionAt["vacuum"]
	_, lifecycleKept := plugin.nextDetectionAt["s3_lifecycle"]

	switch {
	case !vacuumKept:
		t.Fatal("lifecycle-lane prune must not delete default-lane nextDetectionAt[vacuum]")
	case !lifecycleKept:
		t.Fatal("active lifecycle job (s3_lifecycle) must be kept")
	}
}
// TestPruneDetectorLeases_IsLaneScoped seeds detector leases for three job
// types, prunes with LaneDefault and an active set containing only "vacuum",
// and checks that the lifecycle-lane lease and the active default-lane lease
// remain while the stale default-lane lease is removed.
func TestPruneDetectorLeases_IsLaneScoped(t *testing.T) {
	plugin, err := New(Options{})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	defer plugin.Shutdown()

	// Seed the lease map under the detector-lease mutex.
	plugin.detectorLeaseMu.Lock()
	plugin.detectorLeases["s3_lifecycle"] = "worker-a" // lifecycle lane
	plugin.detectorLeases["vacuum"] = "worker-b"       // default lane, active
	plugin.detectorLeases["ec_balance"] = "worker-c"   // default lane, stale
	plugin.detectorLeaseMu.Unlock()

	activeDefault := map[string]struct{}{"vacuum": struct{}{}}
	plugin.pruneDetectorLeases(LaneDefault, activeDefault)

	plugin.detectorLeaseMu.Lock()
	defer plugin.detectorLeaseMu.Unlock()

	// Capture presence of every lease first, then report the first violation
	// in a fixed order.
	_, lifecycleKept := plugin.detectorLeases["s3_lifecycle"]
	_, vacuumKept := plugin.detectorLeases["vacuum"]
	_, staleKept := plugin.detectorLeases["ec_balance"]

	switch {
	case !lifecycleKept:
		t.Fatal("default-lane prune must not delete lifecycle-lane detector lease")
	case !vacuumKept:
		t.Fatal("active default-lane detector lease (vacuum) must be kept")
	case staleKept:
		t.Fatal("stale default-lane detector lease (ec_balance) must still be pruned within its own lane")
	}
}
// TestLaneStatus_LifecycleNextDetectionSurvivesDefaultLanePrune seeds a
// lifecycle-lane next-detection time 24 hours out, runs a default-lane prune,
// and checks that GetLaneSchedulerStatus(LaneLifecycle) still reports exactly
// that seeded time.
func TestLaneStatus_LifecycleNextDetectionSurvivesDefaultLanePrune(t *testing.T) {
	plugin, err := New(Options{})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	defer plugin.Shutdown()

	base := time.Now().UTC()
	expected := base.Add(24 * time.Hour)

	// Seed the scheduler map under the scheduler mutex.
	plugin.schedulerMu.Lock()
	plugin.nextDetectionAt["s3_lifecycle"] = expected
	plugin.nextDetectionAt["vacuum"] = base.Add(time.Minute)
	plugin.schedulerMu.Unlock()

	activeDefault := map[string]struct{}{"vacuum": struct{}{}}
	plugin.pruneSchedulerState(LaneDefault, activeDefault)

	// Read the lifecycle lane's reported schedule after the foreign-lane prune.
	got := plugin.GetLaneSchedulerStatus(LaneLifecycle).NextDetectionAt
	switch {
	case got == nil:
		t.Fatal("lifecycle lane status lost next_detection_at after a default-lane prune")
	case !got.Equal(expected):
		t.Fatalf("next_detection_at = %v, want %v (must be the 24h schedule, not the idle-sleep fallback)",
			got, expected)
	}
}