fix(s3/lifecycle): report success to admin via JobCompleted (#9787)

This commit is contained in:
ahalaun
2026-06-02 20:28:56 +02:00
committed by GitHub
parent bcd2c958e1
commit 3ce4e0dbdf
2 changed files with 57 additions and 3 deletions
+21 -1
View File
@@ -256,7 +256,27 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
defer s3Conn.Close()
rpc := s3_lifecycle_pb.NewSeaweedS3LifecycleInternalClient(s3Conn)
return h.executeDailyReplay(runCtx, request, bucketsPath, filerClient, rpc, cfg, sender)
if err := h.executeDailyReplay(runCtx, request, bucketsPath, filerClient, rpc, cfg, sender); err != nil {
return err
}
return sendSuccessCompletion(request, sender)
}
const dailyReplaySuccessSummary = "s3 lifecycle daily replay completed"
// JobCompleted must set JobType to the handler's jobType constant;
// routed requests may leave request.Job.JobType empty, and the admin
// ignores completions with empty JobType.
func sendSuccessCompletion(request *plugin_pb.ExecuteJobRequest, sender pluginworker.ExecutionSender) error {
return sender.SendCompleted(&plugin_pb.JobCompleted{
JobId: request.Job.JobId,
JobType: jobType,
Success: true,
Result: &plugin_pb.JobResult{
Summary: dailyReplaySuccessSummary,
},
})
}
// executeDailyReplay runs one bounded daily-replay pass via
+36 -2
View File
@@ -371,8 +371,9 @@ func TestDescriptor_AdminRuntimeDefaultsEnabledByDefault(t *testing.T) {
// the handler refuses malformed jobs early instead of waiting on a
// 30s dial timeout.
type recordingExecSender struct {
progress []*plugin_pb.JobProgressUpdate
completed []*plugin_pb.JobCompleted
progress []*plugin_pb.JobProgressUpdate
completed []*plugin_pb.JobCompleted
completedErr error
}
func (r *recordingExecSender) SendProgress(p *plugin_pb.JobProgressUpdate) error {
@@ -380,6 +381,9 @@ func (r *recordingExecSender) SendProgress(p *plugin_pb.JobProgressUpdate) error
return nil
}
func (r *recordingExecSender) SendCompleted(c *plugin_pb.JobCompleted) error {
if r.completedErr != nil {
return r.completedErr
}
r.completed = append(r.completed, c)
return nil
}
@@ -464,6 +468,36 @@ func TestExecute_EmptyJobTypeAccepted(t *testing.T) {
assert.Contains(t, err.Error(), "no s3 servers", "validation flowed past the type check")
}
// ---------- sendSuccessCompletion ----------
func TestSendSuccessCompletion_EmitsCanonicalCompletion(t *testing.T) {
sender := &recordingExecSender{}
request := &plugin_pb.ExecuteJobRequest{
Job: &plugin_pb.JobSpec{JobId: "job-123"}, // empty JobType (broadcast routing)
}
require.NoError(t, sendSuccessCompletion(request, sender))
require.Len(t, sender.completed, 1, "exactly one completion must be sent")
c := sender.completed[0]
assert.Equal(t, jobType, c.JobType, "completion must use the canonical jobType, not request.Job.JobType")
assert.NotEmpty(t, c.JobType, "an empty JobType is dropped by the admin")
assert.Equal(t, "job-123", c.JobId, "completion must echo the original JobId")
assert.True(t, c.Success, "a clean replay is a success completion")
require.NotNil(t, c.Result)
assert.NotEmpty(t, c.Result.Summary, "admin UI surfaces a non-empty Result.Summary")
}
func TestSendSuccessCompletion_PropagatesSendError(t *testing.T) {
sentinel := errors.New("stream closed")
sender := &recordingExecSender{completedErr: sentinel}
request := &plugin_pb.ExecuteJobRequest{Job: &plugin_pb.JobSpec{JobId: "job-456"}}
err := sendSuccessCompletion(request, sender)
require.Error(t, err)
assert.ErrorIs(t, err, sentinel)
}
// ---------- lookupBucketsPath ----------
// stubFilerConfigClient implements filer_pb.SeaweedFilerClient for the