mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-13 15:46:49 +03:00
Make several optimizations to worker pool (#680)
* Use binary-search algorithm to speed up cleaning up workers * Speed it up when iterating the slice of workerChan * Use sync.Pool as a more canonical way * Add benchmark test between binary-search and linear search * Optimize range to the slice of workerChan, avoiding elements copy * Perfect the benchmark of work pool * Make binary-search code inline and remove benchmark test code
This commit is contained in:
committed by
Erik Dubbelboer
parent
f82a6460e9
commit
9f11af2968
+32
-21
@@ -50,6 +50,11 @@ func (wp *workerPool) Start() {
|
||||
}
|
||||
wp.stopCh = make(chan struct{})
|
||||
stopCh := wp.stopCh
|
||||
wp.workerChanPool.New = func() interface{} {
|
||||
return &workerChan{
|
||||
ch: make(chan net.Conn, workerChanCap),
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
var scratch []*workerChan
|
||||
for {
|
||||
@@ -76,8 +81,8 @@ func (wp *workerPool) Stop() {
|
||||
// serving the connection and noticing wp.mustStop = true.
|
||||
wp.lock.Lock()
|
||||
ready := wp.ready
|
||||
for i, ch := range ready {
|
||||
ch.ch <- nil
|
||||
for i := range ready {
|
||||
ready[i].ch <- nil
|
||||
ready[i] = nil
|
||||
}
|
||||
wp.ready = ready[:0]
|
||||
@@ -97,23 +102,34 @@ func (wp *workerPool) clean(scratch *[]*workerChan) {
|
||||
|
||||
// Clean least recently used workers if they didn't serve connections
|
||||
// for more than maxIdleWorkerDuration.
|
||||
currentTime := time.Now()
|
||||
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
|
||||
|
||||
wp.lock.Lock()
|
||||
ready := wp.ready
|
||||
n := len(ready)
|
||||
i := 0
|
||||
for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
|
||||
i++
|
||||
}
|
||||
*scratch = append((*scratch)[:0], ready[:i]...)
|
||||
if i > 0 {
|
||||
m := copy(ready, ready[i:])
|
||||
for i = m; i < n; i++ {
|
||||
ready[i] = nil
|
||||
|
||||
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
|
||||
l, r, mid := 0, n-1, 0
|
||||
for l <= r {
|
||||
mid = (l + r) / 2
|
||||
if criticalTime.After(wp.ready[mid].lastUseTime) {
|
||||
l = mid + 1
|
||||
} else {
|
||||
r = mid - 1
|
||||
}
|
||||
wp.ready = ready[:m]
|
||||
}
|
||||
i := r
|
||||
if i == -1 {
|
||||
wp.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
*scratch = append((*scratch)[:0], ready[:i+1]...)
|
||||
m := copy(ready, ready[i+1:])
|
||||
for i = m; i < n; i++ {
|
||||
ready[i] = nil
|
||||
}
|
||||
wp.ready = ready[:m]
|
||||
wp.lock.Unlock()
|
||||
|
||||
// Notify obsolete workers to stop.
|
||||
@@ -121,8 +137,8 @@ func (wp *workerPool) clean(scratch *[]*workerChan) {
|
||||
// may be blocking and may consume a lot of time if many workers
|
||||
// are located on non-local CPUs.
|
||||
tmp := *scratch
|
||||
for i, ch := range tmp {
|
||||
ch.ch <- nil
|
||||
for i := range tmp {
|
||||
tmp[i].ch <- nil
|
||||
tmp[i] = nil
|
||||
}
|
||||
}
|
||||
@@ -174,11 +190,6 @@ func (wp *workerPool) getCh() *workerChan {
|
||||
return nil
|
||||
}
|
||||
vch := wp.workerChanPool.Get()
|
||||
if vch == nil {
|
||||
vch = &workerChan{
|
||||
ch: make(chan net.Conn, workerChanCap),
|
||||
}
|
||||
}
|
||||
ch = vch.(*workerChan)
|
||||
go func() {
|
||||
wp.workerFunc(ch)
|
||||
@@ -222,7 +233,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
|
||||
if err == errHijacked {
|
||||
wp.connState(c, StateHijacked)
|
||||
} else {
|
||||
c.Close()
|
||||
_ = c.Close()
|
||||
wp.connState(c, StateClosed)
|
||||
}
|
||||
c = nil
|
||||
|
||||
Reference in New Issue
Block a user