mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-14 23:46:39 +03:00
c2591b4395
* volume: verify before destroy in VolumeCopy and replication repair

  Four data-safety fixes around copy/repair paths that could destroy or resurrect data before verifying the source or survivors.

  (a) VolumeCopy no longer deletes a pre-existing local replica up front. The delete is deferred until ReadVolumeFileStatus on the source succeeds, so a transient source outage (or a retry after one) can no longer wipe a healthy destination replica. Gated on source readability only; size/count comparisons are intentionally not used because they invert legitimately after divergent vacuum/compaction. Mirrored in the Rust volume server.

  (b) volume.check.disk no longer resurrects vacuumed-deleted needles. A key present-and-live on the source but entirely absent on the target is ambiguous: it may be a genuine missing write, or a needle deleted on the target and then vacuumed (its index entry and any tombstone are gone). An individual needle AppendAtNs has no monotonic relation to a vacuum watermark, so the old cutoff heuristic could not tell them apart. Without positive proof the absence is a missing write, the safe default is to NOT push it back. Tradeoff: a real missing write may go unrepaired until a tombstone-aware path exists, but we never raise back deleted data.

  (c) Over-replication trim no longer resurrects needles or removes the wrong replica. The pre-delete sync now runs read-only (divergence check only) instead of writing the doomed replica's needles into the survivor. pickOneReplicaToDelete only ever removes the smallest of multiple healthy writable replicas; it refuses the trim when doing so would leave only read-only/integrity-flagged survivors, since file_count>0 alone cannot prove the survivor's .dat is readable.

  (d) Incomplete-volume (.note) cleanup keeps the shared .vif when an .ecx for the same vid coexists on the disk, so removing an interrupted regular copy cannot strip a coexisting EC volume's info file.

  VolumeCopy now surfaces .note write/remove errors instead of ignoring them. In the Rust volume server (where a persisting note is actually reachable) the .note check moves below the empty-stub sweep and EC validation, keeps the .vif on EC coexistence, and the mount path fails when a .note still persists.

* shell: scope the over-replication writable-survivor guard to the trim path only

  The writable-survivor guard (never trim down to a read-only survivor) lived inside the shared pickOneReplicaToDelete, so it also gated the misplaced-volume relocation via pickOneMisplacedVolume -- a misplaced read-only volume (e.g. a full one) would silently stop being rebalanced. Extract pickSmallestReplica for the relocation path (which deletes-and-recreates and must act on read-only replicas), and keep the writable-survivor guard only in pickOneReplicaToDelete used by the over-replication trim.

* seaweed-volume: recompute keep_vif after invalid-EC cleanup in the .note path

  keep_vif used the pre-validation ecx_exists snapshot, so when the EC-validation step above removed the invalid .ecx/shards, the .note cleanup still preserved a now-orphaned .vif. Re-check .ecx existence at cleanup time, matching the Go hasEcxFile re-check.

* shell: keep placement when picking an over-replication victim to delete

  The trim picked the smallest writable replica without regard to placement, so it could delete the only replica in a required failure domain (e.g. with "100" and replicas dc1 + two in dc2, deleting dc1 leaves both survivors in dc2). Prefer a writable replica whose removal still satisfies placement, falling back to the smallest writable only when none does.
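The victim selection described in the last commit note can be sketched in isolation. This is a minimal, self-contained illustration and not the code from this file: the `replica` type, `distinctDCs`, `smallest`, and `pickVictim` are hypothetical names, and placement is reduced to "survivors must span at least `minDCs` data centers" (an assumption standing in for the real `isMisplaced` check).

```go
package main

import "fmt"

// replica is a hypothetical, simplified stand-in for the shell's VolumeReplica.
type replica struct {
	dc, node string
	size     uint64
	readOnly bool
}

// distinctDCs counts how many data centers the given replicas cover.
func distinctDCs(rs []replica) int {
	seen := map[string]bool{}
	for _, r := range rs {
		seen[r.dc] = true
	}
	return len(seen)
}

// smallest returns the replica with the smallest size, or nil for an empty set.
func smallest(rs []*replica) *replica {
	var best *replica
	for _, r := range rs {
		if best == nil || r.size < best.size {
			best = r
		}
	}
	return best
}

// pickVictim refuses the trim (returns nil) unless at least two writable
// replicas exist, prefers a writable replica whose removal keeps the
// survivors spread over minDCs data centers, and otherwise falls back to
// the smallest writable replica.
func pickVictim(replicas []replica, minDCs int) *replica {
	var writable, placementSafe []*replica
	for i := range replicas {
		r := &replicas[i]
		if r.readOnly {
			continue
		}
		writable = append(writable, r)
		// Survivors are every replica except the one at index i.
		survivors := append(append([]replica{}, replicas[:i]...), replicas[i+1:]...)
		if distinctDCs(survivors) >= minDCs {
			placementSafe = append(placementSafe, r)
		}
	}
	if len(writable) < 2 {
		return nil
	}
	if len(placementSafe) > 0 {
		return smallest(placementSafe)
	}
	return smallest(writable)
}

func main() {
	// Placement "100" is modelled here as "two distinct data centers".
	replicas := []replica{
		{dc: "dc1", node: "n1", size: 10},
		{dc: "dc2", node: "n2", size: 20},
		{dc: "dc2", node: "n3", size: 30},
	}
	if v := pickVictim(replicas, 2); v != nil {
		fmt.Println("trim", v.node) // prints "trim n2"
	}
}
```

In this sketch the smallest replica (n1) is skipped because it is the only copy in dc1, so the smallest placement-safe replica (n2) is chosen instead.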
743 lines
24 KiB
Go
package shell
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"slices"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
)
|
|
|
|
func init() {
|
|
Commands = append(Commands, &commandVolumeFixReplication{})
|
|
}
|
|
|
|
type commandVolumeFixReplication struct {
|
|
collectionPattern *string
|
|
// TODO: move parameter flags here so we don't shuffle them around via function calls.
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) Name() string {
|
|
return "volume.fix.replication"
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) Help() string {
|
|
return `add or remove replicas to volumes that are missing replicas or over-replicated
|
|
|
|
This command finds all over-replicated volumes. For each one it deletes a single surplus replica, preferring the smallest writable replica whose removal still satisfies placement.
|
|
|
|
This command also finds all under-replicated volumes, and finds volume servers with free slots.
|
|
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
|
|
|
|
volume.fix.replication # do not take action
|
|
volume.fix.replication -apply # actually delete or copy the volume files and mount the volume
|
|
volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important"
|
|
|
|
Note:
|
|
* each run adds back only one replica for each under-replicated volume id.
If multiple replicas are missing, e.g. the replica count is > 2, you may need to run this multiple times.
* do not re-run this within a few seconds, since a new volume replica may take a few seconds
to register itself with the master.
|
|
|
|
`
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
|
|
return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
|
|
|
|
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
|
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
|
|
applyChanges := volFixReplicationCommand.Bool("apply", false, "apply the fix")
|
|
// TODO: remove this alias
|
|
applyChangesAlias := volFixReplicationCommand.Bool("force", false, "apply the fix (alias for -apply)")
|
|
verbose := volFixReplicationCommand.Bool("verbose", false, "show volumes being checked and their statuses")
|
|
doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication")
|
|
doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
|
|
maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
|
retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
|
|
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
|
|
|
|
if err = volFixReplicationCommand.Parse(args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
handleDeprecatedForceFlag(writer, volFixReplicationCommand, applyChangesAlias, applyChanges)
|
|
infoAboutSimulationMode(writer, *applyChanges, "-apply")
|
|
commandEnv.noLock = !*applyChanges
|
|
|
|
if err = commandEnv.confirmIsLocked(args); *applyChanges && err != nil {
|
|
return
|
|
}
|
|
|
|
ewg := NewErrorWaitGroup(*maxParallelization)
|
|
underReplicatedVolumeIdsCount := 1
|
|
for underReplicatedVolumeIdsCount > 0 {
|
|
fixedVolumeReplicas := map[string]int{}
|
|
|
|
// collect topology information
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "wait 15 seconds and then collect topology information...\n")
|
|
}
|
|
topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// find all volumes that need replication
|
|
// collect all data nodes
|
|
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
|
|
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "collected topology: %d locations, %d volumes to check\n", len(allLocations), len(volumeReplicas))
|
|
}
|
|
|
|
if len(allLocations) == 0 {
|
|
return fmt.Errorf("no data nodes at all")
|
|
}
|
|
|
|
// classify volumes as under-replicated, misplaced, or over-replicated
|
|
var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
|
|
for vid, replicas := range volumeReplicas {
|
|
replica := replicas[0]
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
|
|
|
// build locations list for optional verbose output
|
|
locations := make([]string, 0, len(replicas))
|
|
for _, r := range replicas {
|
|
locations = append(locations, r.location.String())
|
|
}
|
|
|
|
if *verbose {
|
|
fmt.Fprintf(writer, "checking volume %d replication %s has %d replicas [%s]\n", replica.info.Id, replicaPlacement, len(replicas), strings.Join(locations, ", "))
|
|
}
|
|
|
|
switch {
|
|
case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
|
|
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
case isMisplaced(replicas, replicaPlacement):
|
|
misplacedVolumeIds = append(misplacedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s is not well placed [%s]\n", replica.info.Id, replicaPlacement, strings.Join(locations, ", "))
|
|
case replicaPlacement.GetCopyCount() < len(replicas):
|
|
overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
|
|
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
}
|
|
}
|
|
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
|
|
|
|
if !commandEnv.isLocked() {
|
|
return fmt.Errorf("lock is lost")
|
|
}
|
|
|
|
ewg.Reset()
|
|
ewg.Add(func() error {
|
|
// find the most underpopulated data nodes
|
|
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
|
|
return err
|
|
})
|
|
if *doDelete {
|
|
ewg.Add(func() error {
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete)
|
|
})
|
|
ewg.Add(func() error {
|
|
return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume)
|
|
})
|
|
}
|
|
if err := ewg.Wait(); err != nil {
// propagate the failure instead of silently reporting success
return err
}
|
|
|
|
if !*applyChanges {
|
|
break
|
|
}
|
|
|
|
// check that the topology has been updated
|
|
if len(fixedVolumeReplicas) > 0 {
|
|
fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
|
|
for k := range fixedVolumeReplicas {
|
|
fixedVolumes = append(fixedVolumes, k)
|
|
}
|
|
volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, volumeIdLocation := range volumeIdLocations {
|
|
volumeId := volumeIdLocation.VolumeOrFileId
|
|
volumeIdLocationCount := len(volumeIdLocation.Locations)
|
|
i := 0
|
|
for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
|
|
fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
|
|
time.Sleep(time.Duration(i+1) * time.Second * 7)
|
|
volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
volumeIdLocationCount = len(volumeLocIds[0].Locations)
|
|
if *retryCount <= i {
|
|
return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
|
|
}
|
|
i += 1
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
|
|
volumeReplicas := make(map[uint32][]*VolumeReplica)
|
|
var allLocations []location
|
|
eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
|
loc := newLocation(string(dc), string(rack), dn)
|
|
for _, diskInfo := range dn.DiskInfos {
|
|
for _, v := range diskInfo.VolumeInfos {
|
|
volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
|
|
location: &loc,
|
|
info: v,
|
|
})
|
|
}
|
|
}
|
|
allLocations = append(allLocations, loc)
|
|
})
|
|
return volumeReplicas, allLocations
|
|
}
|
|
|
|
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
|
|
|
|
// checkOneVolume compares the index of replica a against b. With
|
|
// applyChanges=false it is a read-only divergence check; the over-replication
|
|
// trim must use that mode so it does not push the soon-to-be-deleted replica's
|
|
// needles into the survivor (which would resurrect data and is the opposite of
|
|
// a safe trim).
|
|
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, commandEnv *CommandEnv, applyChanges bool) (err error) {
|
|
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
|
|
defer func() {
|
|
aDB.Close()
|
|
bDB.Close()
|
|
}()
|
|
|
|
vcd := &volumeCheckDisk{
|
|
writer: writer,
|
|
commandEnv: commandEnv,
|
|
now: time.Now(),
|
|
|
|
verbose: false,
|
|
applyChanges: applyChanges,
|
|
syncDeletions: false,
|
|
nonRepairThreshold: float64(1),
|
|
}
|
|
|
|
// read index db
|
|
if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
|
|
return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
|
|
}
|
|
if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
|
|
return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
|
|
}
|
|
if _, err = vcd.doVolumeCheckDisk(aDB, bDB, a, b); err != nil {
|
|
return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
|
|
if len(volumeIds) == 0 {
|
|
// nothing to do
|
|
return nil
|
|
}
|
|
|
|
for _, vid := range volumeIds {
|
|
replicas := volumeReplicas[vid]
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
|
|
|
|
replica := selectOneVolumeFn(replicas, replicaPlacement)
|
|
if replica == nil {
|
|
fmt.Fprintf(writer, "skip trimming volume %d: no safe replica to delete (would leave only read-only survivors)\n", vid)
|
|
continue
|
|
}
|
|
|
|
// check collection name pattern
|
|
if *c.collectionPattern != "" {
|
|
var matched bool
|
|
if *c.collectionPattern == CollectionDefault {
|
|
matched = replica.info.Collection == ""
|
|
} else {
|
|
var err error
|
|
matched, err = filepath.Match(*c.collectionPattern, replica.info.Collection)
|
|
if err != nil {
|
|
return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
|
|
}
|
|
}
|
|
if !matched {
|
|
continue
|
|
}
|
|
}
|
|
|
|
collectionIsMismatch := false
|
|
for _, volumeReplica := range replicas {
|
|
if volumeReplica.info.Collection != replica.info.Collection {
|
|
fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
|
|
collectionIsMismatch = true
|
|
}
|
|
}
|
|
if collectionIsMismatch {
|
|
continue
|
|
}
|
|
|
|
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
|
|
|
|
if !applyChanges {
|
|
break
|
|
}
|
|
|
|
if doCheck {
|
|
var checkErr error
|
|
for _, replicaB := range replicas {
|
|
if replicaB.location.dataNode == replica.location.dataNode {
|
|
continue
|
|
}
|
|
// Read-only divergence check only: never write the doomed
|
|
// replica's needles into a survivor while trimming.
|
|
if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv, false); checkErr != nil {
|
|
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
|
|
break
|
|
}
|
|
}
|
|
if checkErr != nil {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Surplus replica being trimmed; keep the remote object since other
|
|
// replicas of the same .vif still reference it.
|
|
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
|
|
pb.NewServerAddressFromDataNode(replica.location.dataNode), false, true); err != nil {
|
|
fmt.Fprintf(writer, "deleting volume %d from %s : %v\n", replica.info.Id, replica.location.dataNode.Id, err)
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
|
|
fixedVolumes = map[string]int{}
|
|
|
|
if len(volumeIds) == 0 {
|
|
return fixedVolumes, nil
|
|
}
|
|
|
|
if len(volumeIds) > volumesPerStep && volumesPerStep > 0 {
|
|
volumeIds = volumeIds[0:volumesPerStep]
|
|
}
|
|
for _, vid := range volumeIds {
|
|
for i := 0; i < retryCount+1; i++ {
|
|
var copied bool
|
|
if copied, err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
|
|
if applyChanges && copied {
|
|
fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
|
|
}
|
|
break
|
|
} else {
|
|
fmt.Fprintf(writer, "fixing under replicated volume %d: %v\n", vid, err)
|
|
}
|
|
}
|
|
}
|
|
return fixedVolumes, nil
|
|
}
|
|
|
|
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) (bool, error) {
|
|
replicas := volumeReplicas[vid]
|
|
replica := pickOneReplicaToCopyFrom(replicas)
|
|
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
|
|
foundNewLocation := false
|
|
hasSkippedCollection := false
|
|
keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
|
|
fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
|
|
for _, dst := range allLocations {
|
|
// check whether data nodes satisfy the constraints
|
|
if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
|
|
// check collection name pattern
|
|
if *c.collectionPattern != "" {
|
|
var matched bool
|
|
if *c.collectionPattern == CollectionDefault {
|
|
matched = replica.info.Collection == ""
|
|
} else {
|
|
var err error
|
|
matched, err = filepath.Match(*c.collectionPattern, replica.info.Collection)
|
|
if err != nil {
|
|
return false, fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
|
|
}
|
|
}
|
|
if !matched {
|
|
hasSkippedCollection = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// ask the volume server to replicate the volume
|
|
foundNewLocation = true
|
|
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
|
|
|
|
if !applyChanges {
|
|
// adjust volume count
|
|
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
|
return true, nil
|
|
}
|
|
|
|
err := replicateVolumeToServer(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(replica.info.Id),
|
|
pb.NewServerAddressFromDataNode(replica.location.dataNode),
|
|
pb.NewServerAddressFromDataNode(dst.dataNode),
|
|
replica.info.DiskType)
|
|
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
// adjust volume count
|
|
addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
if !foundNewLocation && !hasSkippedCollection {
|
|
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func addVolumeCount(info *master_pb.DiskInfo, count int) {
|
|
if info == nil {
|
|
return
|
|
}
|
|
info.VolumeCount += int64(count)
|
|
info.FreeVolumeCount -= int64(count)
|
|
}
|
|
|
|
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
|
|
fn := capacityByFreeVolumeCount(diskType)
|
|
slices.SortFunc(dataNodes, func(a, b location) int {
|
|
return int(fn(b.dataNode) - fn(a.dataNode))
|
|
})
|
|
}
|
|
|
|
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
|
|
existingDataCenters, existingRacks, _ := countReplicas(replicas)
|
|
|
|
if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
|
|
return false
|
|
}
|
|
if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
|
|
return false
|
|
}
|
|
if replicaPlacement.SameRackCount > 0 {
|
|
foundSatisfyRack := false
|
|
for _, rackCount := range existingRacks {
|
|
if rackCount >= replicaPlacement.SameRackCount+1 {
|
|
foundSatisfyRack = true
|
|
}
|
|
}
|
|
return foundSatisfyRack
|
|
}
|
|
return true
|
|
}
|
|
|
|
/*
	if on an existing data node {
		return false
	}

	if different from existing dcs {
		if lack on different dcs {
			return true
		} else {
			return false
		}
	}

	if not on primary dc {
		return false
	}

	if different from existing racks {
		if lack on different racks {
			return true
		} else {
			return false
		}
	}

	if not on primary rack {
		return false
	}

	if lacks on same rack {
		return true
	} else {
		return false
	}
*/
|
|
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
|
|
|
|
existingDataCenters, _, existingDataNodes := countReplicas(replicas)
|
|
|
|
if _, found := existingDataNodes[possibleLocation.String()]; found {
|
|
// avoid duplicated volume on the same data node
|
|
return false
|
|
}
|
|
|
|
primaryDataCenters, _ := findTopKeys(existingDataCenters)
|
|
|
|
// ensure data center count is within limit
|
|
if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
|
|
// different from existing dcs
|
|
if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
|
|
// lack on different dcs
|
|
return true
|
|
} else {
|
|
// adding this would go over the different dcs limit
|
|
return false
|
|
}
|
|
}
|
|
// now this is the same as one of the existing data centers
|
|
if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
|
|
// not on one of the primary dcs
|
|
return false
|
|
}
|
|
|
|
// now this is one of the primary dcs
|
|
primaryDcRacks := make(map[string]int)
|
|
for _, replica := range replicas {
|
|
if replica.location.DataCenter() != possibleLocation.DataCenter() {
|
|
continue
|
|
}
|
|
primaryDcRacks[replica.location.Rack()] += 1
|
|
}
|
|
primaryRacks, _ := findTopKeys(primaryDcRacks)
|
|
sameRackCount := primaryDcRacks[possibleLocation.Rack()]
|
|
|
|
// ensure rack count is within limit
|
|
if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
|
|
// different from existing racks
|
|
if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
|
|
// lack on different racks
|
|
return true
|
|
} else {
|
|
// adding this would go over the different racks limit
|
|
return false
|
|
}
|
|
}
|
|
// now this is the same as one of the existing racks
|
|
if !isAmong(possibleLocation.Rack(), primaryRacks) {
|
|
// not on the primary rack
|
|
return false
|
|
}
|
|
|
|
// now this is on the primary rack
|
|
|
|
// different from existing data nodes
|
|
if sameRackCount < replicaPlacement.SameRackCount+1 {
|
|
// lack on same rack
|
|
return true
|
|
} else {
|
|
// adding this would go over the same rack limit
|
|
return false
|
|
}
|
|
|
|
}
|
|
|
|
func findTopKeys(m map[string]int) (topKeys []string, max int) {
|
|
for k, c := range m {
|
|
if max < c {
|
|
topKeys = topKeys[:0]
|
|
topKeys = append(topKeys, k)
|
|
max = c
|
|
} else if max == c {
|
|
topKeys = append(topKeys, k)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func isAmong(key string, keys []string) bool {
|
|
for _, k := range keys {
|
|
if k == key {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
type VolumeReplica struct {
|
|
location *location
|
|
info *master_pb.VolumeInformationMessage
|
|
}
|
|
|
|
type location struct {
|
|
dc string
|
|
rack string
|
|
dataNode *master_pb.DataNodeInfo
|
|
}
|
|
|
|
func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
|
|
return location{
|
|
dc: dc,
|
|
rack: rack,
|
|
dataNode: dataNode,
|
|
}
|
|
}
|
|
|
|
func (l location) String() string {
|
|
return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
|
|
}
|
|
|
|
func (l location) Rack() string {
|
|
return fmt.Sprintf("%s %s", l.dc, l.rack)
|
|
}
|
|
|
|
func (l location) DataCenter() string {
|
|
return l.dc
|
|
}
|
|
|
|
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
|
|
mostRecent := replicas[0]
|
|
for _, replica := range replicas {
|
|
if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
|
|
mostRecent = replica
|
|
}
|
|
}
|
|
return mostRecent
|
|
}
|
|
|
|
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
|
|
diffDc = make(map[string]int)
|
|
diffRack = make(map[string]int)
|
|
diffNode = make(map[string]int)
|
|
for _, replica := range replicas {
|
|
diffDc[replica.location.DataCenter()] += 1
|
|
diffRack[replica.location.Rack()] += 1
|
|
diffNode[replica.location.String()] += 1
|
|
}
|
|
return
|
|
}
|
|
|
|
// pickSmallestReplica returns the smallest replica (ties broken by oldest then
// lowest compact revision), or nil for an empty set.
func pickSmallestReplica(replicas []*VolumeReplica) *VolumeReplica {
	if len(replicas) == 0 {
		return nil
	}
	sorted := slices.Clone(replicas)
	slices.SortFunc(sorted, func(a, b *VolumeReplica) int {
		// Compare explicitly rather than subtracting: subtraction of unsigned
		// fields (e.g. CompactRevision) wraps around, so the converted
		// difference can carry the wrong sign.
		switch {
		case a.info.Size != b.info.Size:
			if a.info.Size < b.info.Size {
				return -1
			}
			return 1
		case a.info.ModifiedAtSecond != b.info.ModifiedAtSecond:
			if a.info.ModifiedAtSecond < b.info.ModifiedAtSecond {
				return -1
			}
			return 1
		case a.info.CompactRevision != b.info.CompactRevision:
			if a.info.CompactRevision < b.info.CompactRevision {
				return -1
			}
			return 1
		}
		return 0
	})
	return sorted[0]
}

// pickOneReplicaToDelete selects the replica to trim when over-replicated.
// It only ever removes a healthy writable replica, preferring the smallest one
// whose removal still satisfies placement: a ReadOnly/integrity-flagged replica
// is never chosen for deletion, and the trim is refused (returns nil) when
// removing a writable replica would leave only ReadOnly survivors.
// VolumeStatus file_count>0 alone cannot prove the survivors' .dat is readable,
// so we do not over-claim survivor health.
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
|
|
// Over-replication trim: only ever remove a writable replica, and only
|
|
// when another writable one survives, so a healthy copy is never deleted
|
|
// down to a read-only (e.g. full or integrity-flagged) survivor.
|
|
var writable []*VolumeReplica
|
|
for _, r := range replicas {
|
|
if !r.info.ReadOnly {
|
|
writable = append(writable, r)
|
|
}
|
|
}
|
|
if len(writable) < 2 {
|
|
return nil
|
|
}
|
|
// Prefer a writable replica whose removal still satisfies placement, so the
|
|
// trim does not strip the only replica in a required failure domain. Fall
|
|
// back to the smallest writable if none keeps placement (a later misplaced
|
|
// cycle then re-balances).
|
|
var placementSafe []*VolumeReplica
|
|
for i, r := range replicas {
|
|
if r.info.ReadOnly {
|
|
continue
|
|
}
|
|
if !isMisplaced(otherThan(replicas, i), replicaPlacement) {
|
|
placementSafe = append(placementSafe, r)
|
|
}
|
|
}
|
|
if len(placementSafe) > 0 {
|
|
return pickSmallestReplica(placementSafe)
|
|
}
|
|
return pickSmallestReplica(writable)
|
|
}
|
|
|
|
// check and fix misplaced volumes
|
|
|
|
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
|
|
|
|
for i := 0; i < len(replicas); i++ {
|
|
others := otherThan(replicas, i)
|
|
if !satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
|
|
for i := 0; i < len(replicas); i++ {
|
|
if index != i {
|
|
others = append(others, replicas[i])
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
|
|
|
|
// Relocation, not over-replication: pick the smallest replica to delete
|
|
// and recreate at a correct placement. Unlike the trim this must still act
|
|
// on read-only replicas (e.g. a full but misplaced volume), so it does not
|
|
// use pickOneReplicaToDelete's writable-survivor guard.
|
|
var deletionCandidates []*VolumeReplica
|
|
for i := 0; i < len(replicas); i++ {
|
|
others := otherThan(replicas, i)
|
|
if !isMisplaced(others, replicaPlacement) {
|
|
deletionCandidates = append(deletionCandidates, replicas[i])
|
|
}
|
|
}
|
|
if toDelete = pickSmallestReplica(deletionCandidates); toDelete != nil {
|
|
return toDelete
|
|
}
|
|
|
|
return pickSmallestReplica(replicas)
|
|
|
|
}
|