mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
Using atomic instead of mutex and delete scratch slice (#1833)
* using atomic instead of mutex and delete scratch slice * optimize struct * fix default bool * escape data race * avoid the momentary change of wp.workersCount. * removed unused tail (for now) * little fixes * fixed allocations This option immediately exits the loop when the maximum number of vorkers is reached, rather than creating a new vorker if the limit is reached. This reduces the frequency of unnecessary operations and potential locks in sync.Pool * Update for linter workerpool.go Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com> * Update for lint#2 workerpool.go Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com> * Update for lint#3 workerpool.go Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com> * Update for lint#4 workerpool.go * eliminating potential data races --------- Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>
This commit is contained in:
+74
-91
@@ -6,6 +6,7 @@ import (
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -21,29 +22,57 @@ type workerPool struct {
|
||||
|
||||
// Function for serving server connections.
|
||||
// It must leave c unclosed.
|
||||
ready workerChanStack
|
||||
WorkerFunc ServeHandler
|
||||
|
||||
stopCh chan struct{}
|
||||
|
||||
connState func(net.Conn, ConnState)
|
||||
|
||||
ready []*workerChan
|
||||
|
||||
MaxWorkersCount int
|
||||
|
||||
MaxIdleWorkerDuration time.Duration
|
||||
|
||||
workersCount int
|
||||
workersCount int32
|
||||
|
||||
lock sync.Mutex
|
||||
mustStop atomic.Bool
|
||||
|
||||
LogAllErrors bool
|
||||
mustStop bool
|
||||
}
|
||||
|
||||
type workerChan struct {
|
||||
lastUseTime time.Time
|
||||
ch chan net.Conn
|
||||
next *workerChan
|
||||
|
||||
ch chan net.Conn
|
||||
|
||||
lastUseTime int64
|
||||
}
|
||||
|
||||
type workerChanStack struct {
|
||||
head atomic.Pointer[workerChan]
|
||||
}
|
||||
|
||||
func (s *workerChanStack) push(ch *workerChan) {
|
||||
for {
|
||||
oldHead := s.head.Load()
|
||||
ch.next = oldHead
|
||||
if s.head.CompareAndSwap(oldHead, ch) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *workerChanStack) pop() *workerChan {
|
||||
for {
|
||||
oldHead := s.head.Load()
|
||||
if oldHead == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.head.CompareAndSwap(oldHead, oldHead.next) {
|
||||
return oldHead
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wp *workerPool) Start() {
|
||||
@@ -58,9 +87,8 @@ func (wp *workerPool) Start() {
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
var scratch []*workerChan
|
||||
for {
|
||||
wp.clean(&scratch)
|
||||
wp.clean()
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
@@ -81,15 +109,15 @@ func (wp *workerPool) Stop() {
|
||||
// Stop all the workers waiting for incoming connections.
|
||||
// Do not wait for busy workers - they will stop after
|
||||
// serving the connection and noticing wp.mustStop = true.
|
||||
wp.lock.Lock()
|
||||
ready := wp.ready
|
||||
for i := range ready {
|
||||
ready[i].ch <- nil
|
||||
ready[i] = nil
|
||||
|
||||
for {
|
||||
ch := wp.ready.pop()
|
||||
if ch == nil {
|
||||
break
|
||||
}
|
||||
ch.ch <- nil
|
||||
}
|
||||
wp.ready = ready[:0]
|
||||
wp.mustStop = true
|
||||
wp.lock.Unlock()
|
||||
wp.mustStop.Store(true)
|
||||
}
|
||||
|
||||
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
|
||||
@@ -99,49 +127,21 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
|
||||
return wp.MaxIdleWorkerDuration
|
||||
}
|
||||
|
||||
func (wp *workerPool) clean(scratch *[]*workerChan) {
|
||||
func (wp *workerPool) clean() {
|
||||
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
|
||||
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()
|
||||
|
||||
// Clean least recently used workers if they didn't serve connections
|
||||
// for more than maxIdleWorkerDuration.
|
||||
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
|
||||
|
||||
wp.lock.Lock()
|
||||
ready := wp.ready
|
||||
n := len(ready)
|
||||
|
||||
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
|
||||
l, r := 0, n-1
|
||||
for l <= r {
|
||||
mid := (l + r) / 2
|
||||
if criticalTime.After(wp.ready[mid].lastUseTime) {
|
||||
l = mid + 1
|
||||
} else {
|
||||
r = mid - 1
|
||||
for {
|
||||
current := wp.ready.head.Load()
|
||||
if current == nil || atomic.LoadInt64(¤t.lastUseTime) >= criticalTime {
|
||||
break
|
||||
}
|
||||
}
|
||||
i := r
|
||||
if i == -1 {
|
||||
wp.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
*scratch = append((*scratch)[:0], ready[:i+1]...)
|
||||
m := copy(ready, ready[i+1:])
|
||||
for i = m; i < n; i++ {
|
||||
ready[i] = nil
|
||||
}
|
||||
wp.ready = ready[:m]
|
||||
wp.lock.Unlock()
|
||||
|
||||
// Notify obsolete workers to stop.
|
||||
// This notification must be outside the wp.lock, since ch.ch
|
||||
// may be blocking and may consume a lot of time if many workers
|
||||
// are located on non-local CPUs.
|
||||
tmp := *scratch
|
||||
for i := range tmp {
|
||||
tmp[i].ch <- nil
|
||||
tmp[i] = nil
|
||||
next := current.next
|
||||
if wp.ready.head.CompareAndSwap(current, next) {
|
||||
current.ch <- nil
|
||||
wp.workerChanPool.Put(current)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,47 +169,32 @@ var workerChanCap = func() int {
|
||||
}()
|
||||
|
||||
func (wp *workerPool) getCh() *workerChan {
|
||||
var ch *workerChan
|
||||
createWorker := false
|
||||
|
||||
wp.lock.Lock()
|
||||
ready := wp.ready
|
||||
n := len(ready) - 1
|
||||
if n < 0 {
|
||||
if wp.workersCount < wp.MaxWorkersCount {
|
||||
createWorker = true
|
||||
wp.workersCount++
|
||||
for {
|
||||
ch := wp.ready.pop()
|
||||
if ch != nil {
|
||||
return ch
|
||||
}
|
||||
} else {
|
||||
ch = ready[n]
|
||||
ready[n] = nil
|
||||
wp.ready = ready[:n]
|
||||
}
|
||||
wp.lock.Unlock()
|
||||
|
||||
if ch == nil {
|
||||
if !createWorker {
|
||||
return nil
|
||||
currentWorkers := atomic.LoadInt32(&wp.workersCount)
|
||||
if currentWorkers < int32(wp.MaxWorkersCount) {
|
||||
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
|
||||
ch = wp.workerChanPool.Get().(*workerChan)
|
||||
go wp.workerFunc(ch)
|
||||
return ch
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
vch := wp.workerChanPool.Get()
|
||||
ch = vch.(*workerChan)
|
||||
go func() {
|
||||
wp.workerFunc(ch)
|
||||
wp.workerChanPool.Put(vch)
|
||||
}()
|
||||
}
|
||||
return ch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wp *workerPool) release(ch *workerChan) bool {
|
||||
ch.lastUseTime = time.Now()
|
||||
wp.lock.Lock()
|
||||
if wp.mustStop {
|
||||
wp.lock.Unlock()
|
||||
atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano())
|
||||
if wp.mustStop.Load() {
|
||||
return false
|
||||
}
|
||||
wp.ready = append(wp.ready, ch)
|
||||
wp.lock.Unlock()
|
||||
wp.ready.push(ch)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -245,7 +230,5 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
|
||||
}
|
||||
}
|
||||
|
||||
wp.lock.Lock()
|
||||
wp.workersCount--
|
||||
wp.lock.Unlock()
|
||||
atomic.AddInt32(&wp.workersCount, -1)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user