mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
86c7e844f4
1. Reduce RequestHeader from 368 bytes to 360 bytes 2. Reduce Request from 816 bytes to 800 bytes 3. Reduce Response from 432 bytes to 416 bytes 4. Reduce Client from 312 bytes to 288 bytes 5. Reduce HostClient from 416 bytes to 392 bytes 6. Reduce PipelineClient from 176 bytes to 168 bytes 7. Reduce pipelineConnClient from 216 bytes to 208 bytes 8. Reduce Cookie from 232 bytes to 224 bytes 9. Reduce FS from 184 bytes to 160 bytes 10. Reduce fsHandler from 168 bytes to 160 bytes 11. Reduce ResponseHeader from 328 bytes to 320 bytes 12. Reduce headerScanner from 128 bytes to 120 bytes 13. Reduce TCPDialer from 104 bytes to 96 bytes 14. Reduce workerPool from 152 btyes to 144 btyes
251 lines
5.0 KiB
Go
251 lines
5.0 KiB
Go
package fasthttp
|
|
|
|
import (
|
|
"errors"
|
|
"net"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// workerPool serves incoming connections via a pool of workers
|
|
// in FILO order, i.e. the most recently stopped worker will serve the next
|
|
// incoming connection.
|
|
//
|
|
// Such a scheme keeps CPU caches hot (in theory).
|
|
type workerPool struct {
|
|
// Function for serving server connections.
|
|
// It must leave c unclosed.
|
|
WorkerFunc ServeHandler
|
|
|
|
MaxWorkersCount int
|
|
|
|
LogAllErrors bool
|
|
mustStop bool
|
|
|
|
MaxIdleWorkerDuration time.Duration
|
|
|
|
Logger Logger
|
|
|
|
lock sync.Mutex
|
|
workersCount int
|
|
|
|
ready []*workerChan
|
|
|
|
stopCh chan struct{}
|
|
|
|
workerChanPool sync.Pool
|
|
|
|
connState func(net.Conn, ConnState)
|
|
}
|
|
|
|
type workerChan struct {
|
|
lastUseTime time.Time
|
|
ch chan net.Conn
|
|
}
|
|
|
|
func (wp *workerPool) Start() {
|
|
if wp.stopCh != nil {
|
|
return
|
|
}
|
|
wp.stopCh = make(chan struct{})
|
|
stopCh := wp.stopCh
|
|
wp.workerChanPool.New = func() any {
|
|
return &workerChan{
|
|
ch: make(chan net.Conn, workerChanCap),
|
|
}
|
|
}
|
|
go func() {
|
|
var scratch []*workerChan
|
|
for {
|
|
wp.clean(&scratch)
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
time.Sleep(wp.getMaxIdleWorkerDuration())
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (wp *workerPool) Stop() {
|
|
if wp.stopCh == nil {
|
|
return
|
|
}
|
|
close(wp.stopCh)
|
|
wp.stopCh = nil
|
|
|
|
// Stop all the workers waiting for incoming connections.
|
|
// Do not wait for busy workers - they will stop after
|
|
// serving the connection and noticing wp.mustStop = true.
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
for i := range ready {
|
|
ready[i].ch <- nil
|
|
ready[i] = nil
|
|
}
|
|
wp.ready = ready[:0]
|
|
wp.mustStop = true
|
|
wp.lock.Unlock()
|
|
}
|
|
|
|
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
|
|
if wp.MaxIdleWorkerDuration <= 0 {
|
|
return 10 * time.Second
|
|
}
|
|
return wp.MaxIdleWorkerDuration
|
|
}
|
|
|
|
func (wp *workerPool) clean(scratch *[]*workerChan) {
|
|
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
|
|
|
|
// Clean least recently used workers if they didn't serve connections
|
|
// for more than maxIdleWorkerDuration.
|
|
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
|
|
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
n := len(ready)
|
|
|
|
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
|
|
l, r := 0, n-1
|
|
for l <= r {
|
|
mid := (l + r) / 2
|
|
if criticalTime.After(wp.ready[mid].lastUseTime) {
|
|
l = mid + 1
|
|
} else {
|
|
r = mid - 1
|
|
}
|
|
}
|
|
i := r
|
|
if i == -1 {
|
|
wp.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
*scratch = append((*scratch)[:0], ready[:i+1]...)
|
|
m := copy(ready, ready[i+1:])
|
|
for i = m; i < n; i++ {
|
|
ready[i] = nil
|
|
}
|
|
wp.ready = ready[:m]
|
|
wp.lock.Unlock()
|
|
|
|
// Notify obsolete workers to stop.
|
|
// This notification must be outside the wp.lock, since ch.ch
|
|
// may be blocking and may consume a lot of time if many workers
|
|
// are located on non-local CPUs.
|
|
tmp := *scratch
|
|
for i := range tmp {
|
|
tmp[i].ch <- nil
|
|
tmp[i] = nil
|
|
}
|
|
}
|
|
|
|
func (wp *workerPool) Serve(c net.Conn) bool {
|
|
ch := wp.getCh()
|
|
if ch == nil {
|
|
return false
|
|
}
|
|
ch.ch <- c
|
|
return true
|
|
}
|
|
|
|
var workerChanCap = func() int {
|
|
// Use blocking workerChan if GOMAXPROCS=1.
|
|
// This immediately switches Serve to WorkerFunc, which results
|
|
// in higher performance (under go1.5 at least).
|
|
if runtime.GOMAXPROCS(0) == 1 {
|
|
return 0
|
|
}
|
|
|
|
// Use non-blocking workerChan if GOMAXPROCS>1,
|
|
// since otherwise the Serve caller (Acceptor) may lag accepting
|
|
// new connections if WorkerFunc is CPU-bound.
|
|
return 1
|
|
}()
|
|
|
|
func (wp *workerPool) getCh() *workerChan {
|
|
var ch *workerChan
|
|
createWorker := false
|
|
|
|
wp.lock.Lock()
|
|
ready := wp.ready
|
|
n := len(ready) - 1
|
|
if n < 0 {
|
|
if wp.workersCount < wp.MaxWorkersCount {
|
|
createWorker = true
|
|
wp.workersCount++
|
|
}
|
|
} else {
|
|
ch = ready[n]
|
|
ready[n] = nil
|
|
wp.ready = ready[:n]
|
|
}
|
|
wp.lock.Unlock()
|
|
|
|
if ch == nil {
|
|
if !createWorker {
|
|
return nil
|
|
}
|
|
vch := wp.workerChanPool.Get()
|
|
ch = vch.(*workerChan)
|
|
go func() {
|
|
wp.workerFunc(ch)
|
|
wp.workerChanPool.Put(vch)
|
|
}()
|
|
}
|
|
return ch
|
|
}
|
|
|
|
func (wp *workerPool) release(ch *workerChan) bool {
|
|
ch.lastUseTime = time.Now()
|
|
wp.lock.Lock()
|
|
if wp.mustStop {
|
|
wp.lock.Unlock()
|
|
return false
|
|
}
|
|
wp.ready = append(wp.ready, ch)
|
|
wp.lock.Unlock()
|
|
return true
|
|
}
|
|
|
|
func (wp *workerPool) workerFunc(ch *workerChan) {
|
|
var c net.Conn
|
|
|
|
var err error
|
|
for c = range ch.ch {
|
|
if c == nil {
|
|
break
|
|
}
|
|
|
|
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
|
|
errStr := err.Error()
|
|
if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
|
|
strings.Contains(errStr, "reset by peer") ||
|
|
strings.Contains(errStr, "request headers: small read buffer") ||
|
|
strings.Contains(errStr, "unexpected EOF") ||
|
|
strings.Contains(errStr, "i/o timeout") ||
|
|
errors.Is(err, ErrBadTrailer)) {
|
|
wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
|
|
}
|
|
}
|
|
if err == errHijacked {
|
|
wp.connState(c, StateHijacked)
|
|
} else {
|
|
_ = c.Close()
|
|
wp.connState(c, StateClosed)
|
|
}
|
|
|
|
if !wp.release(ch) {
|
|
break
|
|
}
|
|
}
|
|
|
|
wp.lock.Lock()
|
|
wp.workersCount--
|
|
wp.lock.Unlock()
|
|
}
|