diff --git a/workerpool.go b/workerpool.go index 84b1464..9b1987e 100644 --- a/workerpool.go +++ b/workerpool.go @@ -50,6 +50,11 @@ func (wp *workerPool) Start() { } wp.stopCh = make(chan struct{}) stopCh := wp.stopCh + wp.workerChanPool.New = func() interface{} { + return &workerChan{ + ch: make(chan net.Conn, workerChanCap), + } + } go func() { var scratch []*workerChan for { @@ -76,8 +81,8 @@ func (wp *workerPool) Stop() { // serving the connection and noticing wp.mustStop = true. wp.lock.Lock() ready := wp.ready - for i, ch := range ready { - ch.ch <- nil + for i := range ready { + ready[i].ch <- nil ready[i] = nil } wp.ready = ready[:0] @@ -97,23 +102,34 @@ func (wp *workerPool) clean(scratch *[]*workerChan) { // Clean least recently used workers if they didn't serve connections // for more than maxIdleWorkerDuration. - currentTime := time.Now() + criticalTime := time.Now().Add(-maxIdleWorkerDuration) wp.lock.Lock() ready := wp.ready n := len(ready) - i := 0 - for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { - i++ - } - *scratch = append((*scratch)[:0], ready[:i]...) - if i > 0 { - m := copy(ready, ready[i:]) - for i = m; i < n; i++ { - ready[i] = nil + + // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up. + l, r, mid := 0, n-1, 0 + for l <= r { + mid = (l + r) / 2 + if criticalTime.After(wp.ready[mid].lastUseTime) { + l = mid + 1 + } else { + r = mid - 1 } - wp.ready = ready[:m] } + i := r + if i == -1 { + wp.lock.Unlock() + return + } + + *scratch = append((*scratch)[:0], ready[:i+1]...) + m := copy(ready, ready[i+1:]) + for i = m; i < n; i++ { + ready[i] = nil + } + wp.ready = ready[:m] wp.lock.Unlock() // Notify obsolete workers to stop. @@ -121,8 +137,8 @@ func (wp *workerPool) clean(scratch *[]*workerChan) { // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. tmp := *scratch - for i, ch := range tmp { - ch.ch <- nil + for i := range tmp { + tmp[i].ch <- nil tmp[i] = nil } } @@ -174,11 +190,6 @@ func (wp *workerPool) getCh() *workerChan { return nil } vch := wp.workerChanPool.Get() - if vch == nil { - vch = &workerChan{ - ch: make(chan net.Conn, workerChanCap), - } - } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) @@ -222,7 +233,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) { if err == errHijacked { wp.connState(c, StateHijacked) } else { - c.Close() + _ = c.Close() wp.connState(c, StateClosed) } c = nil