diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 727ee0845..2e6756e4f 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -320,6 +320,18 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m bestReplicas[vid] = bestLoc } + // Re-attempt the orphan sweep on the nodes skipped as unreachable, now that + // any node that recovered during readonly-marking and replica sync answers + // again. A node whose teardown now succeeds is clean (and the generation host + // re-wipes its own disks regardless), so it leaves the skipped set and can be + // a balance source/target — otherwise its shards would never distribute off + // it. A node that is still down stays skipped and excluded, preserving the + // leniency for a genuinely-down node; such a node also cannot be the + // generation host below, since VolumeEcShardsGenerate would fail to read .dat. + if err := resweepSkippedNodes(commandEnv, skippedNodes, volumeIds, volumeIdToCollection, maxParallelization); err != nil { + return nil, err + } + // generate ec shards using the best replica for each volume ewg.Reset() for _, vid := range volumeIds { @@ -452,6 +464,75 @@ func clearPreexistingEcShards(commandEnv *CommandEnv, topologyInfo *master_pb.To return skipped, nil } +// resweepSkippedNodes re-attempts the orphan teardown on the nodes that the +// initial sweep skipped as unreachable, just before shard generation. A node +// that recovered in the meantime — and is therefore eligible to host this +// encode's generation — has its teardown retried; if it now fully succeeds it is +// removed from skipped so the rebalance can use it as a source and move its +// shards off, instead of stranding all shards on the single generation host and +// collapsing fault tolerance. A node still transport-down stays skipped (the +// same leniency the initial sweep grants), and a node that came back reachable +// but whose delete genuinely failed is fatal, exactly as in the initial sweep, +// so a stale generation is never silently left behind. Mutates skipped in place. +func resweepSkippedNodes(commandEnv *CommandEnv, skipped map[pb.ServerAddress]struct{}, volumeIds []needle.VolumeId, volumeIdToCollection map[needle.VolumeId]string, maxParallelization int) error { + if len(skipped) == 0 { + return nil + } + + allShardIds := make([]erasure_coding.ShardId, erasure_coding.MaxShardCount) + for i := range allShardIds { + allShardIds[i] = erasure_coding.ShardId(i) + } + + addrs := make([]pb.ServerAddress, 0, len(skipped)) + for addr := range skipped { + addrs = append(addrs, addr) + } + + fmt.Printf("re-checking %d node(s) skipped by the orphan sweep before generating shards...\n", len(addrs)) + + // A node still down on every retried vid stays skipped; one that fully + // succeeds is un-skipped. Track per-node whether any retry still failed + // (down) so a node whose state is mixed across vids never gets un-skipped. + stillDown := make(map[pb.ServerAddress]struct{}) + var mu sync.Mutex + ewg := NewErrorWaitGroup(maxParallelization) + for _, addr := range addrs { + for _, vid := range volumeIds { + collection := volumeIdToCollection[vid] + ewg.Add(func() error { + if err := unmountAndDeleteEcShardsQuiet(commandEnv.option.GrpcDialOption, collection, vid, addr, allShardIds); err != nil { + // Same decision as the initial sweep: a reachable node whose delete + // genuinely failed (or did not ack a full teardown, or whose liveness is + // inconclusive) is fatal, since it could hold an orphan a later copy + // re-stamps into this generation. Only a node still transport-down stays + // skipped. + if errors.Is(err, errFullTeardownNotAcked) || !isNodeUnreachable(err) || + classifyNodeLiveness(pingVolumeServer(commandEnv.option.GrpcDialOption, addr)) != nodeDown { + return fmt.Errorf("re-clear stale ec shards for volume %d on %s: %w", vid, addr, err) + } + glog.V(1).Infof("orphan re-sweep: volume %d on %s still skipped (unreachable): %v", vid, addr, err) + mu.Lock() + stillDown[addr] = struct{}{} + mu.Unlock() + } + return nil + }) + } + } + if err := ewg.Wait(); err != nil { + return err + } + + for _, addr := range addrs { + if _, down := stillDown[addr]; !down { + delete(skipped, addr) + glog.V(0).Infof("orphan re-sweep: node %s recovered and was cleaned; it will participate in the EC rebalance", addr) + } + } + return nil +} + // isNodeUnreachable reports whether err means the volume server could not be // reached at all, as opposed to an RPC that reached the node and failed. Only an // unreachable node is safe to skip in the orphan sweep. A dead peer surfaces as