Files
fasthttp/prefork/prefork.go
T
Corné Steenhuis ab8c2aceea fix: detect master process death in prefork children (#2158)
* fix: detect master process death in prefork children

Prefork child processes had no mechanism to detect if the master process
died unexpectedly. Children would become orphans, get reparented to
PID 1, and keep running silently with no supervision.

Add a watchMaster goroutine that stores the original parent PID at
startup and exits when the parent PID changes, matching the approach
used in gofiber/fiber.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: add integration test for watchMaster orphan detection

Verifies that prefork children exit when the master process is killed,
using a two-level subprocess chain (test → master → child) with pipe-based
synchronization to ensure the child has recorded its PPID before the
master is killed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: pass masterPID to watchMaster and clean up tests

Capture PPID before launching the goroutine to eliminate a race between
the PPID snapshot and the ready signal. Align test style with the rest
of the project (t.Parallel, naming, ASCII-only comments).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: make prefork orphan detection configurable via OnMasterDeath callback

Address review feedback: make watchMaster opt-in via an OnMasterDeath
callback field (nil/off by default for backwards compatibility). Users
can set DefaultOnMasterDeath for os.Exit(1) or provide custom cleanup.
Also fixes ticker leak in watchMaster.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* address review feedback: remove DefaultOnMasterDeath, delete tests, fix log message

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 16:59:29 +09:00

321 lines
7.1 KiB
Go

// Package prefork provides a way to prefork a fasthttp server.
package prefork
import (
"errors"
"log"
"net"
"os"
"os/exec"
"runtime"
"time"
"github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/reuseport"
)
// preforkChildEnvVariable names the environment variable that marks a
// process as a prefork child. The master sets it to "1" in the environment
// of every child it spawns, and IsChild checks for that value.
const preforkChildEnvVariable = "FASTHTTP_PREFORK_CHILD"

// defaultNetwork is the network used whenever Prefork.Network is empty.
const defaultNetwork = "tcp4"
// defaultLogger is the fallback Logger used when Prefork.Logger is nil.
// It writes to standard error using the standard log flags.
var defaultLogger Logger = log.New(os.Stderr, "", log.LstdFlags)

// Sentinel errors returned by the prefork master process.
var (
	// ErrOverRecovery is returned when the number of child process exits
	// exceeds the RecoverThreshold.
	ErrOverRecovery = errors.New("exceeding the value of RecoverThreshold")

	// ErrOnlyReuseportOnWindows is returned on Windows when Reuseport is
	// false, because only the reuseport listener is supported there.
	ErrOnlyReuseportOnWindows = errors.New("windows only supports Reuseport = true")
)
// Logger is the minimal logging contract Prefork needs for its
// diagnostic messages.
type Logger interface {
	// Printf formats its arguments and emits the resulting message.
	// It must have the same semantics as log.Printf.
	Printf(format string, v ...any)
}
// Prefork implements fasthttp server prefork.
//
// Splitting the master process (which uses all cores) into several child
// processes increases performance significantly, because Go doesn't have to
// share and manage memory between cores.
//
// WARNING: using prefork prevents the use of any global state!
// Things like in-memory caches won't work.
type Prefork struct {
// Logger receives the diagnostic messages emitted by Prefork.
//
// By default standard logger from log package is used.
Logger Logger
// ln is the listener currently in use: the TCP listener opened by the
// master when Reuseport is false, or the listener a child serves on.
ln net.Listener
// ServeFunc is called in a child process by ListenAndServe to serve
// requests on the child's listener.
ServeFunc func(ln net.Listener) error
// ServeTLSFunc is called in a child process by ListenAndServeTLS to serve
// requests on the child's listener using the given certificate and key
// files.
ServeTLSFunc func(ln net.Listener, certFile, keyFile string) error
// ServeTLSEmbedFunc is called in a child process by ListenAndServeTLSEmbed
// to serve requests on the child's listener using the given certificate
// and key data.
ServeTLSEmbedFunc func(ln net.Listener, certData, keyData []byte) error
// The network must be "tcp", "tcp4" or "tcp6".
//
// By default is "tcp4"
Network string
// files holds the file of the master's TCP listener; it is handed to
// every child process as its extra files when Reuseport is false.
files []*os.File
// Every child process exit is counted and the child is started over. Once
// the number of exits exceeds RecoverThreshold the master stops restarting
// children and returns ErrOverRecovery, which terminates the server.
//
// New sets it to half of runtime.GOMAXPROCS(0).
RecoverThreshold int
// Flag to use a listener with reuseport, if not a file Listener will be used
// See: https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/
//
// It's disabled by default
Reuseport bool
// OnMasterDeath, when non-nil, enables monitoring of the master process
// in child processes. If the master process dies unexpectedly, this
// callback is invoked. This allows custom cleanup before shutdown.
//
// It is recommended to set this to func() { os.Exit(1) } if no custom
// cleanup is needed.
OnMasterDeath func()
}
// IsChild reports whether the current process was started as a prefork
// child, i.e. whether the prefork marker environment variable is set to "1".
func IsChild() bool {
	marker := os.Getenv(preforkChildEnvVariable)
	return marker == "1"
}
// New wraps the fasthttp server s so that it runs with preforked processes.
//
// The returned Prefork uses the default network, a RecoverThreshold of half
// of runtime.GOMAXPROCS(0), the server's logger and the server's Serve,
// ServeTLS and ServeTLSEmbed methods as serve functions.
func New(s *fasthttp.Server) *Prefork {
	p := new(Prefork)

	p.Network = defaultNetwork
	p.RecoverThreshold = runtime.GOMAXPROCS(0) / 2
	p.Logger = s.Logger

	p.ServeFunc = s.Serve
	p.ServeTLSFunc = s.ServeTLS
	p.ServeTLSEmbedFunc = s.ServeTLSEmbed

	return p
}
// logger returns the Logger configured on p, falling back to the package
// default logger when none has been set.
func (p *Prefork) logger() Logger {
	if p.Logger == nil {
		return defaultLogger
	}
	return p.Logger
}
// watchMaster polls the parent process ID every 500ms. As soon as it no
// longer equals masterPID, it logs the event, invokes p.OnMasterDeath once
// and returns.
//
// p.OnMasterDeath is called unconditionally, so it must be non-nil. The
// function blocks until the parent PID changes and is meant to run in its
// own goroutine.
func (p *Prefork) watchMaster(masterPID int) {
	const pollInterval = 500 * time.Millisecond

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	for {
		<-tick.C

		if os.Getppid() == masterPID {
			continue
		}

		p.logger().Printf("master process died\n")
		p.OnMasterDeath()
		return
	}
}
// listen returns the listener a child process serves on.
//
// It first limits the process to a single OS thread running Go code via
// runtime.GOMAXPROCS(1) and defaults p.Network when it is empty. With
// Reuseport enabled a new reuseport listener is opened on addr; otherwise
// the listener is built from file descriptor 3, which is where the first
// entry of the master's exec.Cmd.ExtraFiles ends up in the child.
func (p *Prefork) listen(addr string) (net.Listener, error) {
	runtime.GOMAXPROCS(1)

	if p.Network == "" {
		p.Network = defaultNetwork
	}

	if !p.Reuseport {
		inherited := os.NewFile(3, "")
		return net.FileListener(inherited)
	}

	return reuseport.Listen(p.Network, addr)
}
// setTCPListenerFiles opens a TCP listener on addr in the master process and
// stores it in p.ln together with a duplicated file for it in p.files, so
// that the socket can be handed to child processes.
//
// p.Network is defaulted when empty. On any error nothing is stored on p and
// no listener is left open.
func (p *Prefork) setTCPListenerFiles(addr string) error {
	if p.Network == "" {
		p.Network = defaultNetwork
	}

	tcpAddr, err := net.ResolveTCPAddr(p.Network, addr)
	if err != nil {
		return err
	}

	tcplistener, err := net.ListenTCP(p.Network, tcpAddr)
	if err != nil {
		return err
	}

	fl, err := tcplistener.File()
	if err != nil {
		// The caller only closes p.ln after this function succeeded, so the
		// listener has to be released here or the bound socket would leak.
		// The File error is the one worth reporting; a Close error on this
		// cleanup path is deliberately dropped.
		_ = tcplistener.Close()
		return err
	}

	// Publish the listener and its file only once both are valid.
	p.ln = tcplistener
	p.files = []*os.File{fl}

	return nil
}
// doCommand starts one child process running the current executable.
//
// The child gets the same arguments as the current process (with argv[0]
// replaced by the resolved executable path), shares stdout and stderr,
// inherits the environment plus the prefork child marker, and receives
// p.files as extra files.
//
// It returns the started command. If starting fails, the error from
// cmd.Start is returned along with the command that could not be started.
func (p *Prefork) doCommand() (*exec.Cmd, error) {
	executable, err := os.Executable()
	if err != nil {
		return nil, err
	}

	// Build argv without indexing into os.Args directly: os.Args can be
	// empty when the process was executed with an empty argument vector,
	// and that must not panic.
	args := []string{executable}
	if len(os.Args) > 1 {
		args = append(args, os.Args[1:]...)
	}

	cmd := &exec.Cmd{
		Path:   executable,
		Args:   args,
		Stdout: os.Stdout,
		Stderr: os.Stderr,
		// os.Environ returns a copy, so appending does not alias process state.
		Env:        append(os.Environ(), preforkChildEnvVariable+"=1"),
		ExtraFiles: p.files,
	}

	err = cmd.Start()
	return cmd, err
}
// prefork runs the master process: it spawns runtime.GOMAXPROCS(0) child
// processes and restarts every child that exits until the number of exits
// exceeds p.RecoverThreshold.
//
// When Reuseport is false the master first opens the TCP listener that is
// shared with the children (not supported on Windows, where
// ErrOnlyReuseportOnWindows is returned). The listener and its duplicated
// files are closed again before prefork returns.
//
// It returns ErrOverRecovery when the threshold is exceeded, or the error
// that prevented a child from being started. All children still running are
// killed on return.
func (p *Prefork) prefork(addr string) (err error) {
	if !p.Reuseport {
		if runtime.GOOS == "windows" {
			return ErrOnlyReuseportOnWindows
		}

		if err = p.setTCPListenerFiles(addr); err != nil {
			return err
		}

		// Release everything opened by setTCPListenerFiles. The files are
		// duplicates of the listener's descriptor; closing only the listener
		// would keep the socket open for as long as p is alive.
		defer func() {
			if e := p.ln.Close(); err == nil {
				err = e
			}
			for _, f := range p.files {
				if e := f.Close(); err == nil {
					err = e
				}
			}
			p.files = nil
		}()
	}

	// procSig reports the exit of one child process.
	type procSig struct {
		err error
		pid int
	}

	goMaxProcs := runtime.GOMAXPROCS(0)
	// At most goMaxProcs children are alive at any time, so this buffer
	// lets every waiting goroutine deliver its result even after the
	// receive loop below has stopped.
	sigCh := make(chan procSig, goMaxProcs)
	childProcs := make(map[int]*exec.Cmd)

	defer func() {
		for _, proc := range childProcs {
			// Best effort: the process may already have exited.
			_ = proc.Process.Kill()
		}
	}()

	for range goMaxProcs {
		var cmd *exec.Cmd
		if cmd, err = p.doCommand(); err != nil {
			p.logger().Printf("failed to start a child prefork process, error: %v\n", err)
			return err
		}

		childProcs[cmd.Process.Pid] = cmd
		go func() {
			sigCh <- procSig{pid: cmd.Process.Pid, err: cmd.Wait()}
		}()
	}

	var exitedProcs int
	for sig := range sigCh {
		delete(childProcs, sig.pid)

		p.logger().Printf("one of the child prefork processes exited with "+
			"error: %v", sig.err)

		exitedProcs++
		if exitedProcs > p.RecoverThreshold {
			// Report the configured threshold, not the exit counter.
			p.logger().Printf("child prefork processes exit too many times, "+
				"which exceeds the value of RecoverThreshold(%d), "+
				"exiting the master process.\n", p.RecoverThreshold)
			err = ErrOverRecovery
			break
		}

		// Replace the exited child with a fresh one.
		var cmd *exec.Cmd
		if cmd, err = p.doCommand(); err != nil {
			break
		}

		childProcs[cmd.Process.Pid] = cmd
		go func() {
			sigCh <- procSig{pid: cmd.Process.Pid, err: cmd.Wait()}
		}()
	}

	return err
}
// ListenAndServe serves HTTP requests from the given TCP addr.
//
// In the master process it spawns and supervises the child processes. In a
// child process it obtains the listener, starts watching the master when
// OnMasterDeath is set, and serves requests through ServeFunc.
func (p *Prefork) ListenAndServe(addr string) error {
	if !IsChild() {
		return p.prefork(addr)
	}

	listener, err := p.listen(addr)
	if err != nil {
		return err
	}
	p.ln = listener

	// Master monitoring is opt-in: it only runs when a callback is set.
	if p.OnMasterDeath != nil {
		go p.watchMaster(os.Getppid())
	}

	return p.ServeFunc(listener)
}
// ListenAndServeTLS serves HTTPS requests from the given TCP addr.
//
// certKey is the path to the TLS key file and certFile is the path to the
// TLS certificate file. Note the parameter order: the key path comes before
// the certificate path. They are handed to ServeTLSFunc as
// (certFile, certKey).
//
// In the master process it spawns and supervises the child processes. In a
// child process it obtains the listener, starts watching the master when
// OnMasterDeath is set, and serves requests through ServeTLSFunc.
func (p *Prefork) ListenAndServeTLS(addr, certKey, certFile string) error {
	if !IsChild() {
		return p.prefork(addr)
	}

	listener, err := p.listen(addr)
	if err != nil {
		return err
	}
	p.ln = listener

	// Master monitoring is opt-in: it only runs when a callback is set.
	if p.OnMasterDeath != nil {
		go p.watchMaster(os.Getppid())
	}

	return p.ServeTLSFunc(listener, certFile, certKey)
}
// ListenAndServeTLSEmbed serves HTTPS requests from the given TCP addr.
//
// certData and keyData must contain valid TLS certificate and key data.
//
// In the master process it spawns and supervises the child processes. In a
// child process it obtains the listener, starts watching the master when
// OnMasterDeath is set, and serves requests through ServeTLSEmbedFunc.
func (p *Prefork) ListenAndServeTLSEmbed(addr string, certData, keyData []byte) error {
	if !IsChild() {
		return p.prefork(addr)
	}

	listener, err := p.listen(addr)
	if err != nil {
		return err
	}
	p.ln = listener

	// Master monitoring is opt-in: it only runs when a callback is set.
	if p.OnMasterDeath != nil {
		go p.watchMaster(os.Getppid())
	}

	return p.ServeTLSEmbedFunc(listener, certData, keyData)
}