mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-17 00:07:18 +03:00
1f6f473995
* refactor(worker): co-locate plugin handlers with their task packages
Move every per-task plugin handler from weed/plugin/worker/ into the
matching weed/worker/tasks/<name>/ package, so each task owns its
detection, scheduling, execution, and plugin handler in one place.
Step 0 (within pluginworker, no behavior change): extract shared helpers
that previously lived inside individual handler files into dedicated
files and export the ones now consumed across packages.
- activity.go: BuildExecutorActivity, BuildDetectorActivity
- config.go: ReadStringConfig/Double/Int64/Bytes/StringList, MapTaskPriority
- interval.go: ShouldSkipDetectionByInterval
- volume_state.go: VolumeState + consts, FilterMetricsByVolumeState/Location
- collection_filter.go: CollectionFilterMode + consts
- volume_metrics.go: export CollectVolumeMetricsFromMasters,
MasterAddressCandidates, FetchVolumeList
- testing_senders_test.go: shared test stubs
Phase 1: move the per-task plugin handlers (and the iceberg subpackage)
into their task packages.
weed/plugin/worker/vacuum_handler.go -> weed/worker/tasks/vacuum/plugin_handler.go
weed/plugin/worker/ec_balance_handler.go -> weed/worker/tasks/ec_balance/plugin_handler.go
weed/plugin/worker/erasure_coding_handler.go -> weed/worker/tasks/erasure_coding/plugin_handler.go
weed/plugin/worker/volume_balance_handler.go -> weed/worker/tasks/balance/plugin_handler.go
weed/plugin/worker/iceberg/ -> weed/worker/tasks/iceberg/
weed/plugin/worker/handlers/handlers.go now blank-imports all five
task subpackages so their init() registrations fire.
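The registration-by-init() pattern mentioned above can be sketched in a single file. This is a minimal illustration only: the `registry` map, the `register` function, and the handler strings are hypothetical names, not the actual pluginworker API. In the real multi-package layout the init() would live in each task package and a blank import would be what causes it to run.

```go
package main

import (
	"fmt"
	"sort"
)

// registry maps a job type to a handler constructor. Hypothetical: in a real
// layout this would live in a shared package that every task package imports.
var registry = map[string]func() string{}

// register is what each task package would call from its own init().
func register(jobType string, build func() string) {
	registry[jobType] = build
}

// In a multi-package layout this init() would sit inside the task package,
// and a blank import (import _ "path/to/task") would be what triggers it.
func init() {
	register("vacuum", func() string { return "vacuum handler" })
	register("erasure_coding", func() string { return "erasure coding handler" })
}

// registeredJobTypes returns the registered job types in sorted order.
func registeredJobTypes() []string {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(registeredJobTypes()) // prints [erasure_coding vacuum]
}
```

Package-level variables are initialized before any init() runs, so the map is ready by the time the registrations fire.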
weed/command/mini.go and the worker tests construct the handler with
vacuum.DefaultMaxExecutionConcurrency (the constant moved with the
vacuum handler).
admin_script remains in weed/plugin/worker/ because there is no
underlying weed/worker/tasks/admin_script/ package to merge with.
* refactor(worker): update test/plugin_workers imports for moved handlers
Three handler constructors moved out of pluginworker into their task
packages — update the integration test files in test/plugin_workers/
to import from the new locations:
pluginworker.NewVacuumHandler -> vacuum.NewVacuumHandler
pluginworker.NewVolumeBalanceHandler -> balance.NewVolumeBalanceHandler
pluginworker.NewErasureCodingHandler -> erasure_coding.NewErasureCodingHandler
The pluginworker import is kept where the file still uses
pluginworker.WorkerOptions / pluginworker.JobHandler.
* refactor(worker): update test/s3tables iceberg import path
The iceberg subpackage moved from weed/plugin/worker/iceberg/ to
weed/worker/tasks/iceberg/. test/s3tables/maintenance/maintenance_integration_test.go
still imported the old path, breaking S3 Tables / RisingWave / Trino /
Spark / Iceberg-catalog / STS integration test builds.
Mirrors the OSS-side fix needed by every job in the run that
transitively imports test/s3tables/maintenance.
* chore: gofmt PR-touched files
The S3 Tables Format Check job runs `gofmt -l` over weed/s3api/s3tables
and test/s3tables, then fails if anything is unformatted. Files this
PR moved or modified had import-grouping and trailing-spacing issues
introduced by perl-based renames; reformat them with gofmt -w.
Touched files:
test/plugin_workers/erasure_coding/{detection,execution}_test.go
test/s3tables/maintenance/maintenance_integration_test.go
weed/plugin/worker/handlers/handlers.go
weed/worker/tasks/{balance,ec_balance,erasure_coding,vacuum}/plugin_handler*.go
* refactor(worker): bounds-checked int conversions for plugin config values
CodeQL flagged 18 go/incorrect-integer-conversion warnings on the moved
plugin handler files: results of pluginworker.ReadInt64Config (which
ultimately calls strconv.ParseInt with bit size 64) were being narrowed
to int32/uint32/int without an upper-bound check, so a malicious or
malformed admin/worker config value could overflow the target type.
Add three helpers in weed/plugin/worker/config.go that wrap
ReadInt64Config and clamp out-of-range values back to the caller's
fallback:
ReadInt32Config (math.MinInt32 .. math.MaxInt32)
ReadUint32Config (0 .. math.MaxUint32)
ReadIntConfig (math.MinInt32 .. math.MaxInt32, platform-portable)
Update each flagged call site in the four moved task packages to use
the bounds-checked helper. For protobuf uint32 fields (volume IDs)
the variable type also becomes uint32, removing the trailing
uint32(volumeID) casts and changing the "missing volume_id" check
from `<= 0` to `== 0`.
Touched files:
weed/plugin/worker/config.go
weed/worker/tasks/balance/plugin_handler.go
weed/worker/tasks/erasure_coding/plugin_handler.go
weed/worker/tasks/vacuum/plugin_handler.go
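The bounds-checked narrowing described above can be sketched as follows. This is a minimal illustration under assumptions: `clampInt32` and `clampUint32` are hypothetical names covering only the range check, not the real ReadInt32Config / ReadUint32Config signatures, which also read the value from the config map.

```go
package main

import (
	"fmt"
	"math"
)

// clampInt32 is a hypothetical stand-in for the narrowing step: it returns
// fallback when v does not fit in an int32 instead of silently wrapping.
func clampInt32(v int64, fallback int32) int32 {
	if v < math.MinInt32 || v > math.MaxInt32 {
		return fallback
	}
	return int32(v)
}

// clampUint32 does the same for uint32 targets such as volume IDs.
func clampUint32(v int64, fallback uint32) uint32 {
	if v < 0 || v > math.MaxUint32 {
		return fallback
	}
	return uint32(v)
}

func main() {
	fmt.Println(clampInt32(720, 300))   // in range: prints 720
	fmt.Println(clampInt32(1<<40, 300)) // too large: prints 300
	fmt.Println(clampUint32(-1, 0))     // negative: prints 0
	fmt.Println(clampUint32(88, 0))     // in range: prints 88
}
```

With an unsigned result and a zero fallback, a missing or invalid volume ID shows up as exactly 0, which is why a `== 0` check is sufficient at the call site.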
* refactor(worker): use ReadIntConfig for clamped derive-worker-config helpers
CodeQL still flagged three call sites where ReadInt64Config was being
narrowed to int after a value-range clamp (max_concurrent_moves <= 50,
batch_size <= 100, min_server_count >= 2). The clamp is correct but
CodeQL's flow analysis didn't recognize the bound, so it flagged them
as unbounded narrowing.
Switch to ReadIntConfig (already int32-bounded by the helper) for
those three sites and drop the now-redundant int64 intermediate variables.
Also drops the now-unused `> math.MaxInt32` clamp in
ec_balance.deriveECBalanceWorkerConfig (the helper covers it).
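The ordering this change relies on (bound the conversion first, then apply the value-range clamp on the already-narrow type) might look like the sketch below. `toBoundedInt`, `clampRange`, the fallback of 5, and the lower bound of 1 are all assumptions for illustration; only the upper bound of 50 for max_concurrent_moves comes from the text above.

```go
package main

import (
	"fmt"
	"math"
)

// toBoundedInt narrows v to int only when it fits in int32, so the result is
// the same on 32-bit and 64-bit platforms. Hypothetical helper.
func toBoundedInt(v int64, fallback int) int {
	if v < math.MinInt32 || v > math.MaxInt32 {
		return fallback
	}
	return int(v)
}

// clampRange keeps v inside [lo, hi]; it operates on int, so no narrowing
// conversion remains after this point.
func clampRange(v, lo, hi int) int {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

func main() {
	// max_concurrent_moves is capped at 50; fallback and lower bound are assumed.
	fmt.Println(clampRange(toBoundedInt(200, 5), 1, 50))           // prints 50
	fmt.Println(clampRange(toBoundedInt(9_000_000_000, 5), 1, 50)) // prints 5
}
```

Because the narrowing happens inside one helper with an explicit bound check, a static analyzer only has to reason about that helper rather than each call site's clamp.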
344 lines
10 KiB
Go
package erasure_coding

import (
	"context"
	"strings"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/protobuf/proto"
)

func TestDecodeErasureCodingTaskParamsFromPayload(t *testing.T) {
	expected := &worker_pb.TaskParams{
		TaskId:     "task-ec-1",
		VolumeId:   88,
		Collection: "images",
		Sources: []*worker_pb.TaskSource{
			{
				Node:     "10.0.0.1:8080",
				VolumeId: 88,
			},
		},
		Targets: []*worker_pb.TaskTarget{
			{
				Node:     "10.0.0.2:8080",
				VolumeId: 88,
				ShardIds: []uint32{0, 10},
			},
		},
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:    ecstorage.DataShardsCount,
				ParityShards:  ecstorage.ParityShardsCount,
				WorkingDir:    "/tmp/ec-work",
				CleanupSource: true,
			},
		},
	}
	payload, err := proto.Marshal(expected)
	if err != nil {
		t.Fatalf("marshal payload: %v", err)
	}

	job := &plugin_pb.JobSpec{
		JobId: "job-from-admin",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}

	actual, err := decodeErasureCodingTaskParams(job)
	if err != nil {
		t.Fatalf("decodeErasureCodingTaskParams() err = %v", err)
	}
	if !proto.Equal(expected, actual) {
		t.Fatalf("decoded params mismatch\nexpected: %+v\nactual: %+v", expected, actual)
	}
}

func TestDecodeErasureCodingTaskParamsFallback(t *testing.T) {
	// Synthetic placeholder addresses ("10.0.0.a:8080", "10.0.0.b:8080", ...);
	// this test only checks how many targets are decoded.
	targetServers := make([]string, 0, ecstorage.TotalShardsCount)
	for i := 0; i < ecstorage.TotalShardsCount; i++ {
		targetServers = append(targetServers, "10.0.0."+string(rune('a'+i))+":8080")
	}

	job := &plugin_pb.JobSpec{
		JobId: "job-ec-2",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 7},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "127.0.0.1:8080"},
			},
			"target_servers": {
				Kind: &plugin_pb.ConfigValue_StringList{
					StringList: &plugin_pb.StringList{Values: targetServers},
				},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "videos"},
			},
		},
	}

	params, err := decodeErasureCodingTaskParams(job)
	if err != nil {
		t.Fatalf("decodeErasureCodingTaskParams() err = %v", err)
	}
	if params.TaskId != "job-ec-2" || params.VolumeId != 7 || params.Collection != "videos" {
		t.Fatalf("unexpected basic params: %+v", params)
	}
	if len(params.Sources) != 1 || params.Sources[0].Node != "127.0.0.1:8080" {
		t.Fatalf("unexpected sources: %+v", params.Sources)
	}
	if len(params.Targets) != ecstorage.TotalShardsCount {
		t.Fatalf("unexpected target count: %d", len(params.Targets))
	}
	if params.GetErasureCodingParams() == nil {
		t.Fatalf("expected fallback erasure coding params")
	}
}

func TestDeriveErasureCodingWorkerConfig(t *testing.T) {
	values := map[string]*plugin_pb.ConfigValue{
		"quiet_for_seconds": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 720},
		},
		"fullness_ratio": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.92},
		},
		"min_size_mb": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 128},
		},
	}

	cfg := deriveErasureCodingWorkerConfig(values)
	if cfg.TaskConfig.QuietForSeconds != 720 {
		t.Fatalf("expected quiet_for_seconds 720, got %d", cfg.TaskConfig.QuietForSeconds)
	}
	if cfg.TaskConfig.FullnessRatio != 0.92 {
		t.Fatalf("expected fullness_ratio 0.92, got %v", cfg.TaskConfig.FullnessRatio)
	}
	if cfg.TaskConfig.MinSizeMB != 128 {
		t.Fatalf("expected min_size_mb 128, got %d", cfg.TaskConfig.MinSizeMB)
	}
}

func TestBuildErasureCodingProposal(t *testing.T) {
	params := &worker_pb.TaskParams{
		TaskId:     "ec-task-1",
		VolumeId:   99,
		Collection: "c1",
		Sources: []*worker_pb.TaskSource{
			{
				Node:     "source-a:8080",
				VolumeId: 99,
			},
		},
		Targets: []*worker_pb.TaskTarget{
			{
				Node:     "target-a:8080",
				VolumeId: 99,
				ShardIds: []uint32{0, 10},
			},
			{
				Node:     "target-b:8080",
				VolumeId: 99,
				ShardIds: []uint32{1, 11},
			},
		},
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:   ecstorage.DataShardsCount,
				ParityShards: ecstorage.ParityShardsCount,
			},
		},
	}
	result := &workertypes.TaskDetectionResult{
		TaskID:      "ec-task-1",
		TaskType:    workertypes.TaskTypeErasureCoding,
		VolumeID:    99,
		Server:      "source-a",
		Collection:  "c1",
		Priority:    workertypes.TaskPriorityLow,
		Reason:      "quiet and full",
		TypedParams: params,
	}

	proposal, err := buildErasureCodingProposal(result, "")
	if err != nil {
		t.Fatalf("buildErasureCodingProposal() err = %v", err)
	}
	if proposal.JobType != "erasure_coding" {
		t.Fatalf("unexpected job type %q", proposal.JobType)
	}
	if proposal.Parameters["task_params_pb"] == nil {
		t.Fatalf("expected serialized task params")
	}
	if proposal.Labels["source_node"] != "source-a:8080" {
		t.Fatalf("unexpected source label %q", proposal.Labels["source_node"])
	}
}

func TestErasureCodingHandlerRejectsUnsupportedJobType(t *testing.T) {
	handler := NewErasureCodingHandler(nil, "")
	err := handler.Detect(context.Background(), &plugin_pb.RunDetectionRequest{
		JobType: "vacuum",
	}, noopDetectionSender{})
	if err == nil {
		t.Fatalf("expected detect job type mismatch error")
	}

	err = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{
		Job: &plugin_pb.JobSpec{JobId: "job-1", JobType: "vacuum"},
	}, noopExecutionSender{})
	if err == nil {
		t.Fatalf("expected execute job type mismatch error")
	}
}

func TestEmitErasureCodingDetectionDecisionTraceNoTasks(t *testing.T) {
	sender := &recordingDetectionSender{}
	config := NewDefaultConfig()
	config.QuietForSeconds = 5 * 60
	config.MinSizeMB = 30
	config.FullnessRatio = 0.91

	metrics := []*workertypes.VolumeHealthMetrics{
		{
			VolumeID:      20,
			Size:          0,
			Age:           218*time.Hour + 41*time.Minute,
			FullnessRatio: 0,
		},
		{
			VolumeID:      27,
			Size:          uint64(16 * 1024 * 1024 / 10),
			Age:           91*time.Hour + time.Minute,
			FullnessRatio: 0.002,
		},
		{
			VolumeID:      12,
			Size:          0,
			Age:           219*time.Hour + 49*time.Minute,
			FullnessRatio: 0,
		},
	}

	if err := emitErasureCodingDetectionDecisionTrace(sender, metrics, config, nil, 0, false); err != nil {
		t.Fatalf("emitErasureCodingDetectionDecisionTrace error: %v", err)
	}
	if len(sender.events) < 4 {
		t.Fatalf("expected at least 4 detection events, got %d", len(sender.events))
	}

	if sender.events[0].Source != plugin_pb.ActivitySource_ACTIVITY_SOURCE_DETECTOR {
		t.Fatalf("expected detector source, got %v", sender.events[0].Source)
	}
	if !strings.Contains(sender.events[0].Message, "EC detection: No tasks created for 3 volumes") {
		t.Fatalf("unexpected summary message: %q", sender.events[0].Message)
	}
	if !strings.Contains(sender.events[1].Message, "ERASURE CODING: Volume 20: size=0.0MB") {
		t.Fatalf("unexpected first detail message: %q", sender.events[1].Message)
	}
}

func TestErasureCodingDescriptorOmitsLocalExecutionFields(t *testing.T) {
	descriptor := NewErasureCodingHandler(nil, "").Descriptor()
	if descriptor == nil || descriptor.WorkerConfigForm == nil {
		t.Fatalf("expected worker config form in descriptor")
	}
	if workerConfigFormHasField(descriptor.WorkerConfigForm, "working_dir") {
		t.Fatalf("unexpected working_dir in erasure coding worker config form")
	}
	if workerConfigFormHasField(descriptor.WorkerConfigForm, "cleanup_source") {
		t.Fatalf("unexpected cleanup_source in erasure coding worker config form")
	}
}

func TestApplyErasureCodingExecutionDefaultsForcesLocalFields(t *testing.T) {
	baseWorkingDir := "/var/lib/seaweedfs-worker"
	params := &worker_pb.TaskParams{
		TaskId:   "ec-test",
		VolumeId: 100,
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:    ecstorage.DataShardsCount,
				ParityShards:  ecstorage.ParityShardsCount,
				WorkingDir:    "/tmp/custom-from-job",
				CleanupSource: false,
			},
		},
	}

	applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir)

	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		t.Fatalf("expected erasure coding params")
	}
	if ecParams.WorkingDir != defaultErasureCodingWorkingDir(baseWorkingDir) {
		t.Fatalf("expected local working_dir %q, got %q", defaultErasureCodingWorkingDir(baseWorkingDir), ecParams.WorkingDir)
	}
	if !ecParams.CleanupSource {
		t.Fatalf("expected cleanup_source true")
	}
}

type noopDetectionSender struct{}

func (noopDetectionSender) SendProposals(*plugin_pb.DetectionProposals) error { return nil }
func (noopDetectionSender) SendComplete(*plugin_pb.DetectionComplete) error { return nil }
func (noopDetectionSender) SendActivity(*plugin_pb.ActivityEvent) error { return nil }

type noopExecutionSender struct{}

func (noopExecutionSender) SendProgress(*plugin_pb.JobProgressUpdate) error { return nil }
func (noopExecutionSender) SendCompleted(*plugin_pb.JobCompleted) error { return nil }

type recordingDetectionSender struct {
	proposals *plugin_pb.DetectionProposals
	complete  *plugin_pb.DetectionComplete
	events    []*plugin_pb.ActivityEvent
}

func (r *recordingDetectionSender) SendProposals(proposals *plugin_pb.DetectionProposals) error {
	r.proposals = proposals
	return nil
}

func (r *recordingDetectionSender) SendComplete(complete *plugin_pb.DetectionComplete) error {
	r.complete = complete
	return nil
}

func (r *recordingDetectionSender) SendActivity(event *plugin_pb.ActivityEvent) error {
	if event != nil {
		r.events = append(r.events, event)
	}
	return nil
}

func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool {
	if form == nil {
		return false
	}
	for _, section := range form.Sections {
		if section == nil {
			continue
		}
		for _, field := range section.Fields {
			if field != nil && field.Name == fieldName {
				return true
			}
		}
	}
	return false
}