fix(weed/command) address unhandled errors (#9208)

* fix(weed/command) address unhandled errors

* fix(command): don't log graceful-shutdown sentinels; plug response-body leak

- s3: Serve on unix socket treated http.ErrServerClosed as fatal; now
  excluded like the other Serve/ServeTLS paths in this file.
- mq_agent, mq_broker: filter grpc.ErrServerStopped so clean shutdown
  doesn't log as an error.
- worker_runtime: the added decodeErr early-continue skipped
  resp.Body.Close(); drop it since the existing check below already
  surfaces the decode error.
- mount_std: the pre-mount Unmount commonly fails when nothing is
  mounted; demote to V(1) Infof.
- fuse_std: tidy panic message to match sibling cases.

* fix(mq_broker): filter grpc.ErrServerStopped on localhost listener

The localhost listener goroutine logged any Serve error unconditionally,
which includes grpc.ErrServerStopped on graceful shutdown. Match the
main listener's check so clean stops don't surface as errors.

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
Lars Lehtonen
2026-04-23 22:15:05 -07:00
committed by GitHub
parent 88c2f3c34d
commit 29e14f89f1
11 changed files with 67 additions and 27 deletions
+12 -3
View File
@@ -511,7 +511,10 @@ func (fo *FilerOptions) startFiler() {
ClientCAs: caCertPool,
}
security.FixTlsConfig(util.GetViper(), tlsConfig)
err = security.FixTlsConfig(util.GetViper(), tlsConfig)
if err != nil {
glog.Fatalf("Filer failed to fix TLS config: %v", err)
}
var localTLSServer *http.Server
if filerLocalListener != nil {
@@ -534,10 +537,16 @@ func (fo *FilerOptions) startFiler() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
if socketServer != nil {
socketServer.Shutdown(shutdownCtx)
err = socketServer.Shutdown(shutdownCtx)
if err != nil {
glog.Warningf("socket server shutdown: %v", err)
}
}
if localTLSServer != nil {
localTLSServer.Shutdown(shutdownCtx)
err = localTLSServer.Shutdown(shutdownCtx)
if err != nil {
glog.Warningf("local TLS server shutdown: %v", err)
}
}
if err := httpS.Shutdown(shutdownCtx); err != nil {
glog.Warningf("HTTPS server shutdown: %v", err)
+4 -2
View File
@@ -161,11 +161,13 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
if err := filerSource.DoInitialize(
sourceFiler.ToHttpAddress(),
sourceFiler.ToGrpcAddress(),
sourcePath,
*backupOption.proxyByFiler)
*backupOption.proxyByFiler); err != nil {
return fmt.Errorf("filersource initialization failed: %v", err)
}
if err := repl_util.InitializeSSEForReplication(filerSource); err != nil {
return fmt.Errorf("SSE initialization failed: %v", err)
+3 -1
View File
@@ -195,7 +195,9 @@ func doFixOneVolume(basepath string, baseFileName string, collection string, vol
if *fixIgnoreError {
glog.Error(err)
} else {
os.Remove(indexFileName)
if err := os.Remove(indexFileName); err != nil {
glog.Errorf("failed to cleanup file %s:%v", indexFileName, err)
}
glog.Fatal(err)
}
}
+3 -1
View File
@@ -243,7 +243,9 @@ func runFuse(cmd *Command, args []string) bool {
case "fusermount.path":
fusermountPath = parameter.value
case "config_dir":
util.ConfigurationFileDirectory.Set(parameter.value)
if err := util.ConfigurationFileDirectory.Set(parameter.value); err != nil {
panic(fmt.Errorf("config_dir %s: %w", parameter.value, err))
}
// FUSE performance options
case "writebackCache":
if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+6 -2
View File
@@ -148,7 +148,9 @@ func runMaster(cmd *Command, args []string) bool {
parent, _ := util.FullPath(*m.metaFolder).DirAndName()
if util.FileExists(string(parent)) && !util.FileExists(*m.metaFolder) {
os.MkdirAll(*m.metaFolder, 0755)
if err := os.MkdirAll(*m.metaFolder, 0755); err != nil {
glog.Fatalf("Could not create Meta Folder %s: %v", *m.metaFolder, err)
}
}
if err := util.TestFolderWritable(util.ResolvePath(*m.metaFolder)); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *m.metaFolder, err)
@@ -324,7 +326,9 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
var tlsConfig *tls.Config
if useMTLS {
tlsConfig = security.LoadClientTLSHTTP(clientCertFile)
security.FixTlsConfig(util.GetViper(), tlsConfig)
if err := security.FixTlsConfig(util.GetViper(), tlsConfig); err != nil {
glog.Fatalf("failed to fix TLS config: %v", err)
}
}
if useTLS {
+13 -8
View File
@@ -82,11 +82,11 @@ var (
// goroutines (registered via onMiniClientsShutdown / trackMiniClient) to
// drain.
type miniClientsState struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
preCancelMu sync.Mutex
preCancelFns []func()
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
preCancelMu sync.Mutex
preCancelFns []func()
}
var miniClients *miniClientsState
@@ -807,8 +807,11 @@ func applyConfigFileOptions(options map[string]string) {
if flag != nil {
// Only set if not already set (by command line)
if flag.Value.String() == flag.DefValue {
flag.Value.Set(value)
glog.V(2).Infof("Applied config file option: %s=%s", key, value)
if err := flag.Value.Set(value); err != nil {
glog.Warningf("Failed to apply config file option: %s=%s: %v", key, value, err)
} else {
glog.V(2).Infof("Applied config file option: %s=%s", key, value)
}
}
}
}
@@ -1021,7 +1024,9 @@ func runMini(cmd *Command, args []string) bool {
printWelcomeMessage()
// Save configuration to file for persistence and documentation
saveMiniConfiguration(*miniDataFolders)
if err := saveMiniConfiguration(*miniDataFolders); err != nil {
glog.Warningf("failed to save mini configuration in %s: %v", *miniDataFolders, err)
}
if MiniClusterCtx != nil {
<-MiniClusterCtx.Done()
+11 -5
View File
@@ -166,7 +166,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return false
}
unmount.Unmount(dir)
if err := unmount.Unmount(dir); err != nil {
glog.V(1).Infof("pre-mount cleanup unmount %s: %v", dir, err)
}
// start on local unix socket
if *option.localSocket == "" {
@@ -186,7 +188,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
if err := os.MkdirAll(dir, os.FileMode(0777)&^umask); err != nil {
glog.Fatalf("failed to create directory %s:%v", dir, err)
}
}
fileInfo, err := os.Stat(dir)
@@ -357,8 +361,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
WritebackCache: option.writebackCache != nil && *option.writebackCache,
PosixDirNlink: option.posixDirNlink != nil && *option.posixDirNlink,
// Peer chunk sharing
PeerEnabled: option.peerEnabled != nil && *option.peerEnabled,
PeerListen: peerStringOrEmpty(option.peerListen),
PeerEnabled: option.peerEnabled != nil && *option.peerEnabled,
PeerListen: peerStringOrEmpty(option.peerListen),
PeerAdvertise: peerStringOrEmpty(option.peerAdvertise),
PeerDataCenter: peerStringOrEmpty(option.peerDataCenter),
PeerRack: peerStringOrEmpty(option.peerRack),
@@ -381,7 +385,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
glog.Fatalf("Mount fail: %v", err)
}
grace.OnInterrupt(func() {
unmount.Unmount(dir)
if err := unmount.Unmount(dir); err != nil {
glog.Errorf("failed to unmount %s: %v", dir, err)
}
})
if mountOptions.fuseCommandPid != 0 {
+4 -1
View File
@@ -3,6 +3,7 @@ package command
import (
"github.com/seaweedfs/seaweedfs/weed/mq/agent"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -84,7 +85,9 @@ func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
}
glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)
grpcS.Serve(grpcL)
if err := grpcS.Serve(grpcL); err != nil && err != grpc.ErrServerStopped {
glog.Errorf("MQ Agent failed to start: %v", err)
}
return true
+5 -2
View File
@@ -1,6 +1,7 @@
package command
import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -109,14 +110,16 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
reflection.Register(localGrpcS)
go func() {
glog.V(0).Infof("MQ Broker listening on localhost:%d", *mqBrokerOpt.port)
if err := localGrpcS.Serve(localL); err != nil {
if err := localGrpcS.Serve(localL); err != nil && err != grpc.ErrServerStopped {
glog.Errorf("MQ Broker localhost listener error: %v", err)
}
}()
}
glog.V(0).Infof("MQ Broker listening on %s:%d", *mqBrokerOpt.ip, *mqBrokerOpt.port)
grpcS.Serve(grpcL)
if err := grpcS.Serve(grpcL); err != nil && err != grpc.ErrServerStopped {
glog.Errorf("Failed to serve MQ Broker: %v", err)
}
return true
+3 -1
View File
@@ -364,7 +364,9 @@ func (s3opt *S3Options) startS3Server() bool {
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", localSocket, err)
}
newHttpServer(router, nil).Serve(s3SocketListener)
if err := newHttpServer(router, nil).Serve(s3SocketListener); err != nil && err != http.ErrServerClosed {
glog.Fatalf("Failed to start S3 http server: %v", err)
}
}()
}
+3 -1
View File
@@ -480,7 +480,9 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) (http
if viper.GetString("https.volume.ca") != "" {
clientCertFile := viper.GetString("https.volume.ca")
httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
security.FixTlsConfig(util.GetViper(), httpS.TLSConfig)
if err := security.FixTlsConfig(util.GetViper(), httpS.TLSConfig); err != nil {
glog.Fatalf("Could not fix TLS config: %v", err)
}
}
var closeCert func()