Files
seaweedfs/weed/shell/command_volume_scrub.go
T
Lisandro Pin 6b4d20a6f3 volume.scrub and ec.scrub shell commands: make the display of scrub details optional. (#9911)
When volumes fail scrubbing, the detail output can get very verbose, which makes
the results hard to read. Most users don't need this information in the first
place - they only care whether or not volumes pass the scrub checks.

This MR gates the display of scrub result details behind a `--details` flag.
2026-06-10 13:29:07 -07:00

168 lines
5.1 KiB
Go

package shell
import (
"context"
"flag"
"fmt"
"io"
"strconv"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
)
func init() {
Commands = append(Commands, &commandVolumeScrub{})
}
// commandVolumeScrub implements the volume.scrub shell command. Do fills in
// the per-invocation fields below (env, volumeServerAddrs, volumeIDs, mode)
// before calling scrubVolumes.
type commandVolumeScrub struct {
	// env is the shell environment of the current invocation; scrubVolumes
	// reads the gRPC dial option from env.option.
	env *CommandEnv
	// volumeServerAddrs lists the volume servers to scrub: the -node values,
	// or every collected data node when -node is unset.
	volumeServerAddrs []pb.ServerAddress
	// volumeIDs restricts scrubbing to these volume IDs; it is left empty
	// when -volumeId is unset.
	volumeIDs []uint32
	// mode is the scrub mode parsed from the -mode flag.
	mode volume_server_pb.VolumeScrubMode
	// NOTE(review): grpcDialOption is not read in this file - scrubVolumes
	// uses env.option.GrpcDialOption instead. Confirm whether it is still needed.
	grpcDialOption grpc.DialOption
}
// Name returns the identifier under which this command is registered and
// invoked in the shell.
func (c *commandVolumeScrub) Name() string {
	const commandName = "volume.scrub"
	return commandName
}
// Help returns the usage text for volume.scrub, including every flag that
// Do accepts.
func (c *commandVolumeScrub) Help() string {
	return `scrubs volume contents on volume servers.

	volume.scrub [-node=<host>:<port>[,<host>:<port>...]] [-volumeId=<id>[,<id>...]]
		[-mode=index|local|full] [-markBrokenReadonly] [-maxParallelization=<n>] [-details]

Supports either scrubbing only needle data, or deep scrubbing file contents as well.
Scrubbing can be limited to specific volume IDs for specific volume servers.
By default, all volume IDs across all servers are processed.

	-node                 comma-separated volume server addresses (default: all data nodes)
	-volumeId             comma-separated volume IDs (default: all volumes)
	-mode                 scrubbing mode: index, local or full (default: full)
	-markBrokenReadonly   mark volumes that fail scrubbing as read-only
	-maxParallelization   maximum number of volume servers scrubbed concurrently
	-details              also print scrub result details for failing volumes (off by default)
`
}
// HasTag reports whether the command carries the given tag. volume.scrub
// carries no tags, so the answer is always false.
func (c *commandVolumeScrub) HasTag(_ CommandTag) bool {
	const tagged = false
	return tagged
}
// Do implements the volume.scrub shell command.
//
// It parses the command flags, requires the shell lock (confirmIsLocked),
// resolves the target volume servers and volume IDs, and then delegates to
// scrubVolumes, which prints progress and a summary to writer.
//
//   - -node: comma-separated volume server addresses. Surrounding spaces and
//     empty entries are ignored; an explicit value with no addresses is an
//     error. When the flag is unset, every data node returned by
//     collectDataNodes is scrubbed.
//   - -volumeId: comma-separated volume IDs, each parsed as a uint32. When
//     unset, an empty ID list is sent to the servers.
//   - -mode: index, local or full (case-insensitive); defaults to full.
//   - -markBrokenReadonly: forwarded to the servers in each scrub request.
//   - -maxParallelization: bound passed to the per-server task group.
//   - -details: print scrub result details when failures are found.
//
// Mode and volume IDs are validated before the data node lookup, so a typo
// fails without querying the cluster topology.
func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
	volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
	mode := volScrubCommand.String("mode", "full", "scrubbing mode (index/local/full)")
	markBrokenReadonly := volScrubCommand.Bool("markBrokenReadonly", false, "whether to flag volumes with scrub failures as read-only")
	maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	showDetails := volScrubCommand.Bool("details", false, "display scrub result details, if available")
	if err = volScrubCommand.Parse(args); err != nil {
		return err
	}
	if err = commandEnv.confirmIsLocked(args); err != nil {
		return err
	}

	// Validate the purely local inputs first so mistakes surface before the
	// data node lookup below.
	switch strings.ToUpper(strings.TrimSpace(*mode)) {
	case "INDEX":
		c.mode = volume_server_pb.VolumeScrubMode_INDEX
	case "LOCAL":
		c.mode = volume_server_pb.VolumeScrubMode_LOCAL
	case "FULL":
		c.mode = volume_server_pb.VolumeScrubMode_FULL
	default:
		return fmt.Errorf("unsupported scrubbing mode %q", *mode)
	}

	c.volumeIDs = []uint32{}
	if *volumeIDsStr != "" {
		for _, vids := range strings.Split(*volumeIDsStr, ",") {
			vids = strings.TrimSpace(vids)
			if vids == "" {
				continue
			}
			vid, parseErr := strconv.ParseUint(vids, 10, 32)
			if parseErr != nil {
				// Keep the strconv cause: it distinguishes bad syntax from
				// IDs that overflow uint32.
				return fmt.Errorf("invalid volume ID %q: %w", vids, parseErr)
			}
			c.volumeIDs = append(c.volumeIDs, uint32(vid))
		}
	}

	c.volumeServerAddrs = []pb.ServerAddress{}
	if *nodesStr != "" {
		for _, addr := range strings.Split(*nodesStr, ",") {
			// Trim and skip empties exactly like -volumeId, so
			// "-node=a:8080, b:8080," does not yield " b:8080" or "" addresses.
			addr = strings.TrimSpace(addr)
			if addr == "" {
				continue
			}
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(addr))
		}
		if len(c.volumeServerAddrs) == 0 {
			// An explicit -node that names no servers must not silently
			// scrub nothing and report success.
			return fmt.Errorf("no volume server addresses in -node %q", *nodesStr)
		}
	} else {
		dns, collectErr := collectDataNodes(commandEnv, 0)
		if collectErr != nil {
			return collectErr
		}
		for _, dn := range dns {
			c.volumeServerAddrs = append(c.volumeServerAddrs, pb.ServerAddress(dn.Address))
		}
	}

	fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
	c.env = commandEnv
	return c.scrubVolumes(writer, *maxParallelization, *markBrokenReadonly, *showDetails)
}
// scrubVolumes sends a ScrubVolume request (c.mode, c.volumeIDs,
// markBrokenReadonly) to every address in c.volumeServerAddrs. Requests run
// as tasks of an ErrorWaitGroup bounded by maxParallelization. Afterwards it
// prints a summary to writer.
//
// Counters, result lists and progress output are shared across tasks and
// are only touched while holding mu. Broken volumes are listed as
// "<addr>:<vid>". Per-server details are printed only when showDetails is set
// and at least one volume failed. If the wait group reports an error, that
// error is returned and no summary is printed. Per-server errors are
// annotated with the server address.
func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization int, markBrokenReadonly bool, showDetails bool) error {
	var brokenVolumesStr []string
	var details []string
	var totalVolumes, brokenVolumes, totalFiles uint64
	var mu sync.Mutex

	ewg := NewErrorWaitGroup(maxParallelization)
	count := 0
	for _, addr := range c.volumeServerAddrs {
		ewg.Add(func() error {
			mu.Lock()
			count++
			fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), count, len(c.volumeServerAddrs))
			mu.Unlock()

			err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
					Mode:                      c.mode,
					VolumeIds:                 c.volumeIDs,
					MarkBrokenVolumesReadonly: markBrokenReadonly,
				})
				if err != nil {
					return err
				}

				mu.Lock()
				defer mu.Unlock()
				totalVolumes += res.GetTotalVolumes()
				totalFiles += res.GetTotalFiles()
				brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
				for _, d := range res.GetDetails() {
					details = append(details, fmt.Sprintf("[%s] %s", addr, d))
				}
				for _, vid := range res.GetBrokenVolumeIds() {
					brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
				}
				return nil
			})
			if err != nil {
				// Many servers are scrubbed in parallel; without the address a
				// failure cannot be attributed to a specific server.
				return fmt.Errorf("scrubbing volumes on %s: %w", addr, err)
			}
			return nil
		})
	}
	if err := ewg.Wait(); err != nil {
		return err
	}

	fmt.Fprintf(writer, "Scrubbed %d files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))
	if brokenVolumes != 0 {
		fmt.Fprintf(writer, "\nGot scrub failures on %d volumes :(\n", brokenVolumes)
		fmt.Fprintf(writer, "Affected volumes: %s\n", strings.Join(brokenVolumesStr, ", "))
		if showDetails && len(details) != 0 {
			fmt.Fprintf(writer, "Details:\n\t%s\n", strings.Join(details, "\n\t"))
		}
	}
	return nil
}