mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-15 23:56:48 +03:00
admin/maintenance: reload in-flight tasks on startup instead of discarding them (#9857)
* admin/maintenance: reload in-flight tasks on startup instead of discarding LoadTasksFromPersistence deleted all persisted task files on startup and relied on the scanner to re-detect, so saved task state was never consumed — the persistence was effectively write-only. Reload non-terminal tasks (pending/assigned/in_progress) into the queue, resetting in-flight ones to pending since their worker is gone after a restart (maintenance tasks are idempotent). Terminal task files are dropped; the scanner still backfills anything not persisted. * address review: nil-guard reloaded tasks and SyncTask to ActiveTopology - skip nil entries from LoadAllTaskStates (corrupted state) - re-sync restored tasks with MaintenanceIntegration so ActiveTopology (in-memory, empty on startup) knows about them; otherwise GetNextTask's AssignTask rejects them as unknown and they never get assigned
This commit is contained in:
@@ -34,16 +34,66 @@ func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) {
|
||||
glog.V(1).Infof("Maintenance queue configured with task persistence")
|
||||
}
|
||||
|
||||
// LoadTasksFromPersistence is called on startup. Previous task states are NOT loaded
|
||||
// into memory — the maintenance scanner will re-detect current needs from the live
|
||||
// cluster state. Stale task files from previous runs are deleted from disk.
|
||||
// LoadTasksFromPersistence is called on startup to restore in-flight tasks
|
||||
// across an admin restart. Non-terminal tasks (pending/assigned/in_progress)
|
||||
// are re-queued as pending — the worker that held an in-flight task is gone
|
||||
// after a restart, and maintenance tasks are idempotent, so re-running a
|
||||
// partially-done one is safe. Terminal task files are deleted. Anything missed
|
||||
// is still re-detected by the scanner from live cluster state.
|
||||
func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
|
||||
if mq.persistence != nil {
|
||||
if err := mq.persistence.DeleteAllTaskStates(); err != nil {
|
||||
glog.Warningf("Failed to clean up old task files: %v", err)
|
||||
if mq.persistence == nil {
|
||||
glog.Infof("Task queue initialized (no persistence configured)")
|
||||
return nil
|
||||
}
|
||||
tasks, err := mq.persistence.LoadAllTaskStates()
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to load persisted task states: %v; starting with an empty queue", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var terminal []string
|
||||
var restored []*MaintenanceTask
|
||||
mq.mutex.Lock()
|
||||
requeued := 0
|
||||
for _, task := range tasks {
|
||||
if task == nil {
|
||||
continue
|
||||
}
|
||||
switch task.Status {
|
||||
case TaskStatusPending, TaskStatusAssigned, TaskStatusInProgress:
|
||||
task.Status = TaskStatusPending
|
||||
task.WorkerID = ""
|
||||
task.Progress = 0
|
||||
mq.tasks[task.ID] = task
|
||||
mq.pendingTasks = append(mq.pendingTasks, task)
|
||||
restored = append(restored, snapshotTask(task))
|
||||
requeued++
|
||||
default:
|
||||
terminal = append(terminal, task.ID)
|
||||
}
|
||||
}
|
||||
glog.Infof("Task queue initialized (previous tasks will be re-detected by scanner)")
|
||||
sort.Slice(mq.pendingTasks, func(i, j int) bool {
|
||||
if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
|
||||
return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
|
||||
}
|
||||
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
|
||||
})
|
||||
mq.mutex.Unlock()
|
||||
|
||||
// ActiveTopology is in-memory and starts empty, so restored tasks must be
|
||||
// re-synced or GetNextTask's AssignTask would reject them as unknown and
|
||||
// they'd sit pending forever. Done outside the lock, like the retry path.
|
||||
if mq.integration != nil {
|
||||
for _, task := range restored {
|
||||
mq.integration.SyncTask(task)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete stale terminal files outside the lock to avoid blocking on disk I/O.
|
||||
for _, id := range terminal {
|
||||
mq.deleteTaskState(id)
|
||||
}
|
||||
glog.Infof("Task queue initialized: re-queued %d in-flight task(s) from persistence; scanner will re-detect the rest", requeued)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -697,8 +697,7 @@ func (m *MockPersistence) DeleteAllTaskStates() error
|
||||
func (m *MockPersistence) CleanupCompletedTasks() error { return nil }
|
||||
func (m *MockPersistence) SaveTaskPolicy(taskType string, policy *TaskPolicy) error { return nil }
|
||||
|
||||
func TestMaintenanceQueue_LoadTasksStartsEmpty(t *testing.T) {
|
||||
// Setup
|
||||
func TestMaintenanceQueue_LoadRequeuesInFlightTasks(t *testing.T) {
|
||||
policy := &MaintenancePolicy{
|
||||
TaskPolicies: map[string]*worker_pb.TaskPolicy{
|
||||
"balance": {MaxConcurrent: 1},
|
||||
@@ -706,24 +705,30 @@ func TestMaintenanceQueue_LoadTasksStartsEmpty(t *testing.T) {
|
||||
}
|
||||
mq := NewMaintenanceQueue(policy)
|
||||
|
||||
// Setup mock persistence with tasks — these should NOT be loaded
|
||||
mockTask := &MaintenanceTask{
|
||||
ID: "old_task_123",
|
||||
Type: "balance",
|
||||
Status: TaskStatusPending,
|
||||
}
|
||||
mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{mockTask}})
|
||||
// Non-terminal tasks must be re-queued across a restart; an in-progress
|
||||
// task is reset to pending (its worker is gone). Terminal tasks are dropped.
|
||||
mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{
|
||||
{ID: "pending_1", Type: "balance", Status: TaskStatusPending},
|
||||
{ID: "inprogress_1", Type: "balance", Status: TaskStatusInProgress, WorkerID: "gone", Progress: 42},
|
||||
{ID: "done_1", Type: "balance", Status: TaskStatusCompleted},
|
||||
}})
|
||||
|
||||
// LoadTasksFromPersistence should be a no-op — scanner will re-detect
|
||||
err := mq.LoadTasksFromPersistence()
|
||||
if err != nil {
|
||||
if err := mq.LoadTasksFromPersistence(); err != nil {
|
||||
t.Fatalf("LoadTasksFromPersistence failed: %v", err)
|
||||
}
|
||||
|
||||
// Queue should be empty — tasks will be re-detected by scanner
|
||||
stats := mq.GetStats()
|
||||
if stats.TotalTasks != 0 {
|
||||
t.Errorf("Expected 0 tasks after startup, got %d", stats.TotalTasks)
|
||||
if stats := mq.GetStats(); stats.TotalTasks != 2 {
|
||||
t.Errorf("expected 2 re-queued tasks, got %d", stats.TotalTasks)
|
||||
}
|
||||
ip := mq.tasks["inprogress_1"]
|
||||
if ip == nil {
|
||||
t.Fatal("in-progress task was not re-queued")
|
||||
}
|
||||
if ip.Status != TaskStatusPending || ip.WorkerID != "" || ip.Progress != 0 {
|
||||
t.Errorf("in-progress task not reset to pending: status=%q worker=%q progress=%v", ip.Status, ip.WorkerID, ip.Progress)
|
||||
}
|
||||
if mq.tasks["done_1"] != nil {
|
||||
t.Error("completed task should not be re-queued")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user