mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
volume.tier.move: fulfill target replication before deleting old replicas (#8950)
* volume.tier.move: fulfill target replication before deleting old replicas When -toReplication is specified, volume.tier.move now creates all required replicas on the destination tier before deleting old replicas. This closes the data-loss window where only one copy existed on the target tier while awaiting volume.fix.replication. If replication fulfillment fails, old replicas are preserved and marked writable so the volume remains accessible. Also extracts replicateVolumeToServer and configureVolumeReplication helpers to reduce duplication across volume.tier.move and volume.fix.replication. Fixes #8937 * volume.tier.move: always fulfill replication before deleting old replicas When -toReplication is specified, use that replication setting. Otherwise, read the volume's existing replication from the super block. In both cases, all required replicas are created on the destination tier before old replicas are deleted. If replication fulfillment fails (e.g. not enough destination nodes), old replicas are preserved and marked writable so no data is lost. * volume.tier.move: address review feedback on ensureReplicationFulfilled - Add 5s delay before re-collecting topology to allow master heartbeat propagation after the move - Add nil guard for targetTierReplicas to prevent panic if the moved replica is not yet visible in the topology - Treat configureVolumeReplication failure as a hard error instead of a warning, so the rollback logic preserves old replicas * volume.tier.move: harden replication config error handling - Make configureVolumeReplication failure on the primary moved replica a hard error that aborts the move, instead of logging and continuing - Configure replication metadata on all existing target-tier replicas (not just newly created ones) when -toReplication is specified - Deletion of old replicas cannot affect new replicas since the locations list only contains pre-move servers (verified, no change) * volume.tier.move: fix cleanup deleting fulfilled replicas and broken recovery Fix 1: The cleanup loop now preserves pre-existing target-tier replicas that ensureReplicationFulfilled counted toward the replication target. Previously, a mixed-tier volume with an existing replica on the target tier could have that replica deleted right after being counted as fulfilled, leaving the volume under-replicated. ensureReplicationFulfilled now returns a preserveServers set that the deletion loop checks before removing any old replica. Fix 2: Failure paths after LiveMoveVolume (which deletes the source replica) now use restoreSurvivingReplicasWritable instead of markVolumeReplicasWritable. The old helper stopped on first error, so attempting to mark the already-deleted source writable would prevent all surviving replicas from being restored. The new helper skips the deleted source and continues through all remaining locations, logging per-replica errors instead of aborting. * volume.tier.move: mark preserved replicas writable, skip nodes with existing volume Fix 1: Preserved pre-existing target-tier replicas were left read-only after the move completed. They were marked read-only at the start (along with all other replicas) but never restored since the old code deleted them. Now they are explicitly marked writable before cleanup. Fix 2: The fulfillment loop could pick a candidate node that already hosts this volume on a different disk type, causing a VolumeCopy conflict. Added a guard that skips any node already hosting the volume (on any disk) before attempting replication.
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -16,11 +15,8 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
)
|
||||
|
||||
@@ -390,30 +386,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
|
||||
return true, nil
|
||||
}
|
||||
|
||||
err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
VolumeId: replica.info.Id,
|
||||
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
|
||||
})
|
||||
if replicateErr != nil {
|
||||
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
|
||||
}
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return recvErr
|
||||
}
|
||||
}
|
||||
if resp.ProcessedBytes > 0 {
|
||||
fmt.Fprintf(writer, "volume %d processed %s bytes\n", replica.info.Id, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
err := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(replica.info.Id),
|
||||
pb.NewServerAddressFromDataNode(replica.location.dataNode),
|
||||
pb.NewServerAddressFromDataNode(dst.dataNode),
|
||||
replica.info.DiskType)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
||||
@@ -2,6 +2,7 @@ package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -240,3 +241,47 @@ func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// replicateVolumeToServer copies a volume from sourceAddress to targetAddress via the VolumeCopy gRPC stream.
|
||||
func replicateVolumeToServer(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceAddress, targetAddress pb.ServerAddress, diskType string) error {
|
||||
return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
SourceDataNode: string(sourceAddress),
|
||||
DiskType: diskType,
|
||||
})
|
||||
if replicateErr != nil {
|
||||
return replicateErr
|
||||
}
|
||||
for {
|
||||
resp, recvErr := stream.Recv()
|
||||
if recvErr != nil {
|
||||
if recvErr == io.EOF {
|
||||
break
|
||||
}
|
||||
return recvErr
|
||||
}
|
||||
if resp.ProcessedBytes > 0 {
|
||||
fmt.Fprintf(writer, "volume %d processed %s bytes\n", volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// configureVolumeReplication sets the replication setting on a volume at the given server.
|
||||
func configureVolumeReplication(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetAddress pb.ServerAddress, replicationString string) error {
|
||||
return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
Replication: replicationString,
|
||||
})
|
||||
if configureErr != nil {
|
||||
return configureErr
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return errors.New(resp.Error)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -13,11 +11,10 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
@@ -47,8 +44,12 @@ func (c *commandVolumeTierMove) Help() string {
|
||||
|
||||
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
|
||||
|
||||
Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
|
||||
So "volume.fix.replication" and "volume.balance" should be followed.
|
||||
The command ensures the target replication is fully achieved on the destination tier
|
||||
before deleting old replicas. This prevents data loss if a destination disk fails
|
||||
before replication repair completes.
|
||||
|
||||
When -toReplication is specified, the moved volume is reconfigured with the new
|
||||
replication setting. Otherwise, the volume's existing replication is preserved.
|
||||
|
||||
Note:
|
||||
Use -collectionPattern="_default" to match only the default collection (volumes with no collection name).
|
||||
@@ -271,36 +272,201 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
|
||||
|
||||
// If move is successful and replication is not empty, alter moved volume's replication setting
|
||||
if *replicationString != "" {
|
||||
err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
|
||||
VolumeId: uint32(vid),
|
||||
Replication: *replicationString,
|
||||
})
|
||||
if configureErr != nil {
|
||||
return configureErr
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return errors.New(resp.Error)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err)
|
||||
if err = configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, newAddress, *replicationString); err != nil {
|
||||
// LiveMoveVolume already deleted sourceVolumeServer; mark surviving
|
||||
// old replicas writable before aborting so the volume stays accessible.
|
||||
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
|
||||
return fmt.Errorf("configure replication %s on volume %d at %s: %v", *replicationString, vid, newAddress, err)
|
||||
}
|
||||
}
|
||||
|
||||
// remove the remaining replicas
|
||||
// Ensure the required number of replicas exist on the target tier BEFORE
|
||||
// deleting old replicas to avoid data-loss risk.
|
||||
// Use the explicit -toReplication if given, otherwise preserve the volume's
|
||||
// existing replication from the source tier.
|
||||
preserveServers, replicateErr := c.ensureReplicationFulfilled(commandEnv, writer, vid, toDiskType, dst, *replicationString)
|
||||
if replicateErr != nil {
|
||||
// Replication not fully achieved — do NOT delete old replicas.
|
||||
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
|
||||
return fmt.Errorf("volume %d moved to %s but failed to fulfill replication, old replicas preserved: %v", vid, dst.dataNode.Id, replicateErr)
|
||||
}
|
||||
|
||||
// Mark preserved pre-existing target-tier replicas as writable.
|
||||
// They were marked read-only at the start of the move and would otherwise
|
||||
// stay read-only since we're keeping rather than deleting them.
|
||||
for _, loc := range locations {
|
||||
if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil {
|
||||
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
||||
if preserveServers[loc.Url] {
|
||||
if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil {
|
||||
glog.Errorf("mark volume %d as writable on preserved replica %s: %v", vid, loc.Url, markErr)
|
||||
}
|
||||
// reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to
|
||||
}
|
||||
}
|
||||
|
||||
// Remove old replicas that are NOT needed by the fulfilled replication.
|
||||
// Skip the move destination, the already-deleted source, and any pre-existing
|
||||
// target-tier replicas that were counted toward replication fulfillment.
|
||||
for _, loc := range locations {
|
||||
if loc.Url == dst.dataNode.Id || loc.ServerAddress() == sourceVolumeServer {
|
||||
continue
|
||||
}
|
||||
if preserveServers[loc.Url] {
|
||||
continue
|
||||
}
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil {
|
||||
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreSurvivingReplicasWritable marks old replicas writable after a failure,
|
||||
// skipping the source that was already deleted by LiveMoveVolume.
|
||||
func restoreSurvivingReplicasWritable(commandEnv *CommandEnv, vid needle.VolumeId, locations []wdclient.Location, deletedSource pb.ServerAddress) {
|
||||
for _, loc := range locations {
|
||||
if loc.ServerAddress() == deletedSource {
|
||||
continue
|
||||
}
|
||||
if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil {
|
||||
glog.Errorf("mark volume %d as writable on %s: %v", vid, loc.Url, markErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ensureReplicationFulfilled creates additional replicas of the volume on the target tier
|
||||
// to satisfy the requested replication placement. It re-collects topology after the initial
|
||||
// move so it can see the newly placed volume and find suitable destinations for additional copies.
|
||||
// It returns a set of server URLs (from the original locations) that host target-tier replicas
|
||||
// counted toward fulfillment, so the caller can avoid deleting them during cleanup.
|
||||
func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, movedDst location, replicationString string) (preserveServers map[string]bool, err error) {
|
||||
preserveServers = make(map[string]bool)
|
||||
sourceAddress := pb.NewServerAddressFromDataNode(movedDst.dataNode)
|
||||
|
||||
// Wait briefly for the master to receive heartbeats reflecting the move,
|
||||
// then re-collect topology to get the current state.
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Second)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("collect topology: %v", err)
|
||||
}
|
||||
|
||||
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
||||
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
|
||||
keepDataNodesSorted(allLocations, toDiskType)
|
||||
|
||||
existingReplicas := volumeReplicas[uint32(vid)]
|
||||
if len(existingReplicas) == 0 {
|
||||
return nil, fmt.Errorf("volume %d not found in topology after move", vid)
|
||||
}
|
||||
|
||||
// Build a set of all data nodes that already host this volume (any disk type)
|
||||
// so we don't try to VolumeCopy to a server that already has it.
|
||||
nodesWithVolume := make(map[string]bool)
|
||||
for _, r := range existingReplicas {
|
||||
nodesWithVolume[r.location.dataNode.Id] = true
|
||||
}
|
||||
|
||||
// Determine the target replication: use explicit -toReplication if given,
|
||||
// otherwise read the volume's existing replication setting.
|
||||
var replicaPlacement *super_block.ReplicaPlacement
|
||||
if replicationString != "" {
|
||||
replicaPlacement, err = super_block.NewReplicaPlacementFromString(replicationString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse replication %s: %v", replicationString, err)
|
||||
}
|
||||
} else {
|
||||
replicaPlacement, err = super_block.NewReplicaPlacementFromByte(byte(existingReplicas[0].info.ReplicaPlacement))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse existing replication for volume %d: %v", vid, err)
|
||||
}
|
||||
}
|
||||
|
||||
requiredCopies := replicaPlacement.GetCopyCount()
|
||||
if requiredCopies <= 1 {
|
||||
// No additional replicas needed (e.g., replication "000")
|
||||
return preserveServers, nil
|
||||
}
|
||||
|
||||
// Filter to only replicas on the target disk type (the newly moved one).
|
||||
var targetTierReplicas []*VolumeReplica
|
||||
for _, r := range existingReplicas {
|
||||
if types.ToDiskType(r.info.DiskType) == toDiskType {
|
||||
targetTierReplicas = append(targetTierReplicas, r)
|
||||
// Track pre-existing target-tier replicas so the caller won't delete them.
|
||||
preserveServers[r.location.dataNode.Id] = true
|
||||
}
|
||||
}
|
||||
if len(targetTierReplicas) == 0 {
|
||||
return nil, fmt.Errorf("volume %d not found on target tier %s in topology after move", vid, toDiskType)
|
||||
}
|
||||
|
||||
// Ensure all existing target-tier replicas have the correct replication metadata.
|
||||
// The primary moved replica is already configured in doMoveOneVolume, but there may
|
||||
// be pre-existing replicas on the target tier that need updating.
|
||||
if replicationString != "" {
|
||||
for _, r := range targetTierReplicas {
|
||||
addr := pb.NewServerAddressFromDataNode(r.location.dataNode)
|
||||
if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, addr, replicationString); configErr != nil {
|
||||
return nil, fmt.Errorf("volume %d: failed to configure replication on existing replica %s: %v", vid, r.location.dataNode.Id, configErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
additionalCopiesNeeded := requiredCopies - len(targetTierReplicas)
|
||||
if additionalCopiesNeeded <= 0 {
|
||||
return preserveServers, nil
|
||||
}
|
||||
|
||||
fmt.Fprintf(writer, "volume %d: creating %d additional replica(s) for replication %s\n", vid, additionalCopiesNeeded, replicaPlacement)
|
||||
|
||||
fn := capacityByFreeVolumeCount(toDiskType)
|
||||
copiesMade := 0
|
||||
for _, candidateDst := range allLocations {
|
||||
if copiesMade >= additionalCopiesNeeded {
|
||||
break
|
||||
}
|
||||
if fn(candidateDst.dataNode) <= 0 {
|
||||
continue
|
||||
}
|
||||
// Skip nodes that already host this volume on any disk type to avoid
|
||||
// VolumeCopy conflicts (e.g., same volume on source tier and target tier).
|
||||
if nodesWithVolume[candidateDst.dataNode.Id] {
|
||||
continue
|
||||
}
|
||||
if !satisfyReplicaPlacement(replicaPlacement, targetTierReplicas, candidateDst) {
|
||||
continue
|
||||
}
|
||||
|
||||
candidateAddress := pb.NewServerAddressFromDataNode(candidateDst.dataNode)
|
||||
fmt.Fprintf(writer, "volume %d: replicating from %s to %s\n", vid, sourceAddress, candidateDst.dataNode.Id)
|
||||
|
||||
if copyErr := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, vid, sourceAddress, candidateAddress, toDiskType.ReadableString()); copyErr != nil {
|
||||
return nil, fmt.Errorf("replicate volume %d to %s: %v", vid, candidateDst.dataNode.Id, copyErr)
|
||||
}
|
||||
|
||||
// Configure replication on the new replica if an explicit -toReplication was given.
|
||||
// Without it, VolumeCopy already preserves the source's replication from the super block.
|
||||
if replicationString != "" {
|
||||
if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, candidateAddress, replicationString); configErr != nil {
|
||||
return nil, fmt.Errorf("volume %d: failed to configure replication on %s: %v", vid, candidateDst.dataNode.Id, configErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Track the new replica for placement decisions
|
||||
targetTierReplicas = append(targetTierReplicas, &VolumeReplica{
|
||||
location: &candidateDst,
|
||||
info: targetTierReplicas[0].info,
|
||||
})
|
||||
addVolumeCount(candidateDst.dataNode.DiskInfos[string(toDiskType)], 1)
|
||||
copiesMade++
|
||||
}
|
||||
|
||||
if copiesMade < additionalCopiesNeeded {
|
||||
return nil, fmt.Errorf("could only create %d of %d additional replicas for volume %d (replication %s): not enough eligible destinations", copiesMade, additionalCopiesNeeded, vid, replicaPlacement)
|
||||
}
|
||||
|
||||
fmt.Fprintf(writer, "volume %d: replication %s fulfilled with %d total copies\n", vid, replicaPlacement, requiredCopies)
|
||||
return preserveServers, nil
|
||||
}
|
||||
|
||||
func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
|
||||
|
||||
quietSeconds := int64(quietPeriod / time.Second)
|
||||
|
||||
Reference in New Issue
Block a user