mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(ec): delete empty stub replicas before distributing EC shards (#9722)
* fix(ec): delete empty stub replicas before distributing EC shards An interrupted encode can leave a 0-byte .dat replica behind. Until now the only thing that removed it was deleteOriginalVolume, which runs after distribute+mount and calls VolumeDelete -> removeVolumeFiles. A regular volume and an EC volume share the same <collection>_<vid>.vif path, so deleting the stub at that point strips the .vif out from under the freshly distributed shards. Sweep the original replicas with VolumeDelete(OnlyEmpty=true) before distribute: doIsEmpty uses the same superblock threshold, so only the 0-byte stubs go and any data-bearing replica is refused and kept for the post-verify delete. Servers cleared in the sweep are skipped by deleteOriginalVolume so it never touches a server that now holds only EC shards. * fix(ec): fail the encode when an empty-replica sweep can't confirm a node The sweep swallowed every VolumeDelete(OnlyEmpty) error, so a transient failure on a stub node fell through to the post-verify force-delete on that node — the shared-.vif clobber the sweep exists to avoid. Treat only the expected cases (volume not empty, or already gone) as leave-in-place; any other error propagates and fails the encode, which rolls back the readonly marks and retries next cycle.
This commit is contained in:
@@ -45,6 +45,11 @@ type ErasureCodingTask struct {
|
||||
sources []*worker_pb.TaskSource // Unified sources for cleanup
|
||||
shardAssignment map[string][]string // destination -> assigned shard types
|
||||
readonlyReplicas []pb.ServerAddress // replicas marked readonly, for rollback
|
||||
|
||||
// Replica servers whose original volume was an empty stub, deleted in the
|
||||
// pre-distribute sweep. deleteOriginalVolume skips these so it does not
|
||||
// re-delete and remove the now-EC .vif those servers share.
|
||||
emptyReplicasDeleted map[string]bool
|
||||
}
|
||||
|
||||
// NewErasureCodingTask creates a new unified EC task instance
|
||||
@@ -200,6 +205,18 @@ func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskP
|
||||
return fmt.Errorf("failed to clear stale EC shards on destinations: %v", err)
|
||||
}
|
||||
|
||||
// Delete 0-byte stub replicas left by an interrupted encode before the new
|
||||
// EC files land. A stub shares the <collection>_<vid>.vif path the EC
|
||||
// volume will use; deleting it after distribute (in deleteOriginalVolume)
|
||||
// would remove that .vif and damage the freshly written shards. OnlyEmpty
|
||||
// keeps data-bearing replicas, which are deleted later after verify.
|
||||
t.ReportProgressWithStage(57.0, "Removing empty stub replicas")
|
||||
t.GetLogger().Info("Removing empty stub replicas before distribute")
|
||||
if err := t.sweepEmptyReplicas(ctx); err != nil {
|
||||
t.rollbackReadonly(ctx)
|
||||
return fmt.Errorf("failed to remove empty stub replicas: %w", err)
|
||||
}
|
||||
|
||||
// Step 4: Distribute shards to destinations
|
||||
t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
|
||||
t.GetLogger().Info("Distributing EC shards to destinations")
|
||||
@@ -675,6 +692,14 @@ func (t *ErasureCodingTask) deleteOriginalVolume(ctx context.Context) error {
|
||||
replicas = []string{t.server}
|
||||
}
|
||||
|
||||
// Empty stub replicas were already removed before distribute; skip them so
|
||||
// VolumeDelete does not run on a server that now holds only EC shards.
|
||||
replicas = replicasPendingDelete(replicas, t.emptyReplicasDeleted)
|
||||
if len(replicas) == 0 {
|
||||
glog.V(0).Infof("EC volume %d: all original replicas were empty stubs removed before distribute", t.volumeID)
|
||||
return nil
|
||||
}
|
||||
|
||||
t.GetLogger().WithFields(map[string]interface{}{
|
||||
"volume_id": t.volumeID,
|
||||
"replica_count": len(replicas),
|
||||
@@ -762,6 +787,67 @@ func (t *ErasureCodingTask) getReplicas() []string {
|
||||
return replicas
|
||||
}
|
||||
|
||||
// sweepEmptyReplicas deletes any original replica that is an empty 0-byte stub
|
||||
// (OnlyEmpty so a data-bearing replica is refused and kept for the post-verify
|
||||
// delete). Run before distribute: a stub shares the <collection>_<vid>.vif the
|
||||
// EC volume reuses, so removing it afterwards would strip that .vif. Servers
|
||||
// whose stub was deleted are recorded so deleteOriginalVolume skips them.
|
||||
//
|
||||
// A refusal (volume not empty) or an already-gone volume is expected and left
|
||||
// for the later delete. Any other error means the node's state is unknown; we
|
||||
// fail rather than proceed to distribute and a force-delete that could strip a
|
||||
// shared .vif.
|
||||
func (t *ErasureCodingTask) sweepEmptyReplicas(ctx context.Context) error {
|
||||
for _, node := range t.getReplicas() {
|
||||
err := operation.WithVolumeServerClient(false, pb.ServerAddress(node), t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, e := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||
VolumeId: t.volumeID,
|
||||
OnlyEmpty: true,
|
||||
})
|
||||
return e
|
||||
})
|
||||
switch {
|
||||
case err == nil:
|
||||
if t.emptyReplicasDeleted == nil {
|
||||
t.emptyReplicasDeleted = make(map[string]bool)
|
||||
}
|
||||
t.emptyReplicasDeleted[node] = true
|
||||
glog.V(0).Infof("EC volume %d: removed empty stub replica on %s before distribute", t.volumeID, node)
|
||||
case isExpectedSweepSkip(err):
|
||||
glog.V(1).Infof("EC volume %d: empty-replica sweep left %s in place: %v", t.volumeID, node, err)
|
||||
default:
|
||||
return fmt.Errorf("empty-replica sweep on %s: %w", node, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// isExpectedSweepSkip reports whether a VolumeDelete(OnlyEmpty) error is the
|
||||
// expected leave-in-place case: the replica still holds data (refused) or no
|
||||
// longer exists. Other errors (e.g. an unreachable node) leave its state
|
||||
// unknown and must not be swallowed.
|
||||
func isExpectedSweepSkip(err error) bool {
|
||||
s := err.Error()
|
||||
return strings.Contains(s, "volume not empty") || strings.Contains(s, "not found")
|
||||
}
|
||||
|
||||
// replicasPendingDelete returns replicas not already removed by the
|
||||
// pre-distribute empty-stub sweep.
|
||||
func replicasPendingDelete(replicas []string, alreadyDeleted map[string]bool) []string {
|
||||
if len(alreadyDeleted) == 0 {
|
||||
return replicas
|
||||
}
|
||||
pending := make([]string, 0, len(replicas))
|
||||
for _, r := range replicas {
|
||||
if alreadyDeleted[r] {
|
||||
continue
|
||||
}
|
||||
pending = append(pending, r)
|
||||
}
|
||||
return pending
|
||||
}
|
||||
|
||||
// cleanupStaleEcShards unmounts and deletes any EC shards still mounted on
|
||||
// destinations from a previous failed encode of this volume. Targets every
|
||||
// node we plan to write to (t.targets) plus every node detection saw EC
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
package erasure_coding
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
||||
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
func TestReplicasPendingDelete(t *testing.T) {
|
||||
replicas := []string{"a:8080", "b:8080", "c:8080"}
|
||||
|
||||
require.Equal(t, replicas, replicasPendingDelete(replicas, nil),
|
||||
"nil swept set leaves the list unchanged")
|
||||
require.ElementsMatch(t, []string{"a:8080", "c:8080"},
|
||||
replicasPendingDelete(replicas, map[string]bool{"b:8080": true}),
|
||||
"swept node is excluded")
|
||||
require.Empty(t,
|
||||
replicasPendingDelete(replicas, map[string]bool{"a:8080": true, "b:8080": true, "c:8080": true}),
|
||||
"all swept yields nothing left to delete")
|
||||
}
|
||||
|
||||
// The sweep leaves a node in place only when VolumeDelete(OnlyEmpty) reports the
|
||||
// volume holds data or no longer exists. An unreachable node leaves its state
|
||||
// unknown and must propagate so the encode fails rather than force-deleting it
|
||||
// later over the shared .vif.
|
||||
func TestIsExpectedSweepSkip(t *testing.T) {
|
||||
require.True(t, isExpectedSweepSkip(errors.New("DeleteVolume 5: volume not empty")))
|
||||
require.True(t, isExpectedSweepSkip(errors.New("volume 5 not found on disk")))
|
||||
require.False(t, isExpectedSweepSkip(errors.New("rpc error: code = Unavailable desc = connection refused")))
|
||||
require.False(t, isExpectedSweepSkip(errors.New("context deadline exceeded")))
|
||||
}
|
||||
|
||||
// The pre-distribute sweep must delete a 0-byte stub replica (so its shared
|
||||
// <collection>_<vid>.vif can't be clobbered by a later original-delete and
|
||||
// can't shadow the incoming EC files) while leaving a data-bearing replica
|
||||
// untouched — VolumeDelete(OnlyEmpty=true) is the guard. A data-bearing
|
||||
// replica is deleted later, only after the EC shard set is verified.
|
||||
func TestSweepEmptyReplicasDeletesStubKeepsData(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
clusterHarness := framework.StartVolumeCluster(t, matrix.P1())
|
||||
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
|
||||
defer conn.Close()
|
||||
|
||||
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
server := clusterHarness.VolumeServerAddress()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// An empty allocated volume is a superblock-only stub.
|
||||
const stubVol = uint32(9479)
|
||||
framework.AllocateVolume(t, grpcClient, stubVol, "ec-stub")
|
||||
|
||||
stubTask := newSweepTaskForTest(server, stubVol, "ec-stub", dialOption)
|
||||
require.NoError(t, stubTask.sweepEmptyReplicas(ctx))
|
||||
|
||||
require.True(t, stubTask.emptyReplicasDeleted[server], "stub server should be recorded as swept")
|
||||
_, err := grpcClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: stubVol})
|
||||
require.Error(t, err, "empty stub volume must be deleted by the sweep")
|
||||
|
||||
// A volume holding data must survive the sweep.
|
||||
const dataVol = uint32(9480)
|
||||
framework.AllocateVolume(t, grpcClient, dataVol, "ec-data")
|
||||
httpClient := framework.NewHTTPClient()
|
||||
fid := framework.NewFileID(dataVol, 948000, 0x9480CAFE)
|
||||
upResp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), fid, []byte("real-data-keep-me"))
|
||||
_ = framework.ReadAllAndClose(t, upResp)
|
||||
require.Equal(t, http.StatusCreated, upResp.StatusCode)
|
||||
|
||||
dataTask := newSweepTaskForTest(server, dataVol, "ec-data", dialOption)
|
||||
require.NoError(t, dataTask.sweepEmptyReplicas(ctx), "a not-empty refusal is expected, not an error")
|
||||
|
||||
require.False(t, dataTask.emptyReplicasDeleted[server], "data-bearing server must not be recorded as swept")
|
||||
_, err = grpcClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{VolumeId: dataVol})
|
||||
require.NoError(t, err, "data-bearing volume must survive the sweep")
|
||||
}
|
||||
|
||||
func newSweepTaskForTest(server string, volumeID uint32, collection string, dialOption grpc.DialOption) *ErasureCodingTask {
|
||||
task := NewErasureCodingTask("sweep-"+collection, server, volumeID, collection, dialOption)
|
||||
task.sources = []*worker_pb.TaskSource{{Node: server, VolumeId: volumeID}}
|
||||
return task
|
||||
}
|
||||
Reference in New Issue
Block a user