diff --git a/weed/worker/tasks/erasure_coding/ec_task.go b/weed/worker/tasks/erasure_coding/ec_task.go index cb2304bef..bcc7bb880 100644 --- a/weed/worker/tasks/erasure_coding/ec_task.go +++ b/weed/worker/tasks/erasure_coding/ec_task.go @@ -45,6 +45,11 @@ type ErasureCodingTask struct { sources []*worker_pb.TaskSource // Unified sources for cleanup shardAssignment map[string][]string // destination -> assigned shard types readonlyReplicas []pb.ServerAddress // replicas marked readonly, for rollback + + // Replica servers whose original volume was an empty stub, deleted in the + // pre-distribute sweep. deleteOriginalVolume skips these so it does not + // re-delete and remove the now-EC .vif those servers share. + emptyReplicasDeleted map[string]bool } // NewErasureCodingTask creates a new unified EC task instance @@ -200,6 +205,18 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP return fmt.Errorf("failed to clear stale EC shards on destinations: %v", err) } + // Delete 0-byte stub replicas left by an interrupted encode before the new + // EC files land. A stub shares the _.vif path the EC + // volume will use; deleting it after distribute (in deleteOriginalVolume) + // would remove that .vif and damage the freshly written shards. OnlyEmpty + // keeps data-bearing replicas, which are deleted later after verify. + t.ReportProgressWithStage(57.0, "Removing empty stub replicas") + t.GetLogger().Info("Removing empty stub replicas before distribute") + if err := t.sweepEmptyReplicas(ctx); err != nil { + t.rollbackReadonly(ctx) + return fmt.Errorf("failed to remove empty stub replicas: %w", err) + } + // Step 4: Distribute shards to destinations t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations") t.GetLogger().Info("Distributing EC shards to destinations") @@ -675,6 +692,14 @@ func (t *ErasureCodingTask) deleteOriginalVolume(ctx context.Context) error { replicas = []string{t.server} } + // Empty stub replicas were already removed before distribute; skip them so + // VolumeDelete does not run on a server that now holds only EC shards. + replicas = replicasPendingDelete(replicas, t.emptyReplicasDeleted) + if len(replicas) == 0 { + glog.V(0).Infof("EC volume %d: all original replicas were empty stubs removed before distribute", t.volumeID) + return nil + } + t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, "replica_count": len(replicas), @@ -762,6 +787,67 @@ func (t *ErasureCodingTask) getReplicas() []string { return replicas } +// sweepEmptyReplicas deletes any original replica that is an empty 0-byte stub +// (OnlyEmpty so a data-bearing replica is refused and kept for the post-verify +// delete). Run before distribute: a stub shares the _.vif the +// EC volume reuses, so removing it afterwards would strip that .vif. Servers +// whose stub was deleted are recorded so deleteOriginalVolume skips them. +// +// A refusal (volume not empty) or an already-gone volume is expected and left +// for the later delete. Any other error means the node's state is unknown; we +// fail rather than proceed to distribute and a force-delete that could strip a +// shared .vif. +func (t *ErasureCodingTask) sweepEmptyReplicas(ctx context.Context) error { + for _, node := range t.getReplicas() { + err := operation.WithVolumeServerClient(false, pb.ServerAddress(node), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + _, e := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ + VolumeId: t.volumeID, + OnlyEmpty: true, + }) + return e + }) + switch { + case err == nil: + if t.emptyReplicasDeleted == nil { + t.emptyReplicasDeleted = make(map[string]bool) + } + t.emptyReplicasDeleted[node] = true + glog.V(0).Infof("EC volume %d: removed empty stub replica on %s before distribute", t.volumeID, node) + case isExpectedSweepSkip(err): + glog.V(1).Infof("EC volume %d: empty-replica sweep left %s in place: %v", t.volumeID, node, err) + default: + return fmt.Errorf("empty-replica sweep on %s: %w", node, err) + } + } + return nil +} + +// isExpectedSweepSkip reports whether a VolumeDelete(OnlyEmpty) error is the +// expected leave-in-place case: the replica still holds data (refused) or no +// longer exists. Other errors (e.g. an unreachable node) leave its state +// unknown and must not be swallowed. +func isExpectedSweepSkip(err error) bool { + s := err.Error() + return strings.Contains(s, "volume not empty") || strings.Contains(s, "not found") +} + +// replicasPendingDelete returns replicas not already removed by the +// pre-distribute empty-stub sweep. +func replicasPendingDelete(replicas []string, alreadyDeleted map[string]bool) []string { + if len(alreadyDeleted) == 0 { + return replicas + } + pending := make([]string, 0, len(replicas)) + for _, r := range replicas { + if alreadyDeleted[r] { + continue + } + pending = append(pending, r) + } + return pending +} + // cleanupStaleEcShards unmounts and deletes any EC shards still mounted on // destinations from a previous failed encode of this volume. Targets every // node we plan to write to (t.targets) plus every node detection saw EC diff --git a/weed/worker/tasks/erasure_coding/ec_task_empty_replica_sweep_test.go b/weed/worker/tasks/erasure_coding/ec_task_empty_replica_sweep_test.go new file mode 100644 index 000000000..a579022c1 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/ec_task_empty_replica_sweep_test.go @@ -0,0 +1,95 @@ +package erasure_coding + +import ( + "context" + "errors" + "net/http" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestReplicasPendingDelete(t *testing.T) { + replicas := []string{"a:8080", "b:8080", "c:8080"} + + require.Equal(t, replicas, replicasPendingDelete(replicas, nil), + "nil swept set leaves the list unchanged") + require.ElementsMatch(t, []string{"a:8080", "c:8080"}, + replicasPendingDelete(replicas, map[string]bool{"b:8080": true}), + "swept node is excluded") + require.Empty(t, + replicasPendingDelete(replicas, map[string]bool{"a:8080": true, "b:8080": true, "c:8080": true}), + "all swept yields nothing left to delete") +} + +// The sweep leaves a node in place only when VolumeDelete(OnlyEmpty) reports the +// volume holds data or no longer exists. An unreachable node leaves its state +// unknown and must propagate so the encode fails rather than force-deleting it +// later over the shared .vif. +func TestIsExpectedSweepSkip(t *testing.T) { + require.True(t, isExpectedSweepSkip(errors.New("DeleteVolume 5: volume not empty"))) + require.True(t, isExpectedSweepSkip(errors.New("volume 5 not found on disk"))) + require.False(t, isExpectedSweepSkip(errors.New("rpc error: code = Unavailable desc = connection refused"))) + require.False(t, isExpectedSweepSkip(errors.New("context deadline exceeded"))) +} + +// The pre-distribute sweep must delete a 0-byte stub replica (so its shared +// _.vif can't be clobbered by a later original-delete and +// can't shadow the incoming EC files) while leaving a data-bearing replica +// untouched — VolumeDelete(OnlyEmpty=true) is the guard. A data-bearing +// replica is deleted later, only after the EC shard set is verified. +func TestSweepEmptyReplicasDeletesStubKeepsData(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + clusterHarness := framework.StartVolumeCluster(t, matrix.P1()) + conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress()) + defer conn.Close() + + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + server := clusterHarness.VolumeServerAddress() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // An empty allocated volume is a superblock-only stub. + const stubVol = uint32(9479) + framework.AllocateVolume(t, grpcClient, stubVol, "ec-stub") + + stubTask := newSweepTaskForTest(server, stubVol, "ec-stub", dialOption) + require.NoError(t, stubTask.sweepEmptyReplicas(ctx)) + + require.True(t, stubTask.emptyReplicasDeleted[server], "stub server should be recorded as swept") + _, err := grpcClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: stubVol}) + require.Error(t, err, "empty stub volume must be deleted by the sweep") + + // A volume holding data must survive the sweep. + const dataVol = uint32(9480) + framework.AllocateVolume(t, grpcClient, dataVol, "ec-data") + httpClient := framework.NewHTTPClient() + fid := framework.NewFileID(dataVol, 948000, 0x9480CAFE) + upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, []byte("real-data-keep-me")) + _ = framework.ReadAllAndClose(t, upResp) + require.Equal(t, http.StatusCreated, upResp.StatusCode) + + dataTask := newSweepTaskForTest(server, dataVol, "ec-data", dialOption) + require.NoError(t, dataTask.sweepEmptyReplicas(ctx), "a not-empty refusal is expected, not an error") + + require.False(t, dataTask.emptyReplicasDeleted[server], "data-bearing server must not be recorded as swept") + _, err = grpcClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: dataVol}) + require.NoError(t, err, "data-bearing volume must survive the sweep") +} + +func newSweepTaskForTest(server string, volumeID uint32, collection string, dialOption grpc.DialOption) *ErasureCodingTask { + task := NewErasureCodingTask("sweep-"+collection, server, volumeID, collection, dialOption) + task.sources = []*worker_pb.TaskSource{{Node: server, VolumeId: volumeID}} + return task +}