From 8a4fdf06c048a641cfb1a7b6d3d5ef2fcbfe3532 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 7 Jun 2026 22:45:38 -0700 Subject: [PATCH] admin/maintenance: reload in-flight tasks on startup instead of discarding them (#9857) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * admin/maintenance: reload in-flight tasks on startup instead of discarding LoadTasksFromPersistence deleted all persisted task files on startup and relied on the scanner to re-detect, so saved task state was never consumed — the persistence was effectively write-only. Reload non-terminal tasks (pending/assigned/in_progress) into the queue, resetting in-flight ones to pending since their worker is gone after a restart (maintenance tasks are idempotent). Terminal task files are dropped; the scanner still backfills anything not persisted. * address review: nil-guard reloaded tasks and SyncTask to ActiveTopology - skip nil entries from LoadAllTaskStates (corrupted state) - re-sync restored tasks with MaintenanceIntegration so ActiveTopology (in-memory, empty on startup) knows about them; otherwise GetNextTask's AssignTask rejects them as unknown and they never get assigned --- weed/admin/maintenance/maintenance_queue.go | 64 +++++++++++++++++-- .../maintenance/maintenance_queue_test.go | 37 ++++++----- 2 files changed, 78 insertions(+), 23 deletions(-) diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index 1cec742b6..8af9c8f90 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -34,16 +34,66 @@ func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) { glog.V(1).Infof("Maintenance queue configured with task persistence") } -// LoadTasksFromPersistence is called on startup. Previous task states are NOT loaded -// into memory — the maintenance scanner will re-detect current needs from the live -// cluster state. Stale task files from previous runs are deleted from disk. +// LoadTasksFromPersistence is called on startup to restore in-flight tasks +// across an admin restart. Non-terminal tasks (pending/assigned/in_progress) +// are re-queued as pending — the worker that held an in-flight task is gone +// after a restart, and maintenance tasks are idempotent, so re-running a +// partially-done one is safe. Terminal task files are deleted. Anything missed +// is still re-detected by the scanner from live cluster state. func (mq *MaintenanceQueue) LoadTasksFromPersistence() error { - if mq.persistence != nil { - if err := mq.persistence.DeleteAllTaskStates(); err != nil { - glog.Warningf("Failed to clean up old task files: %v", err) + if mq.persistence == nil { + glog.Infof("Task queue initialized (no persistence configured)") + return nil + } + tasks, err := mq.persistence.LoadAllTaskStates() + if err != nil { + glog.Warningf("Failed to load persisted task states: %v; starting with an empty queue", err) + return nil + } + + var terminal []string + var restored []*MaintenanceTask + mq.mutex.Lock() + requeued := 0 + for _, task := range tasks { + if task == nil { + continue + } + switch task.Status { + case TaskStatusPending, TaskStatusAssigned, TaskStatusInProgress: + task.Status = TaskStatusPending + task.WorkerID = "" + task.Progress = 0 + mq.tasks[task.ID] = task + mq.pendingTasks = append(mq.pendingTasks, task) + restored = append(restored, snapshotTask(task)) + requeued++ + default: + terminal = append(terminal, task.ID) } } - glog.Infof("Task queue initialized (previous tasks will be re-detected by scanner)") + sort.Slice(mq.pendingTasks, func(i, j int) bool { + if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority { + return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority + } + return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) + }) + mq.mutex.Unlock() + + // ActiveTopology is in-memory and starts empty, so restored tasks must be + // re-synced or GetNextTask's AssignTask would reject them as unknown and + // they'd sit pending forever. Done outside the lock, like the retry path. + if mq.integration != nil { + for _, task := range restored { + mq.integration.SyncTask(task) + } + } + + // Delete stale terminal files outside the lock to avoid blocking on disk I/O. + for _, id := range terminal { + mq.deleteTaskState(id) + } + glog.Infof("Task queue initialized: re-queued %d in-flight task(s) from persistence; scanner will re-detect the rest", requeued) return nil } diff --git a/weed/admin/maintenance/maintenance_queue_test.go b/weed/admin/maintenance/maintenance_queue_test.go index bd8fe6839..7a1328919 100644 --- a/weed/admin/maintenance/maintenance_queue_test.go +++ b/weed/admin/maintenance/maintenance_queue_test.go @@ -697,8 +697,7 @@ func (m *MockPersistence) DeleteAllTaskStates() error func (m *MockPersistence) CleanupCompletedTasks() error { return nil } func (m *MockPersistence) SaveTaskPolicy(taskType string, policy *TaskPolicy) error { return nil } -func TestMaintenanceQueue_LoadTasksStartsEmpty(t *testing.T) { - // Setup +func TestMaintenanceQueue_LoadRequeuesInFlightTasks(t *testing.T) { policy := &MaintenancePolicy{ TaskPolicies: map[string]*worker_pb.TaskPolicy{ "balance": {MaxConcurrent: 1}, @@ -706,24 +705,30 @@ func TestMaintenanceQueue_LoadTasksStartsEmpty(t *testing.T) { } mq := NewMaintenanceQueue(policy) - // Setup mock persistence with tasks — these should NOT be loaded - mockTask := &MaintenanceTask{ - ID: "old_task_123", - Type: "balance", - Status: TaskStatusPending, - } - mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{mockTask}}) + // Non-terminal tasks must be re-queued across a restart; an in-progress + // task is reset to pending (its worker is gone). Terminal tasks are dropped. + mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{ + {ID: "pending_1", Type: "balance", Status: TaskStatusPending}, + {ID: "inprogress_1", Type: "balance", Status: TaskStatusInProgress, WorkerID: "gone", Progress: 42}, + {ID: "done_1", Type: "balance", Status: TaskStatusCompleted}, + }}) - // LoadTasksFromPersistence should be a no-op — scanner will re-detect - err := mq.LoadTasksFromPersistence() - if err != nil { + if err := mq.LoadTasksFromPersistence(); err != nil { t.Fatalf("LoadTasksFromPersistence failed: %v", err) } - // Queue should be empty — tasks will be re-detected by scanner - stats := mq.GetStats() - if stats.TotalTasks != 0 { - t.Errorf("Expected 0 tasks after startup, got %d", stats.TotalTasks) + if stats := mq.GetStats(); stats.TotalTasks != 2 { + t.Errorf("expected 2 re-queued tasks, got %d", stats.TotalTasks) + } + ip := mq.tasks["inprogress_1"] + if ip == nil { + t.Fatal("in-progress task was not re-queued") + } + if ip.Status != TaskStatusPending || ip.WorkerID != "" || ip.Progress != 0 { + t.Errorf("in-progress task not reset to pending: status=%q worker=%q progress=%v", ip.Status, ip.WorkerID, ip.Progress) + } + if mq.tasks["done_1"] != nil { + t.Error("completed task should not be re-queued") } }