Files
Chris Lu 284796c7b6 fix(ec): fence stale-worker EC shard cleanup by encode generation (#9953)
* feat(ec): add encode_ts_ns to the EC task params, shard-unmount, and shard-delete RPCs

The generation fence for stale EC-worker cleanup needs the encode
generation on three messages: ErasureCodingTaskParams (admin issues it),
VolumeEcShardsUnmountRequest, and VolumeEcShardsDeleteRequest (the worker
carries it to the volume server). Additive fields only; 0 preserves the
existing unfenced behavior. Mirror the two volume-server fields in the
Rust volume server's proto copy.

* feat(ec): issue the EC encode generation from the admin and carry it on the worker

Stamp each EC proposal's encode_ts_ns from the admin's per-cycle
DetectionSequence (a single-clock value) so generations are globally
ordered even though detection runs on a rotating worker. The worker
writes that generation into the distributed .vif and passes it on its
shard unmount/delete RPCs; it falls back to a local timestamp for the
.vif only on the unfenced legacy/shell path (keeping the read guard on).

* fix(ec): fence the stale-worker EC shard unmount and teardown by generation

A reaped-but-still-running EC worker's cleanupStaleEcShards issued a
generation-blind unmount + full teardown that could unmount and then
overwrite a newer run's live shards on a shared node. Both RPCs now
carry the encode generation: the volume server unmounts/deletes a disk
only when its .vif generation is strictly older than the request, and
preserves a same-or-newer generation, a generation-0 (recovered or
pre-upgrade) volume, and an unreadable .vif. Unload is per-disk, never
node-wide. Request generation 0 keeps the blanket teardown for the shell
pre-encode cleanup and pre-upgrade callers. Mirrored in the Rust volume
server.

* test(ec): cover the generation-fenced teardown and unmount

End-to-end volume-server tests: a fenced FullTeardown wipes a strictly-
older generation, preserves a newer one, preserves a generation-0 volume,
and blanket-wipes on request generation 0; the gen-aware unmount preserves
a same-or-newer mounted generation; and the .vif generation reader handles
present/absent/no-config cases.

* test(ec): pin the fenced .vif==teardown generation and the unreadable-.vif preserve

A fenced run must stamp the admin generation verbatim into the .vif so it
matches the generation sent on the teardown RPCs; add a regression test
that sets the task generation and asserts the .vif carries it exactly.
Also cover the present-but-unparseable .vif case (reads as generation 0,
preserved) and correct the readEcGenerationTsNs docstring accordingly.

* fix(ec): surface EC full-teardown filesystem errors in the Rust volume server

remove_ec_volume_files(_full_teardown) discarded every fs::remove_file
error, so a teardown that failed on permissions or a full disk still
returned full_teardown_done=true and left stale artifacts to collide with
the next encode. Return io::Result, ignore NotFound, propagate the first
real error, and have the teardown RPC surface it -- matching the Go
contract. The best-effort reconcile/load-cleanup callers keep ignoring it.

* refactor(ec): reuse the EC volume lookup on unmount and short-circuit the gen read

Address review: the Rust unmount fence reuses the ec_vol it already
fetched instead of a second find_ec_volume; the Go .vif generation reader
breaks out of the data/idx loop early when the two dirs are the same.
2026-06-14 01:54:04 -07:00

876 lines
28 KiB
Go

package erasure_coding
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
// init registers the erasure coding handler factory with the plugin worker
// registry under the "erasure_coding" job type and its accepted aliases.
func init() {
	// build constructs a handler from the options supplied by the registry.
	build := func(opts pluginworker.HandlerBuildOptions) (pluginworker.JobHandler, error) {
		handler := NewErasureCodingHandler(opts.GrpcDialOption, opts.WorkingDir)
		return handler, nil
	}

	factory := pluginworker.HandlerFactory{
		JobType:  "erasure_coding",
		Category: pluginworker.CategoryHeavy,
		Aliases:  []string{"erasure-coding", "erasure.coding", "ec"},
		Build:    build,
	}
	pluginworker.RegisterHandler(factory)
}
// erasureCodingWorkerConfig wraps the task configuration derived from the
// worker config values of a detection request.
type erasureCodingWorkerConfig struct {
// TaskConfig carries the detection settings (quiet period, fullness ratio,
// minimum size, preferred tags, replica placement, collection filter).
TaskConfig *Config
}
// ErasureCodingHandler is the plugin job handler for erasure coding.
// It implements both detection (Detect) and execution (Execute) of
// "erasure_coding" jobs.
type ErasureCodingHandler struct {
// grpcDialOption is handed to the metrics collection, replica placement
// lookup, and the erasure coding task created in Execute.
grpcDialOption grpc.DialOption
// workingDir is the base directory under which the "erasure_coding"
// working directory is placed; it is stored whitespace-trimmed.
workingDir string
}
// NewErasureCodingHandler returns a handler that stores grpcDialOption and
// workingDir, the latter with surrounding whitespace removed.
func NewErasureCodingHandler(grpcDialOption grpc.DialOption, workingDir string) *ErasureCodingHandler {
	handler := new(ErasureCodingHandler)
	handler.grpcDialOption = grpcDialOption
	handler.workingDir = strings.TrimSpace(workingDir)
	return handler
}
// Capability advertises what this handler can do: it both detects and
// executes "erasure_coding" jobs, one detection and one execution at a time.
func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability {
	capability := new(plugin_pb.JobTypeCapability)

	// Identity shown to the admin.
	capability.JobType = "erasure_coding"
	capability.DisplayName = "EC Encoding"
	capability.Description = "Converts full and quiet volumes into EC shards"

	// Supported phases and their concurrency limits.
	capability.CanDetect = true
	capability.CanExecute = true
	capability.MaxDetectionConcurrency = 1
	capability.MaxExecutionConcurrency = 1

	capability.Weight = 80
	return capability
}
// Descriptor describes the erasure coding job type: its display metadata,
// the admin and worker configuration forms, the admin runtime defaults, and
// the default worker config values.
//
// The worker defaults are produced by a single builder so that the worker
// form's DefaultValues and the descriptor's WorkerDefaultValues always list
// the same keys (including "replica_placement").
func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	// workerDefaults returns a fresh map on every call so the two consumers
	// below never share mutable message pointers.
	workerDefaults := func() map[string]*plugin_pb.ConfigValue {
		return map[string]*plugin_pb.ConfigValue{
			"quiet_for_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600},
			},
			"fullness_ratio": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95},
			},
			"min_size_mb": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
			},
			"preferred_tags": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
			},
			"replica_placement": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
			},
		}
	}

	return &plugin_pb.JobTypeDescriptor{
		JobType:           "erasure_coding",
		DisplayName:       "EC Encoding",
		Description:       "Detect and execute erasure coding for suitable volumes",
		Icon:              "fas fa-shield-alt",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-admin",
			Title:       "Erasure Coding Admin Config",
			Description: "Admin-side controls for erasure coding detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "scope",
					Title:       "Scope",
					Description: "Optional filters applied before erasure coding detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "collection_filter",
							Label:       "Collection Filter",
							Description: "Only detect erasure coding opportunities in this collection when set.",
							Placeholder: "all collections",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-worker",
			Title:       "Erasure Coding Worker Config",
			Description: "Worker-side detection thresholds.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "thresholds",
					Title:       "Detection Thresholds",
					Description: "Controls for when erasure coding jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "quiet_for_seconds",
							Label:       "Quiet Period (s)",
							Description: "Volume must remain unmodified for at least this duration.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
						{
							Name:        "fullness_ratio",
							Label:       "Fullness Ratio",
							Description: "Minimum volume fullness ratio to trigger erasure coding.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
						},
						{
							Name:        "min_size_mb",
							Label:       "Minimum Volume Size (MB)",
							Description: "Only volumes larger than this size are considered.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
						},
						{
							Name:        "preferred_tags",
							Label:       "Preferred Tags",
							Description: "Comma-separated disk tags to prioritize for EC shard placement, ordered by preference.",
							Placeholder: "fast,ssd",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
						{
							Name:        "replica_placement",
							Label:       "Replica Placement",
							Description: "EC shard placement (e.g. 020): 2nd/3rd digits cap shards per rack/node (best-effort during encode, enforced by rebalancing); the data-center digit is ignored. Empty uses the master default.",
							Placeholder: "020",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: workerDefaults(),
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled:                       true,
			DetectionIntervalMinutes:      17,
			DetectionTimeoutSeconds:       300,
			MaxJobsPerDetection:           500,
			GlobalExecutionConcurrency:    16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit:                    1,
			RetryBackoffSeconds:           30,
			JobTypeMaxRuntimeSeconds:      1800,
			ExecutionTimeoutSeconds:       1800,
		},
		// Same keys as the worker form defaults; previously this map lacked
		// "replica_placement".
		WorkerDefaultValues: workerDefaults(),
	}
}
// Detect runs one erasure coding detection cycle and streams the outcome to
// sender.
//
// It derives the task config from the request, collects volume metrics from
// the masters listed in the cluster context, runs Detection, stamps every EC
// proposal with the request's DetectionSequence as its encode generation,
// emits a decision trace, and finally sends the proposals followed by a
// completion message.
//
// It returns an error when the request or sender is nil, when the job type is
// set to something other than "erasure_coding", or when metrics collection,
// detection, or sending fails. A failure to emit the decision trace is only
// logged.
func (h *ErasureCodingHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender pluginworker.DetectionSender,
) error {
	switch {
	case request == nil:
		return fmt.Errorf("run detection request is nil")
	case sender == nil:
		return fmt.Errorf("detection sender is nil")
	case request.JobType != "" && request.JobType != "erasure_coding":
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.JobType)
	}

	cfg := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())
	rawFilter := pluginworker.ReadStringConfig(request.GetAdminConfigValues(), "collection_filter", "")
	collectionFilter := strings.TrimSpace(rawFilter)
	if collectionFilter != "" {
		cfg.TaskConfig.CollectionFilter = collectionFilter
	}

	masters := make([]string, 0)
	if cluster := request.ClusterContext; cluster != nil {
		masters = append(masters, cluster.MasterGrpcAddresses...)
	}

	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}

	defaultPlacement := pluginworker.FetchDefaultReplicaPlacement(ctx, masters, h.grpcDialOption)
	clusterInfo := &workertypes.ClusterInfo{
		ActiveTopology:          activeTopology,
		GrpcDialOption:          h.grpcDialOption,
		DefaultReplicaPlacement: defaultPlacement,
	}

	// A negative limit is treated as zero.
	maxResults := int(request.MaxResults)
	if maxResults < 0 {
		maxResults = 0
	}

	results, hasMore, err := Detection(ctx, metrics, clusterInfo, cfg.TaskConfig, maxResults)
	if err != nil {
		return err
	}

	// Stamp the admin-issued encode generation onto every EC proposal. DetectionSequence
	// is minted once per cycle on the single admin clock, so generations are globally
	// ordered even though detection runs on a rotating worker; this lets a stale worker's
	// shard cleanup fence against a newer run instead of wiping it.
	generation := request.DetectionSequence
	for i := range results {
		if results[i] != nil && results[i].TypedParams != nil {
			if ecParams := results[i].TypedParams.GetErasureCodingParams(); ecParams != nil {
				ecParams.EncodeTsNs = generation
			}
		}
	}

	// The trace is diagnostic only; a failure to emit it must not fail detection.
	traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, cfg.TaskConfig, results, maxResults, hasMore)
	if traceErr != nil {
		glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr)
	}

	// Results that cannot be turned into a proposal are skipped, not fatal.
	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
	for i := range results {
		proposal, buildErr := buildErasureCodingProposal(results[i], h.workingDir)
		if buildErr == nil {
			proposals = append(proposals, proposal)
			continue
		}
		glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", buildErr)
	}

	batch := &plugin_pb.DetectionProposals{
		JobType:   "erasure_coding",
		Proposals: proposals,
		HasMore:   hasMore,
	}
	if err := sender.SendProposals(batch); err != nil {
		return err
	}

	done := &plugin_pb.DetectionComplete{
		JobType:        "erasure_coding",
		Success:        true,
		TotalProposals: int32(len(proposals)),
	}
	return sender.SendComplete(done)
}
// emitErasureCodingDetectionDecisionTrace sends detector activities that
// explain the outcome of a detection run.
//
// It first sends one "decision_summary" activity with the number of metrics,
// the number of selected tasks, and per-reason skip counts. The skip counts
// are computed per volume ID, using the replica with the smallest Server
// value as the representative. It then sends up to three "decision_volume"
// activities for the first non-EC entries of metrics.
//
// It is a no-op when sender or taskConfig is nil, and returns the first
// error reported by sender.SendActivity.
func emitErasureCodingDetectionDecisionTrace(
	sender pluginworker.DetectionSender,
	metrics []*workertypes.VolumeHealthMetrics,
	taskConfig *Config,
	results []*workertypes.TaskDetectionResult,
	maxResults int,
	hasMore bool,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}

	// Small constructors for the typed config values attached to activities.
	intValue := func(v int64) *plugin_pb.ConfigValue {
		return &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: v}}
	}
	floatValue := func(v float64) *plugin_pb.ConfigValue {
		return &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: v}}
	}
	flagValue := func(v bool) *plugin_pb.ConfigValue {
		return &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: v}}
	}

	quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024
	allowedCollections := wildcard.CompileWildcardMatchers(taskConfig.CollectionFilter)

	// Group replicas by volume ID so every volume is classified once.
	byVolume := make(map[uint32][]*workertypes.VolumeHealthMetrics)
	for _, entry := range metrics {
		if entry != nil {
			byVolume[entry.VolumeID] = append(byVolume[entry.VolumeID], entry)
		}
	}

	var alreadyEC, tooSmall, filteredOut, notQuiet, notFull int
	for _, replicas := range byVolume {
		if len(replicas) == 0 {
			continue
		}

		// Pick the replica with the smallest server name as representative.
		chosen := replicas[0]
		for _, replica := range replicas {
			if replica != nil && replica.Server < chosen.Server {
				chosen = replica
			}
		}

		// The first matching reason wins; a volume is counted at most once.
		switch {
		case chosen == nil:
			// nothing to classify
		case chosen.IsECVolume:
			alreadyEC++
		case chosen.Size < minSizeBytes:
			tooSmall++
		case len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, chosen.Collection):
			filteredOut++
		case chosen.Age < quietThreshold:
			notQuiet++
		case chosen.FullnessRatio < taskConfig.FullnessRatio:
			notFull++
		}
	}

	totalVolumes := len(metrics)
	selected := len(results)

	var summarySuffix string
	if hasMore {
		summarySuffix = fmt.Sprintf(" (max_results=%d reached; remaining volumes not evaluated)", maxResults)
	}

	var summaryMessage string
	if selected > 0 {
		summaryMessage = fmt.Sprintf(
			"EC detection: Created %d task(s)%s from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			selected,
			summarySuffix,
			totalVolumes,
			alreadyEC,
			tooSmall,
			filteredOut,
			notQuiet,
			notFull,
		)
	} else {
		summaryMessage = fmt.Sprintf(
			"EC detection: No tasks created for %d volumes%s (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes,
			summarySuffix,
			alreadyEC,
			tooSmall,
			filteredOut,
			notQuiet,
			notFull,
		)
	}

	summaryDetails := map[string]*plugin_pb.ConfigValue{
		"total_volumes":              intValue(int64(totalVolumes)),
		"selected_tasks":             intValue(int64(selected)),
		"max_results":                intValue(int64(maxResults)),
		"has_more":                   flagValue(hasMore),
		"skipped_already_ec":         intValue(int64(alreadyEC)),
		"skipped_too_small":          intValue(int64(tooSmall)),
		"skipped_filtered":           intValue(int64(filteredOut)),
		"skipped_not_quiet":          intValue(int64(notQuiet)),
		"skipped_not_full":           intValue(int64(notFull)),
		"quiet_for_seconds":          intValue(int64(taskConfig.QuietForSeconds)),
		"min_size_mb":                intValue(int64(taskConfig.MinSizeMB)),
		"fullness_threshold_percent": floatValue(taskConfig.FullnessRatio * 100),
	}
	summaryActivity := pluginworker.BuildDetectorActivity("decision_summary", summaryMessage, summaryDetails)
	if err := sender.SendActivity(summaryActivity); err != nil {
		return err
	}

	// Per-volume detail is capped at three activities to keep the trace short.
	remaining := 3
	for _, entry := range metrics {
		if entry == nil || entry.IsECVolume {
			continue
		}

		sizeMB := float64(entry.Size) / (1024 * 1024)
		message := fmt.Sprintf(
			"ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
			entry.VolumeID,
			sizeMB,
			taskConfig.MinSizeMB,
			entry.Age.Truncate(time.Minute),
			quietThreshold.Truncate(time.Minute),
			entry.FullnessRatio*100,
			taskConfig.FullnessRatio*100,
		)
		volumeDetails := map[string]*plugin_pb.ConfigValue{
			"volume_id":                  intValue(int64(entry.VolumeID)),
			"size_mb":                    floatValue(sizeMB),
			"required_min_size_mb":       intValue(int64(taskConfig.MinSizeMB)),
			"age_seconds":                intValue(int64(entry.Age.Seconds())),
			"required_quiet_for_seconds": intValue(int64(taskConfig.QuietForSeconds)),
			"fullness_percent":           floatValue(entry.FullnessRatio * 100),
			"required_fullness_percent":  floatValue(taskConfig.FullnessRatio * 100),
		}
		volumeActivity := pluginworker.BuildDetectorActivity("decision_volume", message, volumeDetails)
		if err := sender.SendActivity(volumeActivity); err != nil {
			return err
		}

		remaining--
		if remaining <= 0 {
			break
		}
	}
	return nil
}
// Execute runs a single erasure coding job and reports its lifecycle through
// sender.
//
// It decodes the task params from the job, applies the execution defaults,
// validates that a source node and at least one target exist, then runs the
// task. Progress updates are forwarded to sender; if forwarding a progress
// update fails, the task's context is cancelled. On task failure a FAILED
// progress update is sent (best effort) and the task error is returned. On
// success a completion message with the volume ID, source server, and target
// count is sent.
func (h *ErasureCodingHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender pluginworker.ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	job := request.Job
	if jobType := job.JobType; jobType != "" && jobType != "erasure_coding" {
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", jobType)
	}

	params, err := decodeErasureCodingTaskParams(job)
	if err != nil {
		return err
	}
	applyErasureCodingExecutionDefaults(params, request.GetClusterContext(), h.workingDir)

	switch {
	case len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "":
		return fmt.Errorf("erasure coding source node is required")
	case len(params.Targets) == 0:
		return fmt.Errorf("erasure coding targets are required")
	}

	task := NewErasureCodingTask(
		job.JobId,
		params.Sources[0].Node,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)

	execCtx, execCancel := context.WithCancel(ctx)
	defer execCancel()

	// sendUpdate reports one progress update carrying a single executor
	// activity built from the same stage and message.
	sendUpdate := func(state plugin_pb.JobState, percent float64, stage, message string) error {
		return sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           job.JobId,
			JobType:         job.JobType,
			State:           state,
			ProgressPercent: percent,
			Stage:           stage,
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				pluginworker.BuildExecutorActivity(stage, message),
			},
		})
	}

	task.SetProgressCallback(func(progress float64, stage string) {
		// Use the stage as the message unless it is blank.
		message := stage
		if strings.TrimSpace(stage) == "" {
			message = fmt.Sprintf("erasure coding progress %.0f%%", progress)
		}
		// Stop the task when progress can no longer be delivered.
		if sendErr := sendUpdate(plugin_pb.JobState_JOB_STATE_RUNNING, progress, stage, message); sendErr != nil {
			execCancel()
		}
	})

	if err := sendUpdate(plugin_pb.JobState_JOB_STATE_ASSIGNED, 0, "assigned", "erasure coding job accepted"); err != nil {
		return err
	}

	if runErr := task.Execute(execCtx, params); runErr != nil {
		// Best effort: the task error is what the caller needs, so a failure
		// to deliver this final update is deliberately ignored.
		_ = sendUpdate(plugin_pb.JobState_JOB_STATE_FAILED, 100, "failed", runErr.Error())
		return runErr
	}

	sourceNode := params.Sources[0].Node
	targetCount := len(params.Targets)
	resultSummary := fmt.Sprintf("erasure coding completed for volume %d across %d targets", params.VolumeId, targetCount)

	outputs := map[string]*plugin_pb.ConfigValue{
		"volume_id": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
		},
		"source_server": {
			Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
		},
		"target_count": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(targetCount)},
		},
	}
	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:   job.JobId,
		JobType: job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary:      resultSummary,
			OutputValues: outputs,
		},
		Activities: []*plugin_pb.ActivityEvent{
			pluginworker.BuildExecutorActivity("completed", resultSummary),
		},
	})
}
// collectVolumeMetrics fetches volume health metrics and the active topology
// from the given masters via pluginworker.CollectVolumeMetricsFromMasters,
// discarding that helper's third return value.
func (h *ErasureCodingHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	volumeMetrics, topo, _, collectErr := pluginworker.CollectVolumeMetricsFromMasters(
		ctx,
		masterAddresses,
		collectionFilter,
		h.grpcDialOption,
	)
	return volumeMetrics, topo, collectErr
}
// deriveErasureCodingWorkerConfig builds the worker config from plugin config
// values, starting from NewDefaultConfig and clamping each setting:
// quiet_for_seconds to >= 0, fullness_ratio to [0, 1], and min_size_mb to
// >= 1. preferred_tags is normalized and replica_placement is trimmed.
func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig {
	cfg := NewDefaultConfig()

	quiet := pluginworker.ReadIntConfig(values, "quiet_for_seconds", cfg.QuietForSeconds)
	if quiet < 0 {
		quiet = 0
	}
	cfg.QuietForSeconds = quiet

	ratio := pluginworker.ReadDoubleConfig(values, "fullness_ratio", cfg.FullnessRatio)
	switch {
	case ratio < 0:
		ratio = 0
	case ratio > 1:
		ratio = 1
	}
	cfg.FullnessRatio = ratio

	minSize := pluginworker.ReadIntConfig(values, "min_size_mb", cfg.MinSizeMB)
	if minSize < 1 {
		minSize = 1
	}
	cfg.MinSizeMB = minSize

	tags := pluginworker.ReadStringListConfig(values, "preferred_tags")
	cfg.PreferredTags = util.NormalizeTagList(tags)

	placement := pluginworker.ReadStringConfig(values, "replica_placement", cfg.ReplicaPlacement)
	cfg.ReplicaPlacement = strings.TrimSpace(placement)

	return &erasureCodingWorkerConfig{TaskConfig: cfg}
}
// buildErasureCodingProposal converts a detection result into a job proposal.
//
// The result's typed params are cloned, completed with the execution
// defaults, and embedded in the proposal as the serialized "task_params_pb"
// parameter. The proposal ID is the trimmed task ID, or a generated
// "erasure-coding-<volume>-<unix nanos>" value when that is empty. The dedupe
// key is "erasure_coding:<volume>" with ":<collection>" appended when the
// collection is non-empty.
//
// It returns an error when result or its typed params are nil, or when the
// params cannot be marshaled.
func buildErasureCodingProposal(
	result *workertypes.TaskDetectionResult,
	baseWorkingDir string,
) (*plugin_pb.JobProposal, error) {
	switch {
	case result == nil:
		return nil, fmt.Errorf("task detection result is nil")
	case result.TypedParams == nil:
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}

	// Work on a copy so the detection result itself is left untouched.
	taskParams := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
	applyErasureCodingExecutionDefaults(taskParams, nil, baseWorkingDir)
	encoded, err := proto.Marshal(taskParams)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}

	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("erasure-coding-%d-%d", result.VolumeID, time.Now().UnixNano())
	}

	dedupeKey := fmt.Sprintf("erasure_coding:%d", result.VolumeID)
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}

	var sourceNode string
	if len(taskParams.Sources) > 0 {
		sourceNode = strings.TrimSpace(taskParams.Sources[0].Node)
	}

	var summary string
	if sourceNode == "" {
		summary = fmt.Sprintf("Erasure code volume %d", result.VolumeID)
	} else {
		summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
	}

	// EC encoding reads the full volume, computes shards, and writes 14
	// shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy)
	// so the scheduler grants a deadline scaled to volume size.
	volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
	estimatedRuntimeSeconds := volumeSizeGB * 10 * 60

	targetCount := len(taskParams.Targets)

	parameters := map[string]*plugin_pb.ConfigValue{
		"task_params_pb": {
			Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: encoded},
		},
		"volume_id": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
		},
		"source_server": {
			Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
		},
		"collection": {
			Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
		},
		"target_count": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(targetCount)},
		},
		"estimated_runtime_seconds": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
		},
	}
	labels := map[string]string{
		"task_type":    "erasure_coding",
		"volume_id":    fmt.Sprintf("%d", result.VolumeID),
		"collection":   result.Collection,
		"source_node":  sourceNode,
		"target_count": fmt.Sprintf("%d", targetCount),
	}

	proposal := &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey:  dedupeKey,
		JobType:    "erasure_coding",
		Priority:   pluginworker.MapTaskPriority(result.Priority),
		Summary:    summary,
		Detail:     strings.TrimSpace(result.Reason),
		Parameters: parameters,
		Labels:     labels,
	}
	return proposal, nil
}
// decodeErasureCodingTaskParams extracts the task params for an erasure
// coding job.
//
// When the job carries a non-empty "task_params_pb" parameter it is
// unmarshaled and returned as is, with TaskId defaulting to the job ID.
// Otherwise the params are assembled from the individual job parameters:
// "volume_id", "source_server" (fallback "server"), "target_servers"
// (fallback "targets"), "collection", "data_shards", "parity_shards", and
// "source_disk_type". Non-positive shard counts fall back to the storage
// defaults.
//
// Blank target entries are dropped BEFORE shard IDs are distributed, so every
// shard ID lands on a real target. Distributing first and filtering afterwards
// would silently lose the shards that were handed to a blank entry.
//
// It returns an error when job is nil, the payload cannot be unmarshaled, a
// required parameter is missing, or there are fewer usable targets than
// total shards.
func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}

	// Preferred path: the serialized params produced by detection.
	if payload := pluginworker.ReadBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		return params, nil
	}

	// Fallback path: assemble the params from individual job parameters.
	volumeID := pluginworker.ReadUint32Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_server", ""))
	if sourceNode == "" {
		sourceNode = strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "server", ""))
	}
	targetServers := pluginworker.ReadStringListConfig(job.Parameters, "target_servers")
	if len(targetServers) == 0 {
		targetServers = pluginworker.ReadStringListConfig(job.Parameters, "targets")
	}
	collection := pluginworker.ReadStringConfig(job.Parameters, "collection", "")
	dataShards := pluginworker.ReadInt32Config(job.Parameters, "data_shards", int32(ecstorage.DataShardsCount))
	if dataShards <= 0 {
		dataShards = int32(ecstorage.DataShardsCount)
	}
	parityShards := pluginworker.ReadInt32Config(job.Parameters, "parity_shards", int32(ecstorage.ParityShardsCount))
	if parityShards <= 0 {
		parityShards = int32(ecstorage.ParityShardsCount)
	}
	sourceDiskType := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_disk_type", ""))
	// Widen before adding so two large int32 counts cannot wrap around.
	totalShards := int(dataShards) + int(parityShards)

	if volumeID == 0 {
		return nil, fmt.Errorf("missing volume_id in job parameters")
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("missing source_server in job parameters")
	}
	if len(targetServers) == 0 {
		return nil, fmt.Errorf("missing target_servers in job parameters")
	}
	if len(targetServers) < totalShards {
		return nil, fmt.Errorf("insufficient target_servers: got %d, need at least %d", len(targetServers), totalShards)
	}

	// Normalize the target list first so shard IDs are only ever distributed
	// across targets that will actually be emitted.
	targetNodes := make([]string, 0, len(targetServers))
	for _, raw := range targetServers {
		if node := strings.TrimSpace(raw); node != "" {
			targetNodes = append(targetNodes, node)
		}
	}
	if len(targetNodes) < totalShards {
		return nil, fmt.Errorf("insufficient non-empty target_servers after normalization: got %d, need at least %d", len(targetNodes), totalShards)
	}

	shardAssignments := assignECShardIDs(totalShards, len(targetNodes))
	targets := make([]*worker_pb.TaskTarget, 0, len(targetNodes))
	for i, node := range targetNodes {
		targets = append(targets, &worker_pb.TaskTarget{
			Node:     node,
			VolumeId: volumeID,
			ShardIds: shardAssignments[i],
		})
	}

	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   volumeID,
		Collection: collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node:     sourceNode,
				VolumeId: volumeID,
			},
		},
		Targets: targets,
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:     dataShards,
				ParityShards:   parityShards,
				SourceDiskType: sourceDiskType,
			},
		},
	}, nil
}
// applyErasureCodingExecutionDefaults fills in the execution-time settings of
// an erasure coding task in place. It is a no-op when params is nil.
//
// It ensures EC params exist with positive data/parity shard counts, sets the
// working directory derived from baseWorkingDir, forces CleanupSource on, and
// defaults MasterClient to the first master address of clusterContext when
// it is blank.
//
// Targets without shard IDs then receive the shard IDs that no other target
// already owns, distributed round-robin in ascending order. When no target
// has shard IDs this yields shard i on target i modulo the target count.
// Restricting the fill-in to unclaimed IDs prevents a partially assigned
// target list from ending up with duplicated shard IDs while others stay
// unassigned. Nil targets are skipped and take no share of the shards.
func applyErasureCodingExecutionDefaults(
	params *worker_pb.TaskParams,
	clusterContext *plugin_pb.ClusterContext,
	baseWorkingDir string,
) {
	if params == nil {
		return
	}

	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		ecParams = &worker_pb.ErasureCodingTaskParams{
			DataShards:   ecstorage.DataShardsCount,
			ParityShards: ecstorage.ParityShardsCount,
		}
		params.TaskParams = &worker_pb.TaskParams_ErasureCodingParams{ErasureCodingParams: ecParams}
	}
	if ecParams.DataShards <= 0 {
		ecParams.DataShards = ecstorage.DataShardsCount
	}
	if ecParams.ParityShards <= 0 {
		ecParams.ParityShards = ecstorage.ParityShardsCount
	}
	ecParams.WorkingDir = defaultErasureCodingWorkingDir(baseWorkingDir)
	ecParams.CleanupSource = true
	if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 {
		ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0]
	}

	// Widen before adding so two large int32 counts cannot wrap around.
	totalShards := int(ecParams.DataShards) + int(ecParams.ParityShards)
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}

	// Split the targets into those that already own shards (whose IDs are
	// off limits) and those that still need an assignment.
	claimed := make(map[uint32]struct{})
	var unassigned []*worker_pb.TaskTarget
	for _, target := range params.Targets {
		if target == nil {
			continue
		}
		if len(target.ShardIds) == 0 {
			unassigned = append(unassigned, target)
			continue
		}
		for _, shardID := range target.ShardIds {
			claimed[shardID] = struct{}{}
		}
	}
	if len(unassigned) == 0 {
		return
	}

	// Deal the unclaimed shard IDs out round-robin over the unassigned targets.
	next := 0
	for shardID := 0; shardID < totalShards; shardID++ {
		if _, taken := claimed[uint32(shardID)]; taken {
			continue
		}
		target := unassigned[next%len(unassigned)]
		target.ShardIds = append(target.ShardIds, uint32(shardID))
		next++
	}
}
// assignECShardIDs distributes shard IDs 0..totalShards-1 round-robin over
// targetCount targets and returns one ID list per target; shard i goes to
// target i modulo targetCount. Targets that receive no shard keep a nil list.
//
// It returns nil when targetCount is not positive. A non-positive totalShards
// falls back to ecstorage.TotalShardsCount.
func assignECShardIDs(totalShards int, targetCount int) [][]uint32 {
	if targetCount <= 0 {
		return nil
	}
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}

	perTarget := make([][]uint32, targetCount)
	slot := 0
	for shardID := 0; shardID < totalShards; shardID++ {
		perTarget[slot] = append(perTarget[slot], uint32(shardID))
		// Advance to the next target, wrapping back to the first.
		slot++
		if slot == targetCount {
			slot = 0
		}
	}
	return perTarget
}
// defaultErasureCodingWorkingDir returns the "erasure_coding" directory under
// baseWorkingDir. A blank (empty or whitespace-only) base resolves relative
// to the current directory.
func defaultErasureCodingWorkingDir(baseWorkingDir string) string {
	base := strings.TrimSpace(baseWorkingDir)
	if base == "" {
		base = "."
	}
	return filepath.Join(base, "erasure_coding")
}