From 3ce4e0dbdfa2d3fd02c68d9d3dc9e7490883a3d7 Mon Sep 17 00:00:00 2001 From: ahalaun Date: Tue, 2 Jun 2026 20:28:56 +0200 Subject: [PATCH] fix(s3/lifecycle): report success to admin via JobCompleted (#9787) --- weed/worker/tasks/s3_lifecycle/handler.go | 22 ++++++++++- .../worker/tasks/s3_lifecycle/handler_test.go | 38 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/weed/worker/tasks/s3_lifecycle/handler.go b/weed/worker/tasks/s3_lifecycle/handler.go index 896a363f1..499520376 100644 --- a/weed/worker/tasks/s3_lifecycle/handler.go +++ b/weed/worker/tasks/s3_lifecycle/handler.go @@ -256,7 +256,27 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ defer s3Conn.Close() rpc := s3_lifecycle_pb.NewSeaweedS3LifecycleInternalClient(s3Conn) - return h.executeDailyReplay(runCtx, request, bucketsPath, filerClient, rpc, cfg, sender) + if err := h.executeDailyReplay(runCtx, request, bucketsPath, filerClient, rpc, cfg, sender); err != nil { + return err + } + + return sendSuccessCompletion(request, sender) +} + +const dailyReplaySuccessSummary = "s3 lifecycle daily replay completed" + +// JobCompleted must set JobType to the handler's jobType constant; +// routed requests may leave request.Job.JobType empty, and the admin +// ignores completions with empty JobType. +func sendSuccessCompletion(request *plugin_pb.ExecuteJobRequest, sender pluginworker.ExecutionSender) error { + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: jobType, + Success: true, + Result: &plugin_pb.JobResult{ + Summary: dailyReplaySuccessSummary, + }, + }) } // executeDailyReplay runs one bounded daily-replay pass via diff --git a/weed/worker/tasks/s3_lifecycle/handler_test.go b/weed/worker/tasks/s3_lifecycle/handler_test.go index c1b98da89..f4a1481bd 100644 --- a/weed/worker/tasks/s3_lifecycle/handler_test.go +++ b/weed/worker/tasks/s3_lifecycle/handler_test.go @@ -371,8 +371,9 @@ func TestDescriptor_AdminRuntimeDefaultsEnabledByDefault(t *testing.T) { // the handler refuses malformed jobs early instead of waiting on a // 30s dial timeout. type recordingExecSender struct { - progress []*plugin_pb.JobProgressUpdate - completed []*plugin_pb.JobCompleted + progress []*plugin_pb.JobProgressUpdate + completed []*plugin_pb.JobCompleted + completedErr error } func (r *recordingExecSender) SendProgress(p *plugin_pb.JobProgressUpdate) error { @@ -380,6 +381,9 @@ func (r *recordingExecSender) SendProgress(p *plugin_pb.JobProgressUpdate) error return nil } func (r *recordingExecSender) SendCompleted(c *plugin_pb.JobCompleted) error { + if r.completedErr != nil { + return r.completedErr + } r.completed = append(r.completed, c) return nil } @@ -464,6 +468,36 @@ func TestExecute_EmptyJobTypeAccepted(t *testing.T) { assert.Contains(t, err.Error(), "no s3 servers", "validation flowed past the type check") } +// ---------- sendSuccessCompletion ---------- + +func TestSendSuccessCompletion_EmitsCanonicalCompletion(t *testing.T) { + sender := &recordingExecSender{} + request := &plugin_pb.ExecuteJobRequest{ + Job: &plugin_pb.JobSpec{JobId: "job-123"}, // empty JobType (broadcast routing) + } + + require.NoError(t, sendSuccessCompletion(request, sender)) + + require.Len(t, sender.completed, 1, "exactly one completion must be sent") + c := sender.completed[0] + assert.Equal(t, jobType, c.JobType, "completion must use the canonical jobType, not request.Job.JobType") + assert.NotEmpty(t, c.JobType, "an empty JobType is dropped by the admin") + assert.Equal(t, "job-123", c.JobId, "completion must echo the original JobId") + assert.True(t, c.Success, "a clean replay is a success completion") + require.NotNil(t, c.Result) + assert.NotEmpty(t, c.Result.Summary, "admin UI surfaces a non-empty Result.Summary") +} + +func TestSendSuccessCompletion_PropagatesSendError(t *testing.T) { + sentinel := errors.New("stream closed") + sender := &recordingExecSender{completedErr: sentinel} + request := &plugin_pb.ExecuteJobRequest{Job: &plugin_pb.JobSpec{JobId: "job-456"}} + + err := sendSuccessCompletion(request, sender) + require.Error(t, err) + assert.ErrorIs(t, err, sentinel) +} + // ---------- lookupBucketsPath ---------- // stubFilerConfigClient implements filer_pb.SeaweedFilerClient for the