diff --git a/weed/admin/plugin/cluster_rate_limit.go b/weed/admin/plugin/cluster_rate_limit.go new file mode 100644 index 000000000..e1802a5f7 --- /dev/null +++ b/weed/admin/plugin/cluster_rate_limit.go @@ -0,0 +1,150 @@ +package plugin + +import ( + "strconv" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" +) + +// Job types whose ExecuteJobRequest needs per-worker rate-allocation +// metadata injected. Keyed by the job-type string so plugin.go's +// generic dispatch path stays job-agnostic. +// +// To add a new job type to the share-allocation pipeline: register an +// entry here that knows how to read its admin-config field(s) and +// produce the metadata keys/values the worker reads. + +// s3LifecycleClusterDeletesPerSecondKey, s3LifecycleClusterDeletesBurstKey, +// s3LifecycleMetadataDeletesPerSecond, and s3LifecycleMetadataDeletesBurst +// are the contract between admin and worker. The values must match the +// constants exported from weed/worker/tasks/s3_lifecycle/cluster_rate_limit.go +// — duplicated here as plain strings rather than imported so the admin +// plugin package doesn't pull a dependency on the worker handler +// package. A mismatch on either side would silently disable rate +// limiting; tests pin the constants in both packages against the same +// values. +const ( + s3LifecycleJobType = "s3_lifecycle" + s3LifecycleClusterDeletesPerSecondKey = "cluster_deletes_per_second" + s3LifecycleClusterDeletesBurstKey = "cluster_deletes_burst" + s3LifecycleMetadataDeletesPerSecond = "s3_lifecycle.deletes_per_second" + s3LifecycleMetadataDeletesBurst = "s3_lifecycle.deletes_burst" +) + +// decorateClusterContextForJob returns a new ClusterContext with any +// per-job-type metadata the admin needs to inject before the +// ExecuteJobRequest is sent. Returns the input cc unchanged when no +// decoration applies. +// +// Today only s3_lifecycle decorates; the function exists so a future +// job type's plumbing slots in alongside without touching +// executeJobWithExecutor. +// +// maxJobsPerDetection is the job-type's AdminRuntimeConfig.MaxJobsPerDetection +// — the cap on how many parallel instances of this job the scheduler will +// dispatch per detection cycle. For singleton jobs (s3_lifecycle has +// MaxJobsPerDetection=1) only one worker is ever active, so the cluster +// budget must go to that worker undivided. For parallel-dispatch jobs the +// budget divides across the actually-running set, not across every +// capable worker. The divisor is min(executors, maxJobsPerDetection), +// clamped to ≥1. +func (r *Plugin) decorateClusterContextForJob(cc *plugin_pb.ClusterContext, jobType string, adminConfigValues map[string]*plugin_pb.ConfigValue, maxJobsPerDetection int) *plugin_pb.ClusterContext { + if cc == nil { + return cc + } + if jobType != s3LifecycleJobType { + return cc + } + rps := readNonNegativeInt(adminConfigValues, s3LifecycleClusterDeletesPerSecondKey) + burst := readNonNegativeInt(adminConfigValues, s3LifecycleClusterDeletesBurstKey) + if rps <= 0 { + // Operator hasn't configured a cluster cap; nothing to allocate. + // The worker treats missing metadata keys as "unlimited," which + // is the legacy behavior. + return cc + } + executors := r.registry.CountCapableExecutors(jobType) + if executors <= 0 { + // No executors means the job won't dispatch at all; metadata + // would be discarded. Log so the case is visible in ops. + glog.V(2).Infof("decorateClusterContext: %s rps=%d but no execute-capable workers; skipping allocation", jobType, rps) + return cc + } + // Divide by the number of *concurrently-running* workers, not the + // number of capable ones. A singleton job (maxJobs=1) gets the full + // budget on its single active worker. + activeWorkers := executors + if maxJobsPerDetection > 0 && maxJobsPerDetection < activeWorkers { + activeWorkers = maxJobsPerDetection + } + perWorkerRps := float64(rps) / float64(activeWorkers) + perWorkerBurst := 0 + if burst > 0 { + perWorkerBurst = burst / activeWorkers + if perWorkerBurst < 1 { + perWorkerBurst = 1 + } + } + + // Clone so we don't mutate the shared base context. The metadata + // map is small; a fresh allocation per ExecuteJob is fine. + out := cloneClusterContext(cc) + if out.Metadata == nil { + out.Metadata = map[string]string{} + } + out.Metadata[s3LifecycleMetadataDeletesPerSecond] = strconv.FormatFloat(perWorkerRps, 'f', -1, 64) + if perWorkerBurst > 0 { + out.Metadata[s3LifecycleMetadataDeletesBurst] = strconv.Itoa(perWorkerBurst) + } + glog.V(3).Infof("decorateClusterContext: %s rps=%d burst=%d executors=%d maxJobs=%d active=%d -> per-worker rps=%g burst=%d", + jobType, rps, burst, executors, maxJobsPerDetection, activeWorkers, perWorkerRps, perWorkerBurst) + return out +} + +// cloneClusterContext returns a shallow-but-safe copy: the top-level +// fields are reassigned, and the Metadata map is duplicated so the +// caller can mutate it without racing other consumers of the input. +// Slices of strings (master/filer/volume/s3 addresses) are copied by +// reference — those are treated as immutable elsewhere in the codebase. +func cloneClusterContext(in *plugin_pb.ClusterContext) *plugin_pb.ClusterContext { + if in == nil { + return nil + } + out := &plugin_pb.ClusterContext{ + MasterGrpcAddresses: in.MasterGrpcAddresses, + FilerGrpcAddresses: in.FilerGrpcAddresses, + VolumeGrpcAddresses: in.VolumeGrpcAddresses, + S3GrpcAddresses: in.S3GrpcAddresses, + } + if in.Metadata != nil { + out.Metadata = make(map[string]string, len(in.Metadata)) + for k, v := range in.Metadata { + out.Metadata[k] = v + } + } + return out +} + +// readNonNegativeInt reads an int64 admin config value, treating +// missing fields and non-int kinds as 0. Negative values are clamped +// to 0 since the AdminConfigForm declares MinValue=0 on both fields. +func readNonNegativeInt(values map[string]*plugin_pb.ConfigValue, field string) int { + v, ok := values[field] + if !ok || v == nil { + return 0 + } + switch k := v.Kind.(type) { + case *plugin_pb.ConfigValue_Int64Value: + if k.Int64Value < 0 { + return 0 + } + return int(k.Int64Value) + case *plugin_pb.ConfigValue_DoubleValue: + if k.DoubleValue < 0 { + return 0 + } + return int(k.DoubleValue) + } + return 0 +} diff --git a/weed/admin/plugin/cluster_rate_limit_test.go b/weed/admin/plugin/cluster_rate_limit_test.go new file mode 100644 index 000000000..9a336db40 --- /dev/null +++ b/weed/admin/plugin/cluster_rate_limit_test.go @@ -0,0 +1,177 @@ +package plugin + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// pluginWithExecutors returns a Plugin whose registry contains n +// non-stale execute-capable workers for jobType. Helper for the +// allocator tests. Bypasses UpsertFromHello so tests don't have to +// build a full Hello message. +func pluginWithExecutors(t *testing.T, jobType string, n int) *Plugin { + t.Helper() + reg := NewRegistry() + now := time.Now() + for i := 0; i < n; i++ { + id := "worker-" + string(rune('a'+i)) + reg.sessions[id] = &WorkerSession{ + WorkerID: id, + LastSeenAt: now, + ConnectedAt: now, + Capabilities: map[string]*plugin_pb.JobTypeCapability{ + jobType: {CanExecute: true}, + }, + } + } + return &Plugin{registry: reg} +} + +// adminConfig builds an int64 admin config map for the given fields. +func adminConfig(pairs ...interface{}) map[string]*plugin_pb.ConfigValue { + if len(pairs)%2 != 0 { + panic("adminConfig expects key/value pairs") + } + out := map[string]*plugin_pb.ConfigValue{} + for i := 0; i < len(pairs); i += 2 { + key := pairs[i].(string) + switch v := pairs[i+1].(type) { + case int: + out[key] = &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(v)}} + case int64: + out[key] = &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: v}} + case float64: + out[key] = &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: v}} + default: + panic("adminConfig: unsupported value type") + } + } + return out +} + +func TestDecorateClusterContext_NonS3LifecycleIsPassThrough(t *testing.T) { + // Any job type other than s3_lifecycle gets the input cc back + // unchanged. Future allocators add their own branch; the default + // is pass-through. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{Metadata: map[string]string{"unrelated": "v"}} + out := r.decorateClusterContextForJob(in, "some_other_job", adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) + assert.Same(t, in, out, "non-allocator job type must return the same pointer") +} + +func TestDecorateClusterContext_RpsZeroSkipsAllocation(t *testing.T) { + // rps=0 means "operator hasn't configured a cap"; the worker + // treats missing keys as unlimited. We must NOT inject any + // metadata (in particular, not "0") because that would force the + // worker into a no-throughput state on a misconfigured cluster. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 0), 1) + if out.Metadata != nil { + _, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond] + assert.False(t, hasRps, "rps=0 must not write a deletes_per_second key") + } +} + +func TestDecorateClusterContext_NoExecutorsSkipsAllocation(t *testing.T) { + r := pluginWithExecutors(t, s3LifecycleJobType, 0) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) + if out.Metadata != nil { + _, hasRps := out.Metadata[s3LifecycleMetadataDeletesPerSecond] + assert.False(t, hasRps, "0 executors must not write share metadata (would divide by zero)") + } +} + +func TestDecorateClusterContext_SingletonJobGetsFullBudget(t *testing.T) { + // s3_lifecycle has MaxJobsPerDetection=1: only ONE worker runs the + // job at a time. The cluster budget must go to that worker undivided + // — dividing by N capable executors would starve the active worker + // to 1/N of the configured rps. Pin the singleton behavior. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) + require.NotNil(t, out.Metadata) + assert.Equal(t, "100", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "singleton job: full budget to the single active worker") +} + +func TestDecorateClusterContext_SharedEvenlyWhenParallelLimited(t *testing.T) { + // Hypothetical parallel-dispatch job type (maxJobs=4): budget + // divides across the running-set, which equals min(executors, + // maxJobs)=4. 100/4=25. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 4) + require.NotNil(t, out.Metadata) + assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond], "maxJobs=4 across 4 executors = 25/worker") +} + +func TestDecorateClusterContext_MaxJobsExceedsExecutors(t *testing.T) { + // maxJobs=10 but only 4 executors exist — the divisor is the + // smaller value (executors) since you can't run more jobs in + // parallel than there are workers to run them. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 10) + require.NotNil(t, out.Metadata) + assert.Equal(t, "25", out.Metadata[s3LifecycleMetadataDeletesPerSecond]) +} + +func TestDecorateClusterContext_BurstSharedWhenParallel(t *testing.T) { + r := pluginWithExecutors(t, s3LifecycleJobType, 2) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 20), 2) + require.NotNil(t, out.Metadata) + assert.Equal(t, "50", out.Metadata[s3LifecycleMetadataDeletesPerSecond]) + assert.Equal(t, "10", out.Metadata[s3LifecycleMetadataDeletesBurst]) +} + +func TestDecorateClusterContext_BurstZeroOmitsKey(t *testing.T) { + // burst=0 means "let the worker default it." Don't write the key — + // the worker's parsePositiveInt would then take the unset path + // and compute 2 × rps automatically. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 0), 4) + _, hasBurst := out.Metadata[s3LifecycleMetadataDeletesBurst] + assert.False(t, hasBurst, "burst=0 must NOT write the burst key (worker default kicks in)") +} + +func TestDecorateClusterContext_BurstFloorIsOneWhenDividesBelowOne(t *testing.T) { + // burst=1 across 4 active workers would round to 0; clamp to 1 so + // the limiter doesn't become "single-token bucket that never refills." + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + in := &plugin_pb.ClusterContext{} + out := r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100, s3LifecycleClusterDeletesBurstKey, 1), 4) + assert.Equal(t, "1", out.Metadata[s3LifecycleMetadataDeletesBurst]) +} + +func TestDecorateClusterContext_DoesNotMutateInput(t *testing.T) { + // The same base ClusterContext is shared across many parallel + // ExecuteJob calls. The decorator must produce a fresh map so it + // can't race / leak per-job metadata into the base. + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + baseMeta := map[string]string{"existing": "value"} + in := &plugin_pb.ClusterContext{Metadata: baseMeta} + _ = r.decorateClusterContextForJob(in, s3LifecycleJobType, + adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) + _, leaked := baseMeta[s3LifecycleMetadataDeletesPerSecond] + assert.False(t, leaked, "decorator must not mutate the input metadata map") + assert.Equal(t, "value", baseMeta["existing"]) +} + +func TestDecorateClusterContext_NilInputPassesThrough(t *testing.T) { + r := pluginWithExecutors(t, s3LifecycleJobType, 4) + out := r.decorateClusterContextForJob(nil, s3LifecycleJobType, adminConfig(s3LifecycleClusterDeletesPerSecondKey, 100), 1) + assert.Nil(t, out) +} diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index a48670d40..d50d7d2a3 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -649,6 +649,14 @@ func (r *Plugin) executeJobWithExecutor( return nil, err } + // Apply per-job-type cluster-allocation decoration (e.g. s3_lifecycle + // divides cluster_deletes_per_second by min(workers, maxJobsPerDetection) + // and ships the share via ClusterContext.Metadata). No-op for job + // types without an allocator registered. MaxJobsPerDetection caps + // the divisor so a singleton job (maxJobs=1) gets the full budget on + // the single active worker, not 1/N of it. + clusterContext = r.decorateClusterContextForJob(clusterContext, job.JobType, adminConfigValues, int(adminRuntime.GetMaxJobsPerDetection())) + completedCh := make(chan *plugin_pb.JobCompleted, 1) r.pendingExecutionMu.Lock() r.pendingExecution[requestID] = completedCh diff --git a/weed/admin/plugin/registry.go b/weed/admin/plugin/registry.go index 033a11fcd..46d51e79d 100644 --- a/weed/admin/plugin/registry.go +++ b/weed/admin/plugin/registry.go @@ -142,6 +142,31 @@ func (r *Registry) HasCapableWorker(jobType string) bool { return false } +// CountCapableExecutors returns the number of non-stale workers that +// can EXECUTE the given job type. Used by per-job-type cluster +// allocators (e.g. the s3_lifecycle delete-rate divider) to compute a +// per-worker share at dispatch time. Returns 0 when no executor is +// available — callers should treat that as "skip allocation" rather +// than dividing by zero. +func (r *Registry) CountCapableExecutors(jobType string) int { + r.mu.RLock() + defer r.mu.RUnlock() + + now := time.Now() + n := 0 + for _, session := range r.sessions { + if r.isSessionStaleLocked(session, now) { + continue + } + capability := session.Capabilities[jobType] + if capability == nil || !capability.CanExecute { + continue + } + n++ + } + return n +} + // DetectableJobTypes returns sorted job types that currently have at least one detect-capable worker. func (r *Registry) DetectableJobTypes() []string { r.mu.RLock() diff --git a/weed/s3api/s3lifecycle/dailyrun/run.go b/weed/s3api/s3lifecycle/dailyrun/run.go index 772055b92..879936ada 100644 --- a/weed/s3api/s3lifecycle/dailyrun/run.go +++ b/weed/s3api/s3lifecycle/dailyrun/run.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/reader" "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/router" + "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" "golang.org/x/time/rate" ) @@ -401,9 +402,11 @@ func processMatches(ctx context.Context, cfg Config, runNow time.Time, ev *reade continue } if cfg.Limiter != nil { + waitStart := time.Now() if waitErr := cfg.Limiter.Wait(ctx); waitErr != nil { return skippedAny, true, waitErr } + stats.S3LifecycleDispatchLimiterWaitSeconds.Observe(time.Since(waitStart).Seconds()) } outcome, dispatchErr := dispatchWithRetry(ctx, cfg.Client, m) if dispatchErr != nil { diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 2aeb843ba..b225db356 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -592,6 +592,22 @@ var ( Name: "metadata_only_total", Help: "Counter of LifecycleDelete completions that skipped per-chunk delete (volume TTL reclaim).", }, []string{"bucket", "rule_hash"}) + + // S3LifecycleDispatchLimiterWaitSeconds is the cluster-wide rate + // limiter's per-dispatch wait time on the daily-replay path. The + // limiter blocks just before each LifecycleDelete RPC; near-zero + // observations mean the cluster cap isn't binding, a long-tail at + // the configured 1/rate ceiling means the cluster cap is the + // active throttle. Operators tune cluster_deletes_per_second by + // reading p95/p99 on this histogram. + S3LifecycleDispatchLimiterWaitSeconds = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: "s3_lifecycle", + Name: "dispatch_limiter_wait_seconds", + Help: "Time spent waiting on the cluster rate limiter before issuing a LifecycleDelete RPC. Non-zero values indicate the cluster cap is binding.", + Buckets: []float64{0.0001, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10}, + }) ) func init() { @@ -668,6 +684,7 @@ func init() { Gather.MustRegister(S3LifecycleEventCounter) Gather.MustRegister(S3LifecycleBootstrapDispatchCounter) Gather.MustRegister(S3LifecycleMetadataOnlyCounter) + Gather.MustRegister(S3LifecycleDispatchLimiterWaitSeconds) Gather.MustRegister(UploadErrorCounter) diff --git a/weed/worker/tasks/s3_lifecycle/cluster_rate_limit.go b/weed/worker/tasks/s3_lifecycle/cluster_rate_limit.go new file mode 100644 index 000000000..1e92dd6df --- /dev/null +++ b/weed/worker/tasks/s3_lifecycle/cluster_rate_limit.go @@ -0,0 +1,34 @@ +package s3_lifecycle + +// Cluster-wide rate-limit configuration plumbing for the daily-replay +// worker. The admin holds a single "cluster delete budget" knob, divides +// it by the number of execute-capable s3_lifecycle workers at job-dispatch +// time, and ships the per-worker share to the worker via +// ExecuteJobRequest.ClusterContext.Metadata. The worker reads the share, +// constructs a rate.Limiter, and passes it to dailyrun.Run. +// +// These constants are the contract between admin (weed/admin/plugin/plugin.go +// computes the share and writes the keys) and worker (this package's +// handler.go reads them). Changing a name on one side without the other +// would silently disable rate limiting — both sides must read these +// exact values. + +const ( + // ClusterDeletesPerSecondAdminKey is the admin-config field that + // holds the cluster-wide budget in delete RPCs per second. 0 means + // unlimited (legacy behavior). Set via the AdminConfigForm in + // handler.go's "Scope" section. + ClusterDeletesPerSecondAdminKey = "cluster_deletes_per_second" + // ClusterDeletesBurstAdminKey holds the token-bucket burst. 0 means + // "2 × rps" (computed by the admin allocator). + ClusterDeletesBurstAdminKey = "cluster_deletes_burst" + + // MetadataKeyDeletesPerSecond is the per-worker share value the + // admin writes into ClusterContext.Metadata at ExecuteJob time. + // Stored as a string of a non-negative float64; empty/missing/zero + // means "no rate limit on this run" (cfg.Limiter stays nil). + MetadataKeyDeletesPerSecond = "s3_lifecycle.deletes_per_second" + // MetadataKeyDeletesBurst is the per-worker burst share. Stored as + // a string of a non-negative integer. + MetadataKeyDeletesBurst = "s3_lifecycle.deletes_burst" +) diff --git a/weed/worker/tasks/s3_lifecycle/cluster_rate_limit_test.go b/weed/worker/tasks/s3_lifecycle/cluster_rate_limit_test.go new file mode 100644 index 000000000..4b3c1b2a4 --- /dev/null +++ b/weed/worker/tasks/s3_lifecycle/cluster_rate_limit_test.go @@ -0,0 +1,78 @@ +package s3_lifecycle + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBuildLimiterFromClusterContext_NilCC(t *testing.T) { + l, desc := buildLimiterFromClusterContext(nil) + assert.Nil(t, l) + assert.Equal(t, "unlimited", desc) +} + +func TestBuildLimiterFromClusterContext_NoMetadata(t *testing.T) { + l, desc := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{}) + assert.Nil(t, l) + assert.Equal(t, "unlimited", desc) +} + +func TestBuildLimiterFromClusterContext_MissingRateKey(t *testing.T) { + l, desc := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{ + Metadata: map[string]string{"unrelated": "x"}, + }) + assert.Nil(t, l) + assert.Equal(t, "unlimited", desc) +} + +func TestBuildLimiterFromClusterContext_NonPositiveRate(t *testing.T) { + // 0 or negative rate means the admin didn't allocate; the worker + // must NOT construct a limiter that throttles every request to + // zero-throughput. + for _, raw := range []string{"0", "-1", "0.0", "not-a-number", ""} { + l, desc := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{ + Metadata: map[string]string{MetadataKeyDeletesPerSecond: raw}, + }) + assert.Nil(t, l, "rate=%q must yield nil limiter", raw) + assert.Equal(t, "unlimited", desc) + } +} + +func TestBuildLimiterFromClusterContext_PositiveRateBuildsLimiter(t *testing.T) { + l, desc := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{ + Metadata: map[string]string{ + MetadataKeyDeletesPerSecond: "12.5", + MetadataKeyDeletesBurst: "25", + }, + }) + require.NotNil(t, l) + assert.Equal(t, 12.5, float64(l.Limit())) + assert.Equal(t, 25, l.Burst()) + assert.Contains(t, desc, "12.5/s") + assert.Contains(t, desc, "burst=25") +} + +func TestBuildLimiterFromClusterContext_BurstMissingDefaultsTo2xRate(t *testing.T) { + // burst omitted (admin allocator wrote nothing) → worker computes + // 2 × rps so a single tick has headroom for two refills. + l, _ := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{ + Metadata: map[string]string{MetadataKeyDeletesPerSecond: "10"}, + }) + require.NotNil(t, l) + assert.Equal(t, 10.0, float64(l.Limit())) + assert.Equal(t, 20, l.Burst(), "burst default must be 2× rps") +} + +func TestBuildLimiterFromClusterContext_TinyRateClampsBurstToOne(t *testing.T) { + // A sub-1-rps allocation (e.g. 0.5/s across few workers) would + // compute 2 × 0.5 = 1, but if the int truncation produced 0 the + // limiter would never refill. Clamp to at least 1. + l, _ := buildLimiterFromClusterContext(&plugin_pb.ClusterContext{ + Metadata: map[string]string{MetadataKeyDeletesPerSecond: "0.1"}, + }) + require.NotNil(t, l) + assert.GreaterOrEqual(t, l.Burst(), 1, "burst must never round down to 0") +} diff --git a/weed/worker/tasks/s3_lifecycle/config.go b/weed/worker/tasks/s3_lifecycle/config.go index 104934def..ed3ec9f8c 100644 --- a/weed/worker/tasks/s3_lifecycle/config.go +++ b/weed/worker/tasks/s3_lifecycle/config.go @@ -9,7 +9,12 @@ import ( const ( jobType = "s3_lifecycle" - defaultWorkers = 1 + // shardPipelineGoroutines is the in-process fan-out across the + // 16-shard space. Kept as a hardcoded internal default — formerly + // an admin form field, removed because it's a per-worker tuning + // knob, not a cluster-coordination concern. + shardPipelineGoroutines = 1 + defaultDispatchTickMinutes = int64(1) defaultCheckpointTickSeconds = int64(30) defaultRefreshIntervalMinutes = int64(5) @@ -45,7 +50,7 @@ type Config struct { // admin+worker config values. Missing fields fall back to defaults. func ParseConfig(adminValues, workerValues map[string]*plugin_pb.ConfigValue) Config { cfg := Config{ - Workers: int(readInt64(adminValues, "workers", defaultWorkers)), + Workers: shardPipelineGoroutines, DispatchTick: time.Duration(readInt64(workerValues, "dispatch_tick_minutes", defaultDispatchTickMinutes)) * time.Minute, CheckpointTick: time.Duration(readInt64(workerValues, "checkpoint_tick_seconds", defaultCheckpointTickSeconds)) * time.Second, RefreshInterval: time.Duration(readInt64(workerValues, "refresh_interval_minutes", defaultRefreshIntervalMinutes)) * time.Minute, @@ -53,9 +58,6 @@ func ParseConfig(adminValues, workerValues map[string]*plugin_pb.ConfigValue) Co MaxRuntime: time.Duration(readInt64(workerValues, "max_runtime_minutes", defaultMaxRuntimeMinutes)) * time.Minute, Algorithm: readString(adminValues, "algorithm", defaultAlgorithm), } - if cfg.Workers <= 0 { - cfg.Workers = defaultWorkers - } if cfg.DispatchTick <= 0 { cfg.DispatchTick = time.Duration(defaultDispatchTickMinutes) * time.Minute } diff --git a/weed/worker/tasks/s3_lifecycle/config_test.go b/weed/worker/tasks/s3_lifecycle/config_test.go index aab98a477..dc7b91c7d 100644 --- a/weed/worker/tasks/s3_lifecycle/config_test.go +++ b/weed/worker/tasks/s3_lifecycle/config_test.go @@ -9,8 +9,8 @@ import ( func TestParseConfigDefaults(t *testing.T) { cfg := ParseConfig(nil, nil) - if cfg.Workers != defaultWorkers { - t.Errorf("Workers default=%d, want %d", cfg.Workers, defaultWorkers) + if cfg.Workers != shardPipelineGoroutines { + t.Errorf("Workers default=%d, want %d", cfg.Workers, shardPipelineGoroutines) } if cfg.DispatchTick != 1*time.Minute { t.Errorf("DispatchTick default=%v, want 1m", cfg.DispatchTick) @@ -58,9 +58,6 @@ func TestParseConfig_AlgorithmUnknownValueFallsBackToDefault(t *testing.T) { } func TestParseConfigOverrides(t *testing.T) { - admin := map[string]*plugin_pb.ConfigValue{ - "workers": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 4}}, - } worker := map[string]*plugin_pb.ConfigValue{ "dispatch_tick_minutes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}}, "checkpoint_tick_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 15}}, @@ -68,10 +65,7 @@ func TestParseConfigOverrides(t *testing.T) { "bootstrap_interval_minutes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 120}}, "max_runtime_minutes": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 120}}, } - cfg := ParseConfig(admin, worker) - if cfg.Workers != 4 { - t.Errorf("Workers=%d, want 4", cfg.Workers) - } + cfg := ParseConfig(nil, worker) if cfg.DispatchTick != 2*time.Minute { t.Errorf("DispatchTick=%v, want 2m", cfg.DispatchTick) } diff --git a/weed/worker/tasks/s3_lifecycle/handler.go b/weed/worker/tasks/s3_lifecycle/handler.go index a63d92a40..98c2404fd 100644 --- a/weed/worker/tasks/s3_lifecycle/handler.go +++ b/weed/worker/tasks/s3_lifecycle/handler.go @@ -3,6 +3,7 @@ package s3_lifecycle import ( "context" "fmt" + "strconv" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -17,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/engine" "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle/scheduler" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/time/rate" "google.golang.org/grpc" ) @@ -72,17 +74,8 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { { SectionId: "scope", Title: "Scope", - Description: "How many pipeline goroutines split the 16-shard space.", + Description: "Cluster-wide algorithm choice and delete-throughput cap.", Fields: []*plugin_pb.ConfigField{ - { - Name: "workers", - Label: "Worker Count", - Description: "Number of pipeline goroutines per executing worker. Each owns a contiguous slice of [0, 16) shards. Default 1 = one goroutine handles all 16 shards.", - FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, - Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, - MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, - MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 16}}, - }, { Name: "algorithm", Label: "Algorithm", @@ -94,12 +87,29 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { {Value: AlgorithmStreaming, Label: "Streaming (legacy fallback)", Description: "Long-running reader + per-shard heap + tick dispatcher. Pre-cutover behavior; removed in Phase 5."}, }, }, + { + Name: ClusterDeletesPerSecondAdminKey, + Label: "Cluster Delete Rate (per second)", + Description: "Cluster-wide ceiling on lifecycle delete RPCs per second, divided evenly across active s3_lifecycle workers at job-dispatch time. 0 = unlimited (legacy behavior). Only honored by the Daily Replay algorithm; streaming ignores it.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + }, + { + Name: ClusterDeletesBurstAdminKey, + Label: "Cluster Delete Burst", + Description: "Token-bucket burst capacity across the cluster (max simultaneous deletes). 0 = 2 × rate. Same allocation rule as the rate.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + }, }, }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ - "workers": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultWorkers}}, - "algorithm": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAlgorithm}}, + "algorithm": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAlgorithm}}, + ClusterDeletesPerSecondAdminKey: {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, + ClusterDeletesBurstAdminKey: {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, }, }, WorkerConfigForm: &plugin_pb.ConfigForm{ @@ -302,18 +312,17 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe } eng.Compile(inputs, engine.CompileOptions{PriorStates: scheduler.AllActivePriorStates(inputs)}) - shards := make([]int, 0, cfg.Workers) - // One pass per shard ID across [0, ShardCount). cfg.Workers governs - // concurrency, not partitioning — every shard gets exactly one - // goroutine and the rate.Limiter is the throughput governor. + shards := make([]int, 0, s3lifecycle.ShardCount) for i := 0; i < s3lifecycle.ShardCount; i++ { shards = append(shards, i) } + limiter, limiterDesc := buildLimiterFromClusterContext(request.GetClusterContext()) + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: jobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, Stage: "starting", - Message: fmt.Sprintf("daily_replay shards=%d runtime=%s", len(shards), cfg.MaxRuntime), + Message: fmt.Sprintf("daily_replay shards=%d workers=%d runtime=%s rate=%s", len(shards), cfg.Workers, cfg.MaxRuntime, limiterDesc), }) runErr := dailyrun.Run(ctx, dailyrun.Config{ @@ -325,8 +334,8 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe Persister: &dailyrun.FilerCursorPersister{Store: dispatcher.NewFilerStoreClient(filerClient)}, Lister: dispatcher.NewFilerSiblingLister(filerClient, bucketsPath), Workers: cfg.Workers, + Limiter: limiter, ClientName: "worker-s3-lifecycle-daily", - // Limiter is wired in Phase 3 from ClusterContext.Metadata. }) if dailyrun.IsUnsupportedRule(runErr) { // Surface the typed error verbatim so admin marks the run as @@ -341,6 +350,59 @@ func (h *Handler) executeDailyReplay(ctx context.Context, request *plugin_pb.Exe return nil } +// buildLimiterFromClusterContext parses the per-worker share the admin +// wrote into ClusterContext.Metadata (see weed/admin/plugin/plugin.go's +// s3_lifecycle injection) and returns a rate.Limiter, or nil when no +// rate cap applies. The description string is for the JobProgressUpdate +// so operators can see "rate=unlimited" / "rate=12.5/s burst=25" in +// the activity log. +// +// Tolerant of missing keys, empty strings, malformed numbers, and +// non-positive values — all treated as "no limit" rather than failing +// the run. The admin allocator is the single point that decides whether +// to populate these keys; the worker doesn't second-guess. +func buildLimiterFromClusterContext(cc *plugin_pb.ClusterContext) (*rate.Limiter, string) { + if cc == nil || cc.Metadata == nil { + return nil, "unlimited" + } + rps, ok := parsePositiveFloat(cc.Metadata[MetadataKeyDeletesPerSecond]) + if !ok { + return nil, "unlimited" + } + burst, _ := parsePositiveInt(cc.Metadata[MetadataKeyDeletesBurst]) + if burst <= 0 { + // Sensible default: enough headroom for one tick's worth of + // throughput. Caller may also supply 0 to opt into this default. + burst = int(rps * 2) + if burst < 1 { + burst = 1 + } + } + return rate.NewLimiter(rate.Limit(rps), burst), fmt.Sprintf("%.3g/s burst=%d", rps, burst) +} + +func parsePositiveFloat(s string) (float64, bool) { + if s == "" { + return 0, false + } + v, err := strconv.ParseFloat(s, 64) + if err != nil || v <= 0 { + return 0, false + } + return v, true +} + +func parsePositiveInt(s string) (int, bool) { + if s == "" { + return 0, false + } + v, err := strconv.Atoi(s) + if err != nil || v <= 0 { + return 0, false + } + return v, true +} + // clusterS3Endpoints returns the master-discovered S3 gRPC addresses for the // cluster. The handler dials the first reachable one; the master refreshes // the list on KeepConnected so a stale entry self-heals on the next run. diff --git a/weed/worker/tasks/s3_lifecycle/handler_test.go b/weed/worker/tasks/s3_lifecycle/handler_test.go index c4529e1f4..9b326b636 100644 --- a/weed/worker/tasks/s3_lifecycle/handler_test.go +++ b/weed/worker/tasks/s3_lifecycle/handler_test.go @@ -284,31 +284,21 @@ func TestDescriptor_BasicShape(t *testing.T) { assert.Greater(t, d.DescriptorVersion, uint32(0), "descriptor version must be positive (admins use it for compat)") } -func TestDescriptor_AdminConfigFormHasWorkersField(t *testing.T) { - // Workers is the only admin-side knob today; if it disappears, the - // admin form would render empty and operators couldn't tune - // concurrency. +func TestDescriptor_AdminConfigFormHasNoWorkersField(t *testing.T) { + // "workers" used to be an admin form field controlling in-process + // pipeline goroutines. It's per-worker tuning, not a cluster-wide + // scope concern, so it was removed from the form. ParseConfig hard- + // codes cfg.Workers from shardPipelineGoroutines instead. h := NewHandler(nil) d := h.Descriptor() require.NotNil(t, d.AdminConfigForm) - assert.Equal(t, "s3-lifecycle-admin", d.AdminConfigForm.FormId) - - // Walk every section's fields and find "workers". - var found bool for _, sec := range d.AdminConfigForm.Sections { for _, f := range sec.Fields { - if f.Name == "workers" { - found = true - assert.Equal(t, plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, f.FieldType) - } + assert.NotEqual(t, "workers", f.Name, "admin form must NOT expose the in-memory pipeline-goroutine knob") } } - assert.True(t, found, "admin form must expose 'workers' field") - - // Default value matches the constant used by ParseConfig. - dv, ok := d.AdminConfigForm.DefaultValues["workers"] - require.True(t, ok, "workers must have a default in AdminConfigForm") - assert.Equal(t, int64(defaultWorkers), dv.GetInt64Value()) + _, hasDefault := d.AdminConfigForm.DefaultValues["workers"] + assert.False(t, hasDefault, "DefaultValues must NOT include 'workers' (form field removed)") } func TestDescriptor_WorkerConfigFormCadenceDefaultsMatchParseConfig(t *testing.T) {