mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
d0f2727a4d
* client: simplify (*HostClient).do() Remove an allocation in favour of deferring a call to release the response. * client: remove panic in dialAddr Return an error instead of panicking if the user supplied a nonsensical DialFunc. * compression: remove panic on invalid compression level If a compression level exceeding gzip's boundaries is provided, fasthttp will panic. Instead it would be better to handle this error for them by limiting it to the minimum or maximum value, depending on the direction the user has exceeded the limits. Clamp the value of gzip to always be between gzip.BestSpeed and gzip.BestCompression. * peripconn: remove panic on negative connection count When a negative count is reached when unregistering a connection, a panic is caused even though data-integrity is not at risk. Replace the panic() with a simple clamp on the value to ensure the value does not exceed its expected lower bounds. References: #1504 * compress: remove error on failed nonblocking writes Since there is no way of handling or even logging non-critical errors in stateless non-blocking write calls, just drop them and hope the user notices and tries again. * workerPool: remove panic on redundant Start and Stop calls Instead of panicking for invalid behaviour, it's preferable to just turn the function into a noop. * http: remove panic on invalid form boundary * http: remove panic on negative reads Since bufio already panics on negative reads, it is not necessary to do so as well. If the length is zero and for some reason no error is returned, readBodyIdentity and appendBodyFixedSize now error in these cases. Link: https://github.com/golang/go/blob/851f6fd61425c810959c7ab51e6dc86f8a63c970/src/bufio/bufio.go#L246 * fs: remove panic on negative reader count When a negative count is reached when unregistering a reader, a panic is thrown even though data-integrity is not at risk.
Replace the panic() with a simple clamp on the value to ensure the value does not exceed its expected lower bounds. * server: remove panic in favour of a segfault Panicking with "BUG: " obscures the error. As the segfault causes a panic anyway, just let the chaos unfold. * server: remove panic in favour of returning an error Writing on a timed-out response is not endangering data integrity and just fails. * chore: add comments to all panics * chore: fix minor typo
252 lines
5.0 KiB
Go
252 lines
5.0 KiB
Go
package fasthttp
|
|
|
|
import (
|
|
"errors"
|
|
"net"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// workerPool serves incoming connections via a pool of workers
|
|
// in FILO order, i.e. the most recently stopped worker will serve the next
|
|
// incoming connection.
|
|
//
|
|
// Such a scheme keeps CPU caches hot (in theory).
|
|
type workerPool struct {
|
|
// Function for serving server connections.
|
|
// It must leave c unclosed.
|
|
WorkerFunc ServeHandler
|
|
|
|
MaxWorkersCount int
|
|
|
|
LogAllErrors bool
|
|
|
|
MaxIdleWorkerDuration time.Duration
|
|
|
|
Logger Logger
|
|
|
|
lock sync.Mutex
|
|
workersCount int
|
|
mustStop bool
|
|
|
|
ready []*workerChan
|
|
|
|
stopCh chan struct{}
|
|
|
|
workerChanPool sync.Pool
|
|
|
|
connState func(net.Conn, ConnState)
|
|
}
|
|
|
|
// workerChan is the mailbox of a single worker goroutine.
type workerChan struct {
	// lastUseTime is refreshed each time the worker is released back to
	// the pool; the cleaner compares it against the idle deadline.
	lastUseTime time.Time

	// ch delivers connections to the worker. A nil connection tells the
	// worker to stop.
	ch chan net.Conn
}
|
|
|
|
func (wp *workerPool) Start() {
|
|
if wp.stopCh != nil {
|
|
return
|
|
}
|
|
wp.stopCh = make(chan struct{})
|
|
stopCh := wp.stopCh
|
|
wp.workerChanPool.New = func() interface{} {
|
|
return &workerChan{
|
|
ch: make(chan net.Conn, workerChanCap),
|
|
}
|
|
}
|
|
go func() {
|
|
var scratch []*workerChan
|
|
for {
|
|
wp.clean(&scratch)
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
time.Sleep(wp.getMaxIdleWorkerDuration())
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (wp *workerPool) Stop() {
|
|
if wp.stopCh == nil {
|
|
return
|
|
}
|
|
close(wp.stopCh)
|
|
wp.stopCh = nil
|
|
|
|
// Stop all the workers waiting for incoming connections.
|
|
// Do not wait for busy workers - they will stop after
|
|
// serving the connection and noticing wp.mustStop = true.
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
for i := range ready {
|
|
ready[i].ch <- nil
|
|
ready[i] = nil
|
|
}
|
|
wp.ready = ready[:0]
|
|
wp.mustStop = true
|
|
wp.lock.Unlock()
|
|
}
|
|
|
|
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
|
|
if wp.MaxIdleWorkerDuration <= 0 {
|
|
return 10 * time.Second
|
|
}
|
|
return wp.MaxIdleWorkerDuration
|
|
}
|
|
|
|
func (wp *workerPool) clean(scratch *[]*workerChan) {
|
|
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
|
|
|
|
// Clean least recently used workers if they didn't serve connections
|
|
// for more than maxIdleWorkerDuration.
|
|
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
|
|
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
n := len(ready)
|
|
|
|
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
|
|
l, r, mid := 0, n-1, 0
|
|
for l <= r {
|
|
mid = (l + r) / 2
|
|
if criticalTime.After(wp.ready[mid].lastUseTime) {
|
|
l = mid + 1
|
|
} else {
|
|
r = mid - 1
|
|
}
|
|
}
|
|
i := r
|
|
if i == -1 {
|
|
wp.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
*scratch = append((*scratch)[:0], ready[:i+1]...)
|
|
m := copy(ready, ready[i+1:])
|
|
for i = m; i < n; i++ {
|
|
ready[i] = nil
|
|
}
|
|
wp.ready = ready[:m]
|
|
wp.lock.Unlock()
|
|
|
|
// Notify obsolete workers to stop.
|
|
// This notification must be outside the wp.lock, since ch.ch
|
|
// may be blocking and may consume a lot of time if many workers
|
|
// are located on non-local CPUs.
|
|
tmp := *scratch
|
|
for i := range tmp {
|
|
tmp[i].ch <- nil
|
|
tmp[i] = nil
|
|
}
|
|
}
|
|
|
|
func (wp *workerPool) Serve(c net.Conn) bool {
|
|
ch := wp.getCh()
|
|
if ch == nil {
|
|
return false
|
|
}
|
|
ch.ch <- c
|
|
return true
|
|
}
|
|
|
|
// workerChanCap is the buffer size of every worker channel, computed once
// at package initialization from GOMAXPROCS.
var workerChanCap = func() int {
	procs := runtime.GOMAXPROCS(0)

	// With GOMAXPROCS>1 use a buffered (non-blocking) channel, since
	// otherwise the Serve caller (Acceptor) may lag accepting new
	// connections if WorkerFunc is CPU-bound.
	if procs != 1 {
		return 1
	}

	// With GOMAXPROCS=1 use an unbuffered (blocking) channel. This
	// immediately switches Serve to WorkerFunc, which results in higher
	// performance (under go1.5 at least).
	return 0
}()
|
|
|
|
func (wp *workerPool) getCh() *workerChan {
|
|
var ch *workerChan
|
|
createWorker := false
|
|
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
n := len(ready) - 1
|
|
if n < 0 {
|
|
if wp.workersCount < wp.MaxWorkersCount {
|
|
createWorker = true
|
|
wp.workersCount++
|
|
}
|
|
} else {
|
|
ch = ready[n]
|
|
ready[n] = nil
|
|
wp.ready = ready[:n]
|
|
}
|
|
wp.lock.Unlock()
|
|
|
|
if ch == nil {
|
|
if !createWorker {
|
|
return nil
|
|
}
|
|
vch := wp.workerChanPool.Get()
|
|
ch = vch.(*workerChan)
|
|
go func() {
|
|
wp.workerFunc(ch)
|
|
wp.workerChanPool.Put(vch)
|
|
}()
|
|
}
|
|
return ch
|
|
}
|
|
|
|
func (wp *workerPool) release(ch *workerChan) bool {
|
|
ch.lastUseTime = time.Now()
|
|
wp.lock.Lock()
|
|
if wp.mustStop {
|
|
wp.lock.Unlock()
|
|
return false
|
|
}
|
|
wp.ready = append(wp.ready, ch)
|
|
wp.lock.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (wp *workerPool) workerFunc(ch *workerChan) {
|
|
var c net.Conn
|
|
|
|
var err error
|
|
for c = range ch.ch {
|
|
if c == nil {
|
|
break
|
|
}
|
|
|
|
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
|
|
errStr := err.Error()
|
|
if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
|
|
strings.Contains(errStr, "reset by peer") ||
|
|
strings.Contains(errStr, "request headers: small read buffer") ||
|
|
strings.Contains(errStr, "unexpected EOF") ||
|
|
strings.Contains(errStr, "i/o timeout") ||
|
|
errors.Is(err, ErrBadTrailer)) {
|
|
wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
|
|
}
|
|
}
|
|
if err == errHijacked {
|
|
wp.connState(c, StateHijacked)
|
|
} else {
|
|
_ = c.Close()
|
|
wp.connState(c, StateClosed)
|
|
}
|
|
c = nil
|
|
|
|
if !wp.release(ch) {
|
|
break
|
|
}
|
|
}
|
|
|
|
wp.lock.Lock()
|
|
wp.workersCount--
|
|
wp.lock.Unlock()
|
|
}
|