Plugin Worker Scheduling
This page explains how the admin server discovers, schedules, and dispatches work to plugin workers.
Architecture Overview
```
Admin Server                            Workers
────────────                            ───────
       ┌────────────────────┐
       │   Scheduler Loop   │
       │ (single goroutine) │
       └─────────┬──────────┘
                 │
     ┌───────────┼───────────┐
     ▼           ▼           ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ vacuum  │ │ balance │ │   ec    │  ← per-job-type detection intervals
└────┬────┘ └────┬────┘ └────┬────┘
     │           │           │
     ▼           ▼           ▼
┌───────────────────────────────────┐
│        gRPC Bidirectional         │
│     Streams (one per worker)      │
└────────┬──────────┬──────────┬────┘
         ▼          ▼          ▼
     Worker A   Worker B   Worker C
```
All communication between the admin server and workers flows over a single bidirectional gRPC stream per worker (PluginControlService.WorkerStream). This stream carries registration, heartbeats, detection requests, job dispatch, progress updates, and completion reports.
Worker Registration
When a worker starts, it dials the admin server's gRPC port (admin HTTP port + 10000) and initiates a WorkerStream:
- Worker sends WorkerHello, which contains the worker ID, capabilities (which job types it can detect and/or execute), and concurrency limits.
- Admin upserts the worker into its Registry and replies with AdminHello (heartbeat interval, reconnect settings).
- Admin prefetches job type descriptors from the worker's declared capabilities.
Workers send WorkerHeartbeat every ~15 seconds, reporting:
- Detection and execution slot usage (used / total)
- Currently running work with progress
- Queued jobs by type
The registry uses heartbeats to track worker health and available capacity for scheduling decisions.
The Scheduler Loop
A single goroutine (schedulerLoop) drives all periodic scheduling:
```
loop:
    runSchedulerIteration()
    if hadJobs  → loop immediately (stay busy)
    if !hadJobs → sleep until the earliest job type's next detection time
```
Sequential Group Execution
Each scheduler iteration processes job types one at a time, sequentially. Each job type is a "group" — its detection and all resulting executions run as a single unit before the next job type begins. This ensures job types don't compete for the admin lock or cluster resources simultaneously.
```
Iteration starts (admin lock acquired)
│
├─ [Group 1] admin_script (max runtime: 30m, configurable)
│  ├─ detect → find scripts to run
│  └─ execute all detected jobs → blocks until done or timeout
│
├─ [Group 2] erasure_coding (max runtime: 30m, configurable)
│  ├─ detect → find volumes to encode
│  └─ execute all detected jobs → blocks until done or timeout
│
├─ [Group 3] vacuum (max runtime: 30m, configurable)
│  ├─ detect → find volumes to vacuum
│  └─ execute all detected jobs → blocks until done or timeout
│
├─ [Group 4] volume_balance (max runtime: 30m, configurable)
│  ├─ detect → find imbalanced servers
│  └─ execute all detected jobs → blocks until done or timeout
│
Iteration ends (lock released) → sleep until earliest nextDetectionAt
```
Job types are processed in alphabetical order. Each group only runs if its detection interval has elapsed — groups that aren't due yet are skipped with no cost.
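A sketch of one iteration under these rules. The isDue and runGroup hooks are hypothetical, and the admin lock held for the whole iteration is omitted; the point is that groups are visited in sorted order, skipped when not due, and run strictly one after another.

```go
package main

import (
	"fmt"
	"sort"
)

// runSchedulerIteration visits job types alphabetically, skips groups that are
// not due, and runs each due group (detection + executions) to completion
// before starting the next. runGroup reports whether any jobs ran.
func runSchedulerIteration(jobTypes []string, isDue func(string) bool, runGroup func(string) bool) (hadJobs bool, visited []string) {
	ordered := append([]string(nil), jobTypes...)
	sort.Strings(ordered)
	for _, jt := range ordered {
		if !isDue(jt) {
			continue // not due yet: skipped at no cost
		}
		visited = append(visited, jt)
		if runGroup(jt) { // blocks until the group finishes or times out
			hadJobs = true
		}
	}
	return hadJobs, visited
}

func main() {
	types := []string{"volume_balance", "vacuum", "admin_script", "erasure_coding"}
	due := map[string]bool{"admin_script": true, "erasure_coding": true, "volume_balance": true}
	had, order := runSchedulerIteration(types,
		func(jt string) bool { return due[jt] },
		func(jt string) bool { return jt == "erasure_coding" })
	fmt.Println(had, order) // true [admin_script erasure_coding volume_balance]
}
```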
Per-Group Timeout
Each job type group has a configurable max runtime (job_type_max_runtime_seconds, default 30 minutes) that caps the total time for detection + all executions in that group. If the timeout fires, remaining jobs in the group are canceled and the scheduler moves on to the next job type.
The timeout hierarchy within a group:
```
job_type_max_runtime_seconds (30m default, configurable per job type)
├── detection_timeout_seconds (45s default, capped by remaining runtime)
└── execution_timeout (per job, capped by remaining runtime)
    ├── Job 1 ─┐
    ├── Job 2  ├── up to global_execution_concurrency in parallel
    └── Job N ─┘
```
This means a slow job type (e.g., erasure coding of large volumes) cannot starve other job types indefinitely: it is cut off at its max runtime.
Per-Job-Type Detection Intervals
Each job type has its own detection_interval_seconds that controls how often the scheduler checks for new work. These come from the job type's descriptor defaults and can be overridden in the admin UI per job type.
| Job Type | Default Interval |
|---|---|
| vacuum | 2 hours |
| volume_balance | 30 minutes |
| erasure_coding | 5 minutes |
| admin_script | configurable |
| iceberg_maintenance | 1 hour |
On each iteration, the scheduler:
- Lists all detectable job types from the registry (job types with at least one connected worker that has CanDetect capability).
- For each job type, checks markDetectionDue(jobType, interval):
  - If now < nextDetectionAt[jobType] → skip, not due yet.
  - If due → set nextDetectionAt[jobType] = now + interval and proceed.
- Loads the scheduler policy for the job type (from persisted admin config or descriptor defaults).
- Runs the group: detection first, then dispatches any resulting jobs, all under the group timeout.
Scheduler Policy
Each job type's scheduling behavior is governed by a policy loaded from persisted configuration:
| Setting | Default | Description |
|---|---|---|
| detection_interval_seconds | varies by job type | How often to run detection |
| detection_timeout_seconds | 45s | Max time for a detection request |
| job_type_max_runtime_seconds | 30 min | Max total time for detection + execution per group |
| max_jobs_per_detection | 1000 | Max proposals returned per detection run |
| global_execution_concurrency | 1 | Max jobs dispatched in parallel within the group |
| per_worker_execution_concurrency | 1 | Max jobs per worker |
| retry_limit | 0 | How many times to retry a failed job |
| retry_backoff_seconds | 5s | Wait between retries |
All of these are editable per job type in the admin UI at /plugin.
Detection Flow (Discovering Work)
Detection answers the question: "What work needs to be done for this job type?"
```
Scheduler                         Admin Server                       Worker
─────────                         ────────────                       ──────
    │                                   │                               │
    ├─ detection due ──────────────────►│                               │
    │                                   ├─ pickDetector() ─────────────►│
    │                                   │  (round-robin, prefers        │
    │                                   │   leased worker)              │
    │                                   │                               │
    │                                   ├── RunDetectionRequest ───────►│
    │                                   │   (job type, cluster context, │
    │                                   │    admin config, max results) │
    │                                   │                               │
    │                                   │                               ├── handler.Detect()
    │                                   │                               │   queries cluster state
    │                                   │                               │
    │                                   │◄── DetectionProposals ────────┤  (streamed in batches)
    │                                   │◄── DetectionComplete ─────────┤
    │                                   │                               │
    │◄── proposals (filtered) ──────────┤                               │
    │                                   │                               │
```
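The pickDetector() step in the diagram can be sketched as below. This page does not define exactly what makes a worker "leased", so the sketch assumes the lease belongs to the worker last picked for the job type and is dropped once that worker is no longer a candidate; detectorPicker and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// detectorPicker is a hypothetical sketch of pickDetector().
type detectorPicker struct {
	cursor map[string]int    // round-robin position per job type
	lease  map[string]string // job type -> leased worker ID (assumed: last pick)
}

func newDetectorPicker() *detectorPicker {
	return &detectorPicker{cursor: map[string]int{}, lease: map[string]string{}}
}

// pick prefers the leased worker while it is still a candidate; otherwise it
// takes the next candidate in round-robin order and leases it.
func (p *detectorPicker) pick(jobType string, candidates []string) (string, bool) {
	if len(candidates) == 0 {
		return "", false
	}
	if leased, ok := p.lease[jobType]; ok {
		for _, c := range candidates {
			if c == leased {
				return leased, true
			}
		}
		delete(p.lease, jobType) // lease holder is gone
	}
	sorted := append([]string(nil), candidates...)
	sort.Strings(sorted) // stable order so the cursor means something
	w := sorted[p.cursor[jobType]%len(sorted)]
	p.cursor[jobType]++
	p.lease[jobType] = w
	return w, true
}

func main() {
	p := newDetectorPicker()
	w1, _ := p.pick("vacuum", []string{"worker-b", "worker-a"})
	w2, _ := p.pick("vacuum", []string{"worker-b", "worker-a"})
	w3, _ := p.pick("vacuum", []string{"worker-b"}) // worker-a disconnected
	fmt.Println(w1, w2, w3)                         // worker-a worker-a worker-b
}
```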
- The scheduler picks a detector worker via round-robin selection (preferring a "leased" worker for consistency).
- Sends RunDetectionRequest with the cluster context (topology, volume layout) and admin config.
- The worker's handler runs Detect(), which inspects cluster state and streams back DetectionProposals; each proposal describes one unit of work (e.g., "vacuum volume 42", "move volume 7 from server A to server B").
- The worker sends DetectionComplete to signal it's done.
- Admin filters proposals:
  - Deduplication: Removes proposals whose dedupe_key matches an already-active job.
  - Within-run dedup: Removes duplicate proposals within the same detection run.
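Both filters fit in one pass over the proposals. Proposal is a hypothetical stand-in for a DetectionProposal, the key format in the demo is invented, and the sketch assumes every proposal carries a dedupe key.

```go
package main

import "fmt"

// Proposal is a hypothetical stand-in for a DetectionProposal.
type Proposal struct {
	DedupeKey string
	Summary   string
}

// filterProposals drops proposals whose key matches an already-active job, then
// drops repeats within the same detection run (the first occurrence wins).
func filterProposals(proposals []Proposal, activeKeys map[string]bool) []Proposal {
	seen := make(map[string]bool, len(proposals))
	var kept []Proposal
	for _, p := range proposals {
		if activeKeys[p.DedupeKey] || seen[p.DedupeKey] {
			continue
		}
		seen[p.DedupeKey] = true
		kept = append(kept, p)
	}
	return kept
}

func main() {
	proposals := []Proposal{
		{DedupeKey: "vacuum:42", Summary: "vacuum volume 42"},
		{DedupeKey: "vacuum:7", Summary: "vacuum volume 7"},
		{DedupeKey: "vacuum:42", Summary: "vacuum volume 42 (repeat)"},
		{DedupeKey: "vacuum:9", Summary: "vacuum volume 9"},
	}
	active := map[string]bool{"vacuum:9": true} // a job for volume 9 is already active
	for _, p := range filterProposals(proposals, active) {
		fmt.Println(p.Summary)
	}
}
```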
Execution Flow (Dispatching Jobs)
Once detection produces proposals, the scheduler converts them to JobSpec objects and dispatches them:
```
Scheduler                         Admin Server                       Worker
─────────                         ────────────                       ──────
    │                                   │                               │
    ├─ dispatch proposals ─────────────►│                               │
    │                                   │                               │
    │  ┌─ worker pool ──────────────┐   │                               │
    │  │ (GlobalExecutionConcurrency│   │                               │
    │  │  goroutines)               │   │                               │
    │  │                            │   │                               │
    │  │ for each job:              │   │                               │
    │  │   reserveExecutor() ───────┼───┤                               │
    │  │     (check heartbeat slots │   │                               │
    │  │      + local reservations  │   │                               │
    │  │      vs per-worker limit)  │   │                               │
    │  │                            │   ├── ExecuteJobRequest ─────────►│
    │  │                            │   │   (job spec, config,          │
    │  │                            │   │    cluster context)           │
    │  │                            │   │                               ├── handler.Execute()
    │  │                            │   │◄── JobProgressUpdate ─────────┤  (streamed)
    │  │                            │   │◄── JobCompleted ──────────────┤
    │  │   release reservation ─────┼───┤                               │
    │  └────────────────────────────┘   │                               │
```
- The scheduler creates a worker pool of GlobalExecutionConcurrency goroutines.
- Each goroutine pulls a job from the queue and calls reserveScheduledExecutor():
  - Lists executor workers for the job type, sorted by available slots.
  - Checks the worker's heartbeat-reported slot usage plus local reservation count against PerWorkerConcurrency.
  - If no executor has capacity, waits and retries with backoff until the ExecutionTimeout.
- Sends ExecuteJobRequest to the chosen worker.
- The worker's handler runs Execute(), streaming JobProgressUpdate messages during execution.
- When done, the worker sends JobCompleted with the result.
- On failure, the scheduler retries up to RetryLimit times with RetryBackoff between attempts.
Slot-Based Capacity Control
The system enforces concurrency limits at multiple levels:
| Level | Control | Mechanism |
|---|---|---|
| Global | GlobalExecutionConcurrency | Size of the dispatch worker pool |
| Per-Worker | PerWorkerConcurrency | Checked against heartbeat slots + reservations |
| Worker-Side | -maxExecute flag | Worker rejects with "at capacity" if slots full |
| Capability | MaxExecutionConcurrency in descriptor | Per-job-type cap declared by the handler |
If a worker returns "at capacity", the scheduler tries the next available executor.
Job Lifecycle
A job transitions through these states:
```
pending → assigned → running → completed
                             → failed (→ retry → assigned)
                             → canceled
                             → stale (expired)
```
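The diagram reads as a small state machine. This page does not spell out which states each alternative outcome can branch from, so this sketch assumes failed, canceled, and stale can follow either in-flight state (assigned or running), and that a retry sends a failed job back to assigned; the JobState type and transition table are hypothetical.

```go
package main

import "fmt"

// JobState names the states from the lifecycle diagram.
type JobState string

const (
	Pending   JobState = "pending"
	Assigned  JobState = "assigned"
	Running   JobState = "running"
	Completed JobState = "completed"
	Failed    JobState = "failed"
	Canceled  JobState = "canceled"
	Stale     JobState = "stale"
)

// transitions lists allowed next states. completed, canceled, and stale have
// no entry, so they are terminal; failed may go back to assigned on retry.
var transitions = map[JobState][]JobState{
	Pending:  {Assigned},
	Assigned: {Running, Failed, Canceled, Stale},
	Running:  {Completed, Failed, Canceled, Stale},
	Failed:   {Assigned},
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to JobState) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	// A job that fails once, is retried, and then completes.
	path := []JobState{Pending, Assigned, Running, Failed, Assigned, Running, Completed}
	for i := 1; i < len(path); i++ {
		fmt.Printf("%s -> %s allowed: %v\n", path[i-1], path[i], canTransition(path[i-1], path[i]))
	}
}
```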
Stale Job Expiration
expireStaleJobs() runs every scheduler iteration and marks jobs stale if:
- No heartbeat from the assigned worker for >15 minutes
- No progress update for >24 hours
This prevents stale jobs from blocking new work indefinitely.
Job Tracking
The admin server tracks:
- In-process jobs: Currently active jobs with progress (max ~1000)
- Activities: Audit log of all scheduler actions (max ~4000)
- Run history: Last 10 successful + 10 failed runs per job type
All tracked state is persisted to disk every 2 seconds (when dirty) via the persistence loop.
Worker-Side Processing
When the worker receives a message from the admin server:
| Message | Worker Action |
|---|---|
| RunDetectionRequest | Acquires a detection slot and calls handler.Detect(sender). The handler streams proposals back. |
| ExecuteJobRequest | Acquires an execution slot (returns "at capacity" if full), then calls handler.Execute(sender). The handler streams progress and sends completion. |
| CancelRequest | Cancels the context of the in-flight detection or execution. |
Workers are stateless — all scheduling state lives on the admin server. If a worker disconnects, the admin server detects it via missing heartbeats and marks its jobs stale.
Persistence
The admin server persists all plugin state to its -dataDir:
| Data | File | Purpose |
|---|---|---|
| Job type configs | `plugin/<jobType>/config.json` | Admin runtime settings, admin/worker config values |
| Descriptors | `plugin/<jobType>/descriptor.json` | Job type schema from worker |
| Tracked jobs | `plugin/tracked_jobs.json` | In-flight and recent job history |
| Activities | `plugin/activities.json` | Audit log |
| Run history | `plugin/<jobType>/run_history.json` | Per-type success/error history |
| Scheduler config | `plugin/scheduler_config.json` | Legacy global config |
If -dataDir is not set, all state is in-memory only.
Concrete Example: Volume Balance
With DetectionIntervalSeconds: 1800 (30 minutes), the flow is:
- t=0: Scheduler calls markDetectionDue("volume_balance", 30m) → due (first run). Sets nextDetectionAt = t+30m.
- Detection: Picks a worker with CanDetect for volume_balance. Sends cluster context (topology with disk usage per server). The worker's Detect() compares disk utilization and proposes moves like "move volume 42 from server-a to server-b".
- Filtering: Admin checks that no existing active job already targets volume 42.
- Dispatch: Up to 16 moves are dispatched concurrently (GlobalExecutionConcurrency: 16, raised from the default of 1). Each move is sent to an available executor worker.
- Execution: The worker copies volume data to the target server, reports progress, and sends completion.
- t=30m: Scheduler wakes and runs detection again.
See Also
- Worker: weed worker command reference and options
- Admin UI: Admin server setup and UI reference
- Migrate Maintenance Scripts to Admin Script Plugin: Migration guide from legacy maintenance scripts