Files
fasthttp/workerpool.go
T
Moritz Poldrack d0f2727a4d get rid of some panics (#1526)
* client: simplify (*HostClient).do()

Remove an allocation in favour of deferring a call to release the
response.

* client: remove panic in dialAddr

Return an error instead of panicking if the user supplied a nonsensical
DialFunc.

* compression: remove panic on invalid compression level

If a compression level exceeding gzip's boundaries is provided, fasthttp
will panic. Instead it would be better to handle this error for them by
limiting it to the minimum or maximum value, depending on the direction
the user has exceeded the limits.

Clamp the value of gzip to always be between gzip.BestSpeed and
gzip.BestCompression.

* peripconn: remove panic on negative connection count

When a negative count is reached when unregistering a connection, a
panic is caused even though data-integrity is not at risk.

Replace the panic() with a simple clamp on the value to ensure the
value does not exceed it's expected lower bounds.

References: #1504

* compress: remove error on failed nonblocking writes

Since there is no way of handling or even logging non-critical errors in
stateless non-blocking writecalls, just drop them and hope the user
notices and tries again.

* workerPool: remove panic on redundant Start and Stop calls

Instead of panicking for invalid behaviour, it's preferable to just turn
the function into a noop.

* http: remove panic on invalid form boundary

* http: remove panic on negative reads

Since bufio already panics on negative reads, it is not necessary to do
so as well. If the length is zero and for some reason no error is
returned, readBodyIdentity and appendBodyFixedSize now errors in these
cases.

Link: https://github.com/golang/go/blob/851f6fd61425c810959c7ab51e6dc86f8a63c970/src/bufio/bufio.go#L246

* fs: remove panic on negative reader count

When a negative count is reached when unregistering a reader, a panic is
thrown even though data-integrity is not at risk.

Replace the panic() with a simple clamp on the value to ensure the
value does not exceed it's expected lower bounds.

* server: remove panic in favour of a segfault

Panicking with "BUG: " obscures the error. As the segfault causes a
panic anyway, just let the chaos unfold.

* server: remove panic in favour of returning an error

Writing on a timed-out response is not endangering data integrity and
just fails.

* chore: add comments to all panics

* chore: fix minor typo
2023-03-30 03:38:28 +02:00

252 lines
5.0 KiB
Go

package fasthttp
import (
"errors"
"net"
"runtime"
"strings"
"sync"
"time"
)
// workerPool serves incoming connections via a pool of workers
// in FILO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
//
// Such a scheme keeps CPU caches hot (in theory).
type workerPool struct {
// Function for serving server connections.
// It must leave c unclosed.
WorkerFunc ServeHandler
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct{}
workerChanPool sync.Pool
connState func(net.Conn, ConnState)
}
type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}
func (wp *workerPool) Start() {
if wp.stopCh != nil {
return
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
wp.workerChanPool.New = func() interface{} {
return &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
func (wp *workerPool) Stop() {
if wp.stopCh == nil {
return
}
close(wp.stopCh)
wp.stopCh = nil
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
if wp.MaxIdleWorkerDuration <= 0 {
return 10 * time.Second
}
return wp.MaxIdleWorkerDuration
}
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
wp.lock.Lock()
ready := wp.ready
n := len(ready)
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r, mid := 0, n-1, 0
for l <= r {
mid = (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
}
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}
*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()
// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
}
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
var workerChanCap = func() int {
// Use blocking workerChan if GOMAXPROCS=1.
// This immediately switches Serve to WorkerFunc, which results
// in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the Serve caller (Acceptor) may lag accepting
// new connections if WorkerFunc is CPU-bound.
return 1
}()
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
if c == nil {
break
}
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
errStr := err.Error()
if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "reset by peer") ||
strings.Contains(errStr, "request headers: small read buffer") ||
strings.Contains(errStr, "unexpected EOF") ||
strings.Contains(errStr, "i/o timeout") ||
errors.Is(err, ErrBadTrailer)) {
wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
}
}
if err == errHijacked {
wp.connState(c, StateHijacked)
} else {
_ = c.Close()
wp.connState(c, StateClosed)
}
c = nil
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}