mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-14 23:46:39 +03:00
0e9fc6c5ba
The dedicated ec_balance task worker handles EC shard balancing now, so the periodic admin script no longer needs to run it.
673 lines
19 KiB
Go
673 lines
19 KiB
Go
package pluginworker
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/shell"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
const (
|
|
adminScriptJobType = "admin_script"
|
|
maxAdminScriptOutputBytes = 16 * 1024
|
|
defaultAdminScriptRunMins = 17
|
|
adminScriptDetectTickMinutes = 17
|
|
)
|
|
|
|
const defaultAdminScript = `fs.log.purge -daysAgo=7
|
|
volume.deleteEmpty -quietFor=24h -apply
|
|
volume.fix.replication -apply
|
|
s3.clean.uploads -timeAgo=24h`
|
|
|
|
var adminScriptTokenRegex = regexp.MustCompile(`'.*?'|".*?"|\S+`)
|
|
|
|
func init() {
|
|
RegisterHandler(HandlerFactory{
|
|
JobType: "admin_script",
|
|
Category: CategoryDefault,
|
|
Aliases: []string{"admin-script", "admin.script", "script", "admin"},
|
|
Build: func(opts HandlerBuildOptions) (JobHandler, error) {
|
|
return NewAdminScriptHandler(opts.GrpcDialOption), nil
|
|
},
|
|
})
|
|
}
|
|
|
|
type AdminScriptHandler struct {
|
|
grpcDialOption grpc.DialOption
|
|
}
|
|
|
|
func NewAdminScriptHandler(grpcDialOption grpc.DialOption) *AdminScriptHandler {
|
|
return &AdminScriptHandler{grpcDialOption: grpcDialOption}
|
|
}
|
|
|
|
func (h *AdminScriptHandler) Capability() *plugin_pb.JobTypeCapability {
|
|
return &plugin_pb.JobTypeCapability{
|
|
JobType: adminScriptJobType,
|
|
CanDetect: true,
|
|
CanExecute: true,
|
|
MaxDetectionConcurrency: 1,
|
|
MaxExecutionConcurrency: 1,
|
|
DisplayName: "Admin Script",
|
|
Description: "Execute custom admin shell scripts",
|
|
Weight: 20,
|
|
}
|
|
}
|
|
|
|
func (h *AdminScriptHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
|
|
return &plugin_pb.JobTypeDescriptor{
|
|
JobType: adminScriptJobType,
|
|
DisplayName: "Admin Script",
|
|
Description: "Run custom admin shell scripts not covered by built-in job types",
|
|
Icon: "fas fa-terminal",
|
|
DescriptorVersion: 1,
|
|
AdminConfigForm: &plugin_pb.ConfigForm{
|
|
FormId: "admin-script-admin",
|
|
Title: "Admin Script Configuration",
|
|
Description: "Define the admin shell script to execute.",
|
|
Sections: []*plugin_pb.ConfigSection{
|
|
{
|
|
SectionId: "script",
|
|
Title: "Script",
|
|
Description: "Commands run sequentially by the admin script worker.",
|
|
Fields: []*plugin_pb.ConfigField{
|
|
{
|
|
Name: "script",
|
|
Label: "Script",
|
|
Description: "Admin shell commands to execute (one per line).",
|
|
HelpText: "Lock/unlock are handled by the admin server; omit explicit lock/unlock commands.",
|
|
Placeholder: "volume.balance -apply\nvolume.fix.replication -apply",
|
|
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
|
|
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXTAREA,
|
|
Required: true,
|
|
},
|
|
{
|
|
Name: "run_interval_minutes",
|
|
Label: "Run Interval (minutes)",
|
|
Description: "Minimum interval between successful admin script runs.",
|
|
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
|
|
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
|
|
Required: true,
|
|
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
DefaultValues: map[string]*plugin_pb.ConfigValue{
|
|
"script": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultAdminScript},
|
|
},
|
|
"run_interval_minutes": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultAdminScriptRunMins},
|
|
},
|
|
},
|
|
},
|
|
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
|
|
Enabled: true,
|
|
DetectionIntervalMinutes: adminScriptDetectTickMinutes,
|
|
DetectionTimeoutSeconds: 300,
|
|
MaxJobsPerDetection: 1,
|
|
GlobalExecutionConcurrency: 1,
|
|
PerWorkerExecutionConcurrency: 1,
|
|
RetryLimit: 0,
|
|
RetryBackoffSeconds: 30,
|
|
JobTypeMaxRuntimeSeconds: 1800,
|
|
ExecutionTimeoutSeconds: 1800,
|
|
},
|
|
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{},
|
|
}
|
|
}
|
|
|
|
func (h *AdminScriptHandler) Detect(ctx context.Context, request *plugin_pb.RunDetectionRequest, sender DetectionSender) error {
|
|
if request == nil {
|
|
return fmt.Errorf("run detection request is nil")
|
|
}
|
|
if sender == nil {
|
|
return fmt.Errorf("detection sender is nil")
|
|
}
|
|
if request.JobType != "" && request.JobType != adminScriptJobType {
|
|
return fmt.Errorf("job type %q is not handled by admin_script worker", request.JobType)
|
|
}
|
|
|
|
script := normalizeAdminScript(ReadStringConfig(request.GetAdminConfigValues(), "script", ""))
|
|
scriptName := strings.TrimSpace(ReadStringConfig(request.GetAdminConfigValues(), "script_name", ""))
|
|
runIntervalMinutes := readAdminScriptRunIntervalMinutes(request.GetAdminConfigValues())
|
|
if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), runIntervalMinutes*60) {
|
|
_ = sender.SendActivity(BuildDetectorActivity(
|
|
"skipped_by_interval",
|
|
fmt.Sprintf("ADMIN SCRIPT: Detection skipped due to run interval (%dm)", runIntervalMinutes),
|
|
map[string]*plugin_pb.ConfigValue{
|
|
"run_interval_minutes": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(runIntervalMinutes)},
|
|
},
|
|
},
|
|
))
|
|
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
|
|
JobType: adminScriptJobType,
|
|
Proposals: []*plugin_pb.JobProposal{},
|
|
HasMore: false,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return sender.SendComplete(&plugin_pb.DetectionComplete{
|
|
JobType: adminScriptJobType,
|
|
Success: true,
|
|
TotalProposals: 0,
|
|
})
|
|
}
|
|
|
|
commands := parseAdminScriptCommands(script)
|
|
execCount := countExecutableCommands(commands)
|
|
if execCount == 0 {
|
|
_ = sender.SendActivity(BuildDetectorActivity(
|
|
"no_script",
|
|
"ADMIN SCRIPT: No executable commands configured",
|
|
map[string]*plugin_pb.ConfigValue{
|
|
"command_count": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(execCount)},
|
|
},
|
|
},
|
|
))
|
|
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
|
|
JobType: adminScriptJobType,
|
|
Proposals: []*plugin_pb.JobProposal{},
|
|
HasMore: false,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return sender.SendComplete(&plugin_pb.DetectionComplete{
|
|
JobType: adminScriptJobType,
|
|
Success: true,
|
|
TotalProposals: 0,
|
|
})
|
|
}
|
|
|
|
proposal := buildAdminScriptProposal(script, scriptName, execCount)
|
|
proposals := []*plugin_pb.JobProposal{proposal}
|
|
hasMore := false
|
|
maxResults := int(request.MaxResults)
|
|
if maxResults > 0 && len(proposals) > maxResults {
|
|
proposals = proposals[:maxResults]
|
|
hasMore = true
|
|
}
|
|
|
|
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
|
|
JobType: adminScriptJobType,
|
|
Proposals: proposals,
|
|
HasMore: hasMore,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return sender.SendComplete(&plugin_pb.DetectionComplete{
|
|
JobType: adminScriptJobType,
|
|
Success: true,
|
|
TotalProposals: 1,
|
|
})
|
|
}
|
|
|
|
func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequest, sender ExecutionSender) error {
|
|
if request == nil || request.Job == nil {
|
|
return fmt.Errorf("execute job request is nil")
|
|
}
|
|
if sender == nil {
|
|
return fmt.Errorf("execution sender is nil")
|
|
}
|
|
if request.Job.JobType != "" && request.Job.JobType != adminScriptJobType {
|
|
return fmt.Errorf("job type %q is not handled by admin_script worker", request.Job.JobType)
|
|
}
|
|
|
|
script := normalizeAdminScript(ReadStringConfig(request.Job.Parameters, "script", ""))
|
|
scriptName := strings.TrimSpace(ReadStringConfig(request.Job.Parameters, "script_name", ""))
|
|
if script == "" {
|
|
script = normalizeAdminScript(ReadStringConfig(request.GetAdminConfigValues(), "script", ""))
|
|
}
|
|
if scriptName == "" {
|
|
scriptName = strings.TrimSpace(ReadStringConfig(request.GetAdminConfigValues(), "script_name", ""))
|
|
}
|
|
|
|
commands := parseAdminScriptCommands(script)
|
|
execCommands := filterExecutableCommands(commands)
|
|
if len(execCommands) == 0 {
|
|
return sender.SendCompleted(&plugin_pb.JobCompleted{
|
|
Success: false,
|
|
ErrorMessage: "no executable admin script commands configured",
|
|
})
|
|
}
|
|
|
|
commandEnv, cancel, err := h.buildAdminScriptCommandEnv(ctx, request.ClusterContext)
|
|
if err != nil {
|
|
return sender.SendCompleted(&plugin_pb.JobCompleted{
|
|
Success: false,
|
|
ErrorMessage: err.Error(),
|
|
})
|
|
}
|
|
defer cancel()
|
|
|
|
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
|
|
JobId: request.Job.JobId,
|
|
JobType: request.Job.JobType,
|
|
State: plugin_pb.JobState_JOB_STATE_ASSIGNED,
|
|
ProgressPercent: 0,
|
|
Stage: "assigned",
|
|
Message: "admin script job accepted",
|
|
Activities: []*plugin_pb.ActivityEvent{
|
|
BuildExecutorActivity("assigned", "admin script job accepted"),
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
output := &limitedBuffer{maxBytes: maxAdminScriptOutputBytes}
|
|
executed := 0
|
|
errorMessages := make([]string, 0)
|
|
executedCommands := make([]string, 0, len(execCommands))
|
|
|
|
for _, cmd := range execCommands {
|
|
if ctx.Err() != nil {
|
|
errorMessages = append(errorMessages, ctx.Err().Error())
|
|
break
|
|
}
|
|
|
|
commandLine := formatAdminScriptCommand(cmd)
|
|
executedCommands = append(executedCommands, commandLine)
|
|
_, _ = fmt.Fprintf(output, "$ %s\n", commandLine)
|
|
|
|
found := false
|
|
sendBroken := false
|
|
for _, command := range shell.Commands {
|
|
if command.Name() != cmd.Name {
|
|
continue
|
|
}
|
|
found = true
|
|
if err := command.Do(cmd.Args, commandEnv, output); err != nil {
|
|
msg := fmt.Sprintf("%s: %v", cmd.Name, err)
|
|
errorMessages = append(errorMessages, msg)
|
|
if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
|
|
State: plugin_pb.JobState_JOB_STATE_RUNNING,
|
|
ProgressPercent: percentProgress(executed+1, len(execCommands)),
|
|
Stage: "error",
|
|
Message: msg,
|
|
Activities: []*plugin_pb.ActivityEvent{
|
|
BuildExecutorActivity("error", msg),
|
|
},
|
|
}); sendErr != nil {
|
|
sendBroken = true
|
|
}
|
|
}
|
|
break
|
|
}
|
|
if sendBroken {
|
|
break
|
|
}
|
|
|
|
if !found {
|
|
msg := fmt.Sprintf("unknown admin command: %s", cmd.Name)
|
|
errorMessages = append(errorMessages, msg)
|
|
if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
|
|
State: plugin_pb.JobState_JOB_STATE_RUNNING,
|
|
ProgressPercent: percentProgress(executed+1, len(execCommands)),
|
|
Stage: "error",
|
|
Message: msg,
|
|
Activities: []*plugin_pb.ActivityEvent{
|
|
BuildExecutorActivity("error", msg),
|
|
},
|
|
}); sendErr != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
executed++
|
|
progress := percentProgress(executed, len(execCommands))
|
|
if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
|
|
State: plugin_pb.JobState_JOB_STATE_RUNNING,
|
|
ProgressPercent: progress,
|
|
Stage: "running",
|
|
Message: fmt.Sprintf("executed %d/%d command(s)", executed, len(execCommands)),
|
|
Activities: []*plugin_pb.ActivityEvent{
|
|
BuildExecutorActivity("running", commandLine),
|
|
},
|
|
}); sendErr != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
scriptHash := hashAdminScript(script)
|
|
resultSummary := fmt.Sprintf("admin script executed (%d command(s))", executed)
|
|
if scriptName != "" {
|
|
resultSummary = fmt.Sprintf("admin script %q executed (%d command(s))", scriptName, executed)
|
|
}
|
|
|
|
outputValues := map[string]*plugin_pb.ConfigValue{
|
|
"command_count": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(executed)},
|
|
},
|
|
"error_count": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(errorMessages))},
|
|
},
|
|
"script_hash": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash},
|
|
},
|
|
}
|
|
if scriptName != "" {
|
|
outputValues["script_name"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName},
|
|
}
|
|
}
|
|
if len(executedCommands) > 0 {
|
|
outputValues["commands"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_StringList{
|
|
StringList: &plugin_pb.StringList{Values: executedCommands},
|
|
},
|
|
}
|
|
}
|
|
if out := strings.TrimSpace(output.String()); out != "" {
|
|
outputValues["output"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: out},
|
|
}
|
|
}
|
|
if output.truncated {
|
|
outputValues["output_truncated"] = &plugin_pb.ConfigValue{
|
|
Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true},
|
|
}
|
|
}
|
|
|
|
success := len(errorMessages) == 0 && ctx.Err() == nil
|
|
errorMessage := ""
|
|
if !success {
|
|
errorMessage = strings.Join(errorMessages, "; ")
|
|
if ctx.Err() != nil {
|
|
if errorMessage == "" {
|
|
errorMessage = ctx.Err().Error()
|
|
} else {
|
|
errorMessage = fmt.Sprintf("%s; %s", errorMessage, ctx.Err().Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
return sender.SendCompleted(&plugin_pb.JobCompleted{
|
|
Success: success,
|
|
ErrorMessage: errorMessage,
|
|
Result: &plugin_pb.JobResult{
|
|
Summary: resultSummary,
|
|
OutputValues: outputValues,
|
|
},
|
|
Activities: []*plugin_pb.ActivityEvent{
|
|
BuildExecutorActivity("completed", resultSummary),
|
|
},
|
|
CompletedAt: timestamppb.Now(),
|
|
})
|
|
}
|
|
|
|
func readAdminScriptRunIntervalMinutes(values map[string]*plugin_pb.ConfigValue) int {
|
|
runIntervalMinutes := int(ReadInt64Config(values, "run_interval_minutes", defaultAdminScriptRunMins))
|
|
if runIntervalMinutes <= 0 {
|
|
return defaultAdminScriptRunMins
|
|
}
|
|
return runIntervalMinutes
|
|
}
|
|
|
|
type adminScriptCommand struct {
|
|
Name string
|
|
Args []string
|
|
Raw string
|
|
}
|
|
|
|
func normalizeAdminScript(script string) string {
|
|
script = strings.ReplaceAll(script, "\r\n", "\n")
|
|
return strings.TrimSpace(script)
|
|
}
|
|
|
|
func parseAdminScriptCommands(script string) []adminScriptCommand {
|
|
script = normalizeAdminScript(script)
|
|
if script == "" {
|
|
return nil
|
|
}
|
|
lines := strings.Split(script, "\n")
|
|
commands := make([]adminScriptCommand, 0)
|
|
for _, line := range lines {
|
|
line = strings.TrimSpace(line)
|
|
if line == "" || strings.HasPrefix(line, "#") {
|
|
continue
|
|
}
|
|
for _, chunk := range strings.Split(line, ";") {
|
|
chunk = strings.TrimSpace(chunk)
|
|
if chunk == "" {
|
|
continue
|
|
}
|
|
parts := adminScriptTokenRegex.FindAllString(chunk, -1)
|
|
if len(parts) == 0 {
|
|
continue
|
|
}
|
|
args := make([]string, 0, len(parts)-1)
|
|
for _, arg := range parts[1:] {
|
|
args = append(args, strings.Trim(arg, "\"'"))
|
|
}
|
|
commands = append(commands, adminScriptCommand{
|
|
Name: strings.TrimSpace(parts[0]),
|
|
Args: args,
|
|
Raw: chunk,
|
|
})
|
|
}
|
|
}
|
|
return commands
|
|
}
|
|
|
|
func filterExecutableCommands(commands []adminScriptCommand) []adminScriptCommand {
|
|
exec := make([]adminScriptCommand, 0, len(commands))
|
|
for _, cmd := range commands {
|
|
if cmd.Name == "" {
|
|
continue
|
|
}
|
|
if isAdminScriptLockCommand(cmd.Name) {
|
|
continue
|
|
}
|
|
exec = append(exec, cmd)
|
|
}
|
|
return exec
|
|
}
|
|
|
|
func countExecutableCommands(commands []adminScriptCommand) int {
|
|
count := 0
|
|
for _, cmd := range commands {
|
|
if cmd.Name == "" {
|
|
continue
|
|
}
|
|
if isAdminScriptLockCommand(cmd.Name) {
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
return count
|
|
}
|
|
|
|
func isAdminScriptLockCommand(name string) bool {
|
|
switch strings.ToLower(strings.TrimSpace(name)) {
|
|
case "lock", "unlock":
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func buildAdminScriptProposal(script, scriptName string, commandCount int) *plugin_pb.JobProposal {
|
|
scriptHash := hashAdminScript(script)
|
|
summary := "Run admin script"
|
|
if scriptName != "" {
|
|
summary = fmt.Sprintf("Run admin script: %s", scriptName)
|
|
}
|
|
detail := fmt.Sprintf("Admin script with %d command(s)", commandCount)
|
|
proposalID := fmt.Sprintf("admin-script-%s-%d", scriptHash[:8], time.Now().UnixNano())
|
|
|
|
labels := map[string]string{
|
|
"script_hash": scriptHash,
|
|
}
|
|
if scriptName != "" {
|
|
labels["script_name"] = scriptName
|
|
}
|
|
|
|
return &plugin_pb.JobProposal{
|
|
ProposalId: proposalID,
|
|
DedupeKey: "admin-script:" + scriptHash,
|
|
JobType: adminScriptJobType,
|
|
Priority: plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
|
|
Summary: summary,
|
|
Detail: detail,
|
|
Parameters: map[string]*plugin_pb.ConfigValue{
|
|
"script": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: script},
|
|
},
|
|
"script_name": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptName},
|
|
},
|
|
"script_hash": {
|
|
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: scriptHash},
|
|
},
|
|
"command_count": {
|
|
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(commandCount)},
|
|
},
|
|
},
|
|
Labels: labels,
|
|
}
|
|
}
|
|
|
|
func (h *AdminScriptHandler) buildAdminScriptCommandEnv(
|
|
ctx context.Context,
|
|
clusterContext *plugin_pb.ClusterContext,
|
|
) (*shell.CommandEnv, context.CancelFunc, error) {
|
|
options, err := h.buildAdminScriptShellOptions(clusterContext)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
commandEnv := shell.NewCommandEnv(&options)
|
|
commandEnv.ForceNoLock()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
|
|
|
|
return commandEnv, cancel, nil
|
|
}
|
|
|
|
// buildAdminScriptShellOptions maps a cluster context onto shell.ShellOptions.
|
|
// It is split out from buildAdminScriptCommandEnv so the address handling is
|
|
// testable without standing up a master client.
|
|
func (h *AdminScriptHandler) buildAdminScriptShellOptions(clusterContext *plugin_pb.ClusterContext) (shell.ShellOptions, error) {
|
|
if clusterContext == nil {
|
|
return shell.ShellOptions{}, fmt.Errorf("cluster context is required")
|
|
}
|
|
|
|
masters := normalizeAddressList(clusterContext.MasterGrpcAddresses)
|
|
if len(masters) == 0 {
|
|
return shell.ShellOptions{}, fmt.Errorf("missing master addresses for admin script")
|
|
}
|
|
|
|
filerGroup := ""
|
|
mastersValue := strings.Join(masters, ",")
|
|
options := shell.ShellOptions{
|
|
Masters: &mastersValue,
|
|
GrpcDialOption: h.grpcDialOption,
|
|
FilerGroup: &filerGroup,
|
|
Directory: "/",
|
|
}
|
|
|
|
// FilerAddresses are pb.ServerAddress strings (host:httpPort.grpcPort).
|
|
// ShellOptions.FilerAddress is itself a ServerAddress; shell commands derive
|
|
// the gRPC or HTTP port from it as needed, so store it verbatim.
|
|
filers := normalizeAddressList(clusterContext.FilerAddresses)
|
|
if len(filers) > 0 {
|
|
options.FilerAddress = pb.ServerAddress(filers[0])
|
|
} else {
|
|
glog.V(1).Infof("admin script worker missing filer address; filer-dependent commands may fail")
|
|
}
|
|
|
|
return options, nil
|
|
}
|
|
|
|
func normalizeAddressList(addresses []string) []string {
|
|
normalized := make([]string, 0, len(addresses))
|
|
seen := make(map[string]struct{}, len(addresses))
|
|
for _, address := range addresses {
|
|
address = strings.TrimSpace(address)
|
|
if address == "" {
|
|
continue
|
|
}
|
|
if _, exists := seen[address]; exists {
|
|
continue
|
|
}
|
|
seen[address] = struct{}{}
|
|
normalized = append(normalized, address)
|
|
}
|
|
return normalized
|
|
}
|
|
|
|
func hashAdminScript(script string) string {
|
|
sum := sha256.Sum256([]byte(script))
|
|
return hex.EncodeToString(sum[:])
|
|
}
|
|
|
|
func formatAdminScriptCommand(cmd adminScriptCommand) string {
|
|
if len(cmd.Args) == 0 {
|
|
return cmd.Name
|
|
}
|
|
return fmt.Sprintf("%s %s", cmd.Name, strings.Join(cmd.Args, " "))
|
|
}
|
|
|
|
func percentProgress(done, total int) float64 {
|
|
if total <= 0 {
|
|
return 0
|
|
}
|
|
if done < 0 {
|
|
done = 0
|
|
}
|
|
if done > total {
|
|
done = total
|
|
}
|
|
return float64(done) / float64(total) * 100
|
|
}
|
|
|
|
type limitedBuffer struct {
|
|
buf bytes.Buffer
|
|
maxBytes int
|
|
truncated bool
|
|
}
|
|
|
|
func (b *limitedBuffer) Write(p []byte) (int, error) {
|
|
if b == nil {
|
|
return len(p), nil
|
|
}
|
|
if b.maxBytes <= 0 {
|
|
b.truncated = true
|
|
return len(p), nil
|
|
}
|
|
remaining := b.maxBytes - b.buf.Len()
|
|
if remaining <= 0 {
|
|
b.truncated = true
|
|
return len(p), nil
|
|
}
|
|
if len(p) > remaining {
|
|
_, _ = b.buf.Write(p[:remaining])
|
|
b.truncated = true
|
|
return len(p), nil
|
|
}
|
|
_, _ = b.buf.Write(p)
|
|
return len(p), nil
|
|
}
|
|
|
|
func (b *limitedBuffer) String() string {
|
|
if b == nil {
|
|
return ""
|
|
}
|
|
return b.buf.String()
|
|
}
|