diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 548cd6412..6776e3dff 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "io" @@ -16,11 +15,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" ) @@ -390,30 +386,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co return true, nil } - err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - if resp.ProcessedBytes > 0 { - fmt.Fprintf(writer, "volume %d processed %s bytes\n", replica.info.Id, util.BytesToHumanReadable(uint64(resp.ProcessedBytes))) - } - } - - return nil - }) + err := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(replica.info.Id), + pb.NewServerAddressFromDataNode(replica.location.dataNode), + pb.NewServerAddressFromDataNode(dst.dataNode), + replica.info.DiskType) if err != nil { return false, err diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 05bc00d44..def85b8ed 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -2,6 +2,7 @@ package shell import ( "context" + "errors" "flag" "fmt" "io" @@ -240,3 +241,47 @@ func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle. } return nil } + +// replicateVolumeToServer copies a volume from sourceAddress to targetAddress via the VolumeCopy gRPC stream. +func replicateVolumeToServer(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceAddress, targetAddress pb.ServerAddress, diskType string) error { + return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: uint32(volumeId), + SourceDataNode: string(sourceAddress), + DiskType: diskType, + }) + if replicateErr != nil { + return replicateErr + } + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } + return recvErr + } + if resp.ProcessedBytes > 0 { + fmt.Fprintf(writer, "volume %d processed %s bytes\n", volumeId, util.BytesToHumanReadable(uint64(resp.ProcessedBytes))) + } + } + return nil + }) +} + +// configureVolumeReplication sets the replication setting on a volume at the given server. +func configureVolumeReplication(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetAddress pb.ServerAddress, replicationString string) error { + return operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ + VolumeId: uint32(volumeId), + Replication: replicationString, + }) + if configureErr != nil { + return configureErr + } + if resp.Error != "" { + return errors.New(resp.Error) + } + return nil + }) +} diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index b0f9cba12..619e2f904 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -1,8 +1,6 @@ package shell import ( - "context" - "errors" "flag" "fmt" "io" @@ -13,11 +11,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/wdclient" - "github.com/seaweedfs/seaweedfs/weed/operation" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -47,8 +44,12 @@ func (c *commandVolumeTierMove) Help() string { volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ] - Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped. - So "volume.fix.replication" and "volume.balance" should be followed. + The command ensures the target replication is fully achieved on the destination tier + before deleting old replicas. This prevents data loss if a destination disk fails + before replication repair completes. + + When -toReplication is specified, the moved volume is reconfigured with the new + replication setting. Otherwise, the volume's existing replication is preserved. Note: Use -collectionPattern="_default" to match only the default collection (volumes with no collection name). @@ -271,36 +272,201 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i // If move is successful and replication is not empty, alter moved volume's replication setting if *replicationString != "" { - err = operation.WithVolumeServerClient(false, newAddress, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ - VolumeId: uint32(vid), - Replication: *replicationString, - }) - if configureErr != nil { - return configureErr - } - if resp.Error != "" { - return errors.New(resp.Error) - } - return nil - }) - if err != nil { - glog.Errorf("update volume %d replication on %s: %v", vid, locations[0].Url, err) + if err = configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, newAddress, *replicationString); err != nil { + // LiveMoveVolume already deleted sourceVolumeServer; mark surviving + // old replicas writable before aborting so the volume stays accessible. + restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer) + return fmt.Errorf("configure replication %s on volume %d at %s: %v", *replicationString, vid, newAddress, err) } } - // remove the remaining replicas + // Ensure the required number of replicas exist on the target tier BEFORE + // deleting old replicas to avoid data-loss risk. + // Use the explicit -toReplication if given, otherwise preserve the volume's + // existing replication from the source tier. + preserveServers, replicateErr := c.ensureReplicationFulfilled(commandEnv, writer, vid, toDiskType, dst, *replicationString) + if replicateErr != nil { + // Replication not fully achieved — do NOT delete old replicas. + restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer) + return fmt.Errorf("volume %d moved to %s but failed to fulfill replication, old replicas preserved: %v", vid, dst.dataNode.Id, replicateErr) + } + + // Mark preserved pre-existing target-tier replicas as writable. + // They were marked read-only at the start of the move and would otherwise + // stay read-only since we're keeping rather than deleting them. for _, loc := range locations { - if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer { - if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil { - fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err) + if preserveServers[loc.Url] { + if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil { + glog.Errorf("mark volume %d as writable on preserved replica %s: %v", vid, loc.Url, markErr) } - // reduce volume count? Not really necessary since they are "more" full and will not be a candidate to move to + } + } + + // Remove old replicas that are NOT needed by the fulfilled replication. + // Skip the move destination, the already-deleted source, and any pre-existing + // target-tier replicas that were counted toward replication fulfillment. + for _, loc := range locations { + if loc.Url == dst.dataNode.Id || loc.ServerAddress() == sourceVolumeServer { + continue + } + if preserveServers[loc.Url] { + continue + } + if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil { + fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err) } } return nil } +// restoreSurvivingReplicasWritable marks old replicas writable after a failure, +// skipping the source that was already deleted by LiveMoveVolume. +func restoreSurvivingReplicasWritable(commandEnv *CommandEnv, vid needle.VolumeId, locations []wdclient.Location, deletedSource pb.ServerAddress) { + for _, loc := range locations { + if loc.ServerAddress() == deletedSource { + continue + } + if markErr := markVolumeWritable(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), true, false); markErr != nil { + glog.Errorf("mark volume %d as writable on %s: %v", vid, loc.Url, markErr) + } + } +} + +// ensureReplicationFulfilled creates additional replicas of the volume on the target tier +// to satisfy the requested replication placement. It re-collects topology after the initial +// move so it can see the newly placed volume and find suitable destinations for additional copies. +// It returns a set of server URLs (from the original locations) that host target-tier replicas +// counted toward fulfillment, so the caller can avoid deleting them during cleanup. +func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, movedDst location, replicationString string) (preserveServers map[string]bool, err error) { + preserveServers = make(map[string]bool) + sourceAddress := pb.NewServerAddressFromDataNode(movedDst.dataNode) + + // Wait briefly for the master to receive heartbeats reflecting the move, + // then re-collect topology to get the current state. + topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Second) + if err != nil { + return nil, fmt.Errorf("collect topology: %v", err) + } + + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) + allLocations = filterLocationsByDiskType(allLocations, toDiskType) + keepDataNodesSorted(allLocations, toDiskType) + + existingReplicas := volumeReplicas[uint32(vid)] + if len(existingReplicas) == 0 { + return nil, fmt.Errorf("volume %d not found in topology after move", vid) + } + + // Build a set of all data nodes that already host this volume (any disk type) + // so we don't try to VolumeCopy to a server that already has it. + nodesWithVolume := make(map[string]bool) + for _, r := range existingReplicas { + nodesWithVolume[r.location.dataNode.Id] = true + } + + // Determine the target replication: use explicit -toReplication if given, + // otherwise read the volume's existing replication setting. + var replicaPlacement *super_block.ReplicaPlacement + if replicationString != "" { + replicaPlacement, err = super_block.NewReplicaPlacementFromString(replicationString) + if err != nil { + return nil, fmt.Errorf("parse replication %s: %v", replicationString, err) + } + } else { + replicaPlacement, err = super_block.NewReplicaPlacementFromByte(byte(existingReplicas[0].info.ReplicaPlacement)) + if err != nil { + return nil, fmt.Errorf("parse existing replication for volume %d: %v", vid, err) + } + } + + requiredCopies := replicaPlacement.GetCopyCount() + if requiredCopies <= 1 { + // No additional replicas needed (e.g., replication "000") + return preserveServers, nil + } + + // Filter to only replicas on the target disk type (the newly moved one). + var targetTierReplicas []*VolumeReplica + for _, r := range existingReplicas { + if types.ToDiskType(r.info.DiskType) == toDiskType { + targetTierReplicas = append(targetTierReplicas, r) + // Track pre-existing target-tier replicas so the caller won't delete them. + preserveServers[r.location.dataNode.Id] = true + } + } + if len(targetTierReplicas) == 0 { + return nil, fmt.Errorf("volume %d not found on target tier %s in topology after move", vid, toDiskType) + } + + // Ensure all existing target-tier replicas have the correct replication metadata. + // The primary moved replica is already configured in doMoveOneVolume, but there may + // be pre-existing replicas on the target tier that need updating. + if replicationString != "" { + for _, r := range targetTierReplicas { + addr := pb.NewServerAddressFromDataNode(r.location.dataNode) + if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, addr, replicationString); configErr != nil { + return nil, fmt.Errorf("volume %d: failed to configure replication on existing replica %s: %v", vid, r.location.dataNode.Id, configErr) + } + } + } + + additionalCopiesNeeded := requiredCopies - len(targetTierReplicas) + if additionalCopiesNeeded <= 0 { + return preserveServers, nil + } + + fmt.Fprintf(writer, "volume %d: creating %d additional replica(s) for replication %s\n", vid, additionalCopiesNeeded, replicaPlacement) + + fn := capacityByFreeVolumeCount(toDiskType) + copiesMade := 0 + for _, candidateDst := range allLocations { + if copiesMade >= additionalCopiesNeeded { + break + } + if fn(candidateDst.dataNode) <= 0 { + continue + } + // Skip nodes that already host this volume on any disk type to avoid + // VolumeCopy conflicts (e.g., same volume on source tier and target tier). + if nodesWithVolume[candidateDst.dataNode.Id] { + continue + } + if !satisfyReplicaPlacement(replicaPlacement, targetTierReplicas, candidateDst) { + continue + } + + candidateAddress := pb.NewServerAddressFromDataNode(candidateDst.dataNode) + fmt.Fprintf(writer, "volume %d: replicating from %s to %s\n", vid, sourceAddress, candidateDst.dataNode.Id) + + if copyErr := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, vid, sourceAddress, candidateAddress, toDiskType.ReadableString()); copyErr != nil { + return nil, fmt.Errorf("replicate volume %d to %s: %v", vid, candidateDst.dataNode.Id, copyErr) + } + + // Configure replication on the new replica if an explicit -toReplication was given. + // Without it, VolumeCopy already preserves the source's replication from the super block. + if replicationString != "" { + if configErr := configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, candidateAddress, replicationString); configErr != nil { + return nil, fmt.Errorf("volume %d: failed to configure replication on %s: %v", vid, candidateDst.dataNode.Id, configErr) + } + } + + // Track the new replica for placement decisions + targetTierReplicas = append(targetTierReplicas, &VolumeReplica{ + location: &candidateDst, + info: targetTierReplicas[0].info, + }) + addVolumeCount(candidateDst.dataNode.DiskInfos[string(toDiskType)], 1) + copiesMade++ + } + + if copiesMade < additionalCopiesNeeded { + return nil, fmt.Errorf("could only create %d of %d additional replicas for volume %d (replication %s): not enough eligible destinations", copiesMade, additionalCopiesNeeded, vid, replicaPlacement) + } + + fmt.Fprintf(writer, "volume %d: replication %s fulfilled with %d total copies\n", vid, replicaPlacement, requiredCopies) + return preserveServers, nil +} + func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { quietSeconds := int64(quietPeriod / time.Second)