shell: volume.tier.move can move volumes between data centers (#9925)

* shell: volume.tier.move can move volumes between data centers

-fromDataCenter restricts volume selection to volumes that have a replica
in that data center. -toDataCenter constrains both the move destinations
and replication fulfillment to that data center. When the source and target
disk types are identical, both flags are required, and full volumes are
moved between data centers on the same tier.

* shell: assert node identity in data center filter test

* shell: tier move resumes when the volume is already on the target

A replica already on the target tier and data center, typically left by
an interrupted earlier run, anchors the move: the copy is skipped, and only
replication fulfillment and cleanup of the old replicas are completed.
Previously such volumes hit the no-destination path, and the stale source
replicas were never removed.
This commit is contained in:
Chris Lu
2026-06-11 10:46:34 -07:00
committed by GitHub
parent c3b06bf809
commit 42030381ae
3 changed files with 124 additions and 21 deletions
+1 -1
View File
@@ -358,7 +358,7 @@ func TestBalanceDoesNotDrainOntoOneNode(t *testing.T) {
func TestVolumeSelection(t *testing.T) {
topologyInfo := parseOutput(topoData)
vids, err := collectVolumeIdsForTierChange(topologyInfo, 1000, types.ToDiskType(types.HddType), "", 20.0, 0)
vids, err := collectVolumeIdsForTierChange(topologyInfo, 1000, types.ToDiskType(types.HddType), "", "", 20.0, 0)
if err != nil {
t.Errorf("collectVolumeIdsForTierChange: %v", err)
}
+87 -20
View File
@@ -26,6 +26,8 @@ func init() {
// volumeTierMoveJob is one unit of work sent on a per-destination queue and
// handled by the worker goroutine that drains that queue.
type volumeTierMoveJob struct {
// src is the server the volume is moved from. For an alreadyPlaced job it is
// the address of the server that already holds the target replica.
src pb.ServerAddress
// vid is the id of the volume to move.
vid needle.VolumeId
// alreadyPlaced is set when a replica is already on the target: skip the copy,
// just fulfill replication and clean up.
alreadyPlaced bool
}
type commandVolumeTierMove struct {
@@ -41,9 +43,15 @@ func (c *commandVolumeTierMove) Name() string {
}
func (c *commandVolumeTierMove) Help() string {
return `change a volume from one disk type to another
return `change a volume from one disk type or data center to another
volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h] [-parallelLimit=4] [-toReplication=XYZ]
volume.tier.move -fromDataCenter=dc1 -toDataCenter=dc2 [-collectionPattern=""] [-fullPercent=95] [-quietFor=1h]
-fromDataCenter limits the volumes to move to those with a replica in that data center.
-toDataCenter places the moved volumes in that data center.
When fromDiskType and toDiskType are the same, both data center flags are required,
and volumes are moved between data centers on the same disk type.
The command ensures the target replication is fully achieved on the destination tier
before deleting old replicas. This prevents data loss if a destination disk fails
@@ -71,6 +79,8 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
source := tierCommand.String("fromDiskType", "", "the source disk type")
target := tierCommand.String("toDiskType", "", "the target disk type")
fromDataCenter := tierCommand.String("fromDataCenter", "", "only move volumes with a replica in this data center")
toDataCenter := tierCommand.String("toDataCenter", "", "the target data center")
parallelLimit := tierCommand.Int("parallelLimit", 0, "limit the number of parallel copying jobs")
applyChange := tierCommand.Bool("apply", false, "actually apply the changes")
// TODO: remove this alias
@@ -92,7 +102,12 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
toDiskType := types.ToDiskType(*target)
if fromDiskType == toDiskType {
return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType)
if *fromDataCenter == "" || *toDataCenter == "" {
return fmt.Errorf("source tier %s is the same as target tier %s; specify -fromDataCenter and -toDataCenter to move volumes between data centers", fromDiskType, toDiskType)
}
if *fromDataCenter == *toDataCenter {
return fmt.Errorf("source data center %s is the same as target data center %s", *fromDataCenter, *toDataCenter)
}
}
// collect topology information
@@ -102,7 +117,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
}
// collect all volumes that should change
volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *fromDataCenter, *collectionPattern, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
@@ -111,9 +126,15 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
// Collect volume ID to collection name mapping for the sync operation
volumeIdToCollection := collectVolumeIdToCollection(topologyInfo, volumeIds)
_, allLocations := collectVolumeReplicaLocations(topologyInfo)
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
if *toDataCenter != "" {
allLocations = filterLocationsByDataCenter(allLocations, *toDataCenter)
}
keepDataNodesSorted(allLocations, toDiskType)
if len(allLocations) == 0 {
return fmt.Errorf("no volume server found with disk type %s%s", toDiskType.ReadableString(), dataCenterSuffix(*toDataCenter))
}
if len(allLocations) > 0 && *parallelLimit > 0 && *parallelLimit < len(allLocations) {
allLocations = allLocations[:*parallelLimit]
@@ -131,7 +152,11 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
go func(dst location, jobs <-chan volumeTierMoveJob, applyChanges bool) {
defer wg.Done()
for job := range jobs {
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
if job.alreadyPlaced {
fmt.Fprintf(writer, "completing move of volume %d already on %s ...\n", job.vid, dst.dataNode.Id)
} else {
fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", job.vid, job.src, dst.dataNode.Id, toDiskType.ReadableString())
}
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(job.vid))
if !found {
@@ -142,7 +167,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
unlock := c.Lock(job.src)
if applyChanges {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, locations, job.src, dst, *ioBytePerSecond, replicationString); err != nil {
if err := c.doMoveOneVolume(commandEnv, writer, job.vid, toDiskType, *toDataCenter, locations, job.src, dst, *ioBytePerSecond, replicationString, job.alreadyPlaced); err != nil {
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", job.vid, job.src, dst.dataNode.Id, err)
}
}
@@ -153,7 +178,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
for _, vid := range volumeIds {
collection := volumeIdToCollection[vid]
if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, allLocations); err != nil {
if err = c.doVolumeTierMove(commandEnv, writer, vid, collection, toDiskType, *toDataCenter, allLocations, volumeReplicas[uint32(vid)]); err != nil {
fmt.Printf("tier move volume %d: %v\n", vid, err)
}
allLocations = rotateDataNodes(allLocations)
@@ -185,6 +210,22 @@ func filterLocationsByDiskType(dataNodes []location, diskType types.DiskType) (r
return
}
// filterLocationsByDataCenter returns, in their original order, the entries of
// dataNodes whose dc field equals dataCenter. The input slice is not modified;
// the result is nil when nothing matches.
func filterLocationsByDataCenter(dataNodes []location, dataCenter string) (ret []location) {
	for i := range dataNodes {
		if dataNodes[i].dc != dataCenter {
			continue
		}
		ret = append(ret, dataNodes[i])
	}
	return ret
}
// dataCenterSuffix builds the optional " in data center <name>" fragment that
// is appended to user-facing messages. It returns the empty string when no
// data center was given, so callers can concatenate it unconditionally.
func dataCenterSuffix(dataCenter string) string {
	if dataCenter != "" {
		return " in data center " + dataCenter
	}
	return ""
}
func rotateDataNodes(dataNodes []location) []location {
if len(dataNodes) > 0 {
return append(dataNodes[1:], dataNodes[0])
@@ -202,13 +243,27 @@ func isOneOf(server string, locations []wdclient.Location) bool {
return false
}
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, allLocations []location) (err error) {
func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, collection string, toDiskType types.DiskType, toDataCenter string, allLocations []location, replicas []*VolumeReplica) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
}
// a replica already on the target tier (e.g. left by an interrupted earlier run)
// anchors the move: skip the copy, only fulfill replication and clean up old replicas
for _, r := range replicas {
if types.ToDiskType(r.info.DiskType) != toDiskType || (toDataCenter != "" && r.location.dc != toDataCenter) {
continue
}
anchorAddress := pb.NewServerAddressFromDataNode(r.location.dataNode)
if queue, found := c.queues[anchorAddress]; found {
fmt.Fprintf(writer, "volume %d is already on %s, will complete replication and cleanup\n", vid, r.location.dataNode.Id)
queue <- volumeTierMoveJob{src: anchorAddress, vid: vid, alreadyPlaced: true}
return nil
}
}
// find one server with the most empty volume slots with target disk type
hasFoundTarget := false
fn := capacityByFreeVolumeCount(toDiskType)
@@ -239,18 +294,18 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
addVolumeCount(dst.dataNode.DiskInfos[string(toDiskType)], 1)
destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
c.queues[destServerAddress] <- volumeTierMoveJob{sourceVolumeServer, vid}
c.queues[destServerAddress] <- volumeTierMoveJob{src: sourceVolumeServer, vid: vid}
}
}
if !hasFoundTarget {
fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
fmt.Fprintf(writer, "can not find disk type %s%s for volume %d\n", toDiskType.ReadableString(), dataCenterSuffix(toDataCenter), vid)
}
return nil
}
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string) (err error) {
func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, toDataCenter string, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location, ioBytePerSecond int64, replicationString *string, alreadyPlaced bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -262,7 +317,11 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
}
newAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// when already placed nothing is deleted yet, so failure paths restore every replica
deletedSource := sourceVolumeServer
if alreadyPlaced {
deletedSource = ""
} else if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, newAddress, 5*time.Second, toDiskType.ReadableString(), ioBytePerSecond, true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true, false); err != nil {
glog.Errorf("mark volume %d as writable on %s: %v", vid, locations[0].Url, err)
@@ -276,7 +335,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
if err = configureVolumeReplication(commandEnv.option.GrpcDialOption, vid, newAddress, *replicationString); err != nil {
// LiveMoveVolume already deleted sourceVolumeServer; mark surviving
// old replicas writable before aborting so the volume stays accessible.
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
restoreSurvivingReplicasWritable(commandEnv, vid, locations, deletedSource)
return fmt.Errorf("configure replication %s on volume %d at %s: %v", *replicationString, vid, newAddress, err)
}
}
@@ -285,10 +344,10 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
// deleting old replicas to avoid data-loss risk.
// Use the explicit -toReplication if given, otherwise preserve the volume's
// existing replication from the source tier.
preserveServers, replicateErr := c.ensureReplicationFulfilled(commandEnv, writer, vid, toDiskType, dst, *replicationString)
preserveServers, replicateErr := c.ensureReplicationFulfilled(commandEnv, writer, vid, toDiskType, toDataCenter, dst, *replicationString)
if replicateErr != nil {
// Replication not fully achieved — do NOT delete old replicas.
restoreSurvivingReplicasWritable(commandEnv, vid, locations, sourceVolumeServer)
restoreSurvivingReplicasWritable(commandEnv, vid, locations, deletedSource)
return fmt.Errorf("volume %d moved to %s but failed to fulfill replication, old replicas preserved: %v", vid, dst.dataNode.Id, replicateErr)
}
@@ -338,8 +397,10 @@ func restoreSurvivingReplicasWritable(commandEnv *CommandEnv, vid needle.VolumeI
// move so it can see the newly placed volume and find suitable destinations for additional copies.
// It returns a set of server URLs (from the original locations) that host target-tier replicas
// counted toward fulfillment, so the caller can avoid deleting them during cleanup.
func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, movedDst location, replicationString string) (preserveServers map[string]bool, err error) {
func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, toDataCenter string, movedDst location, replicationString string) (preserveServers map[string]bool, err error) {
preserveServers = make(map[string]bool)
// keeps an anchored pre-existing replica writable on the early-return paths
preserveServers[movedDst.dataNode.Id] = true
sourceAddress := pb.NewServerAddressFromDataNode(movedDst.dataNode)
// Wait briefly for the master to receive heartbeats reflecting the move,
@@ -351,6 +412,9 @@ func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEn
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
allLocations = filterLocationsByDiskType(allLocations, toDiskType)
if toDataCenter != "" {
allLocations = filterLocationsByDataCenter(allLocations, toDataCenter)
}
keepDataNodesSorted(allLocations, toDiskType)
existingReplicas := volumeReplicas[uint32(vid)]
@@ -386,17 +450,17 @@ func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEn
return preserveServers, nil
}
// Filter to only replicas on the target disk type (the newly moved one).
// Filter to only replicas on the target disk type and data center (the newly moved one).
var targetTierReplicas []*VolumeReplica
for _, r := range existingReplicas {
if types.ToDiskType(r.info.DiskType) == toDiskType {
if types.ToDiskType(r.info.DiskType) == toDiskType && (toDataCenter == "" || r.location.dc == toDataCenter) {
targetTierReplicas = append(targetTierReplicas, r)
// Track pre-existing target-tier replicas so the caller won't delete them.
preserveServers[r.location.dataNode.Id] = true
}
}
if len(targetTierReplicas) == 0 {
return nil, fmt.Errorf("volume %d not found on target tier %s in topology after move", vid, toDiskType)
return nil, fmt.Errorf("volume %d not found on target tier %s%s in topology after move", vid, toDiskType, dataCenterSuffix(toDataCenter))
}
// Ensure all existing target-tier replicas have the correct replication metadata.
@@ -468,7 +532,7 @@ func (c *commandVolumeTierMove) ensureReplicationFulfilled(commandEnv *CommandEn
return preserveServers, nil
}
func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, sourceDataCenter string, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
@@ -477,6 +541,9 @@ func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeS
vidMap := make(map[uint32]bool)
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if sourceDataCenter != "" && string(dc) != sourceDataCenter {
return
}
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
// check collection name pattern
@@ -0,0 +1,36 @@
package shell
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/stretchr/testify/assert"
)
// TestVolumeSelectionByDataCenter checks that the source data center argument
// of collectVolumeIdsForTierChange narrows the set of selected volumes in the
// topoData fixture.
func TestVolumeSelectionByDataCenter(t *testing.T) {
	topo := parseOutput(topoData)
	hdd := types.ToDiskType(types.HddType)

	inDc2, err := collectVolumeIdsForTierChange(topo, 1000, hdd, "dc2", "", 20.0, 0)
	assert.NoError(t, err)
	assert.Equal(t, 83, len(inDc2))

	inDc1, err := collectVolumeIdsForTierChange(topo, 1000, hdd, "dc1", "", 20.0, 0)
	assert.NoError(t, err)
	assert.Equal(t, 0, len(inDc1))
}
// TestFilterLocationsByDataCenter checks that filterLocationsByDataCenter keeps
// exactly the expected nodes, identified by data node id, for each data center
// in the topoData fixture, and keeps nothing for an unknown data center.
func TestFilterLocationsByDataCenter(t *testing.T) {
	_, locs := collectVolumeReplicaLocations(parseOutput(topoData))
	assert.Equal(t, 5, len(locs))

	dc2Nodes := locationNodeIds(filterLocationsByDataCenter(locs, "dc2"))
	assert.ElementsMatch(t, []string{"192.168.1.4:8080", "192.168.1.2:8080"}, dc2Nodes)

	dc3Nodes := locationNodeIds(filterLocationsByDataCenter(locs, "dc3"))
	assert.ElementsMatch(t, []string{"192.168.1.6:8080"}, dc3Nodes)

	unknown := filterLocationsByDataCenter(locs, "dc9")
	assert.Empty(t, unknown)
}
// locationNodeIds maps each location to the Id of its data node, preserving
// order. The result is nil for an empty input.
func locationNodeIds(locations []location) (ids []string) {
	for i := range locations {
		ids = append(ids, locations[i].dataNode.Id)
	}
	return ids
}