Server worker pool optimization - now incoming connections are served by workers in 'most recently used worker' order. This should keep CPU caches hot

This commit is contained in:
Aliaksandr Valialkin
2015-11-10 15:24:36 +02:00
parent 8081e14ca0
commit fa05bcd728
+115 -53
View File
@@ -277,7 +277,7 @@ func (ctx *RequestCtx) TimeoutError(msg string) {
}
// Default concurrency used by Server.Serve().
const DefaultConcurrency = 64 * 1024
const DefaultConcurrency = 256 * 1024
// Serve serves incoming connections from the given listener.
//
@@ -293,82 +293,144 @@ func (s *Server) Serve(ln net.Listener) error {
// ServeConcurrency blocks until the given listener returns permanent error.
// This error is returned from ServeConcurrency.
func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error {
ch := make(chan net.Conn, 2*concurrency)
stopCh := make(chan struct{})
go connWorkersMonitor(s, ch, concurrency, stopCh)
var lastOverflowErrorTime time.Time
var lastPerIPErrorTime time.Time
var c net.Conn
var err error
wp := &workerPool{
s: s,
maxWorkersCount: concurrency,
}
for {
c, err := acceptConn(s, ln, &lastPerIPErrorTime)
if err != nil {
close(stopCh)
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.stop()
return err
}
select {
case ch <- c:
default:
for attempts := 4; attempts > 0; attempts-- {
if wp.tryServe(c) {
c = nil
break
}
runtime.Gosched()
}
if c != nil {
c.Close()
c = nil
if time.Since(lastOverflowErrorTime) > time.Minute {
s.logger().Printf("The incoming connection cannot be served, because all %d workers are busy. "+
"Try increasing concurrency in Server.ServeWorkers()", concurrency)
"Try increasing concurrency in Server.ServeConcurrency()", concurrency)
lastOverflowErrorTime = time.Now()
}
}
}
}
func connWorkersMonitor(s *Server, ch <-chan net.Conn, maxWorkers int, stopCh <-chan struct{}) {
workersCount := uint32(0)
var tc *time.Timer
for {
n := int(atomic.LoadUint32(&workersCount))
pendingConns := len(ch)
if n < maxWorkers && pendingConns > 0 {
for i := 0; i < pendingConns; i++ {
atomic.AddUint32(&workersCount, 1)
go func() {
connWorker(s, ch)
atomic.AddUint32(&workersCount, ^uint32(0))
}()
}
runtime.Gosched()
} else {
tc = initTimer(tc, 100*time.Millisecond)
select {
case <-stopCh:
stopTimer(tc)
return
case <-tc.C:
stopTimer(tc)
}
}
}
// workerPool serves incoming connections via a pool of workers
// in FIFO order, i.e. the most recently stopped worker will serve the next
// incoming connection.
type workerPool struct {
s *Server
maxWorkersCount int
lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
}
func connWorker(s *Server, ch <-chan net.Conn) {
type workerChan struct {
t time.Time
ch chan net.Conn
}
func (wp *workerPool) stop() {
// Stop all the workers waiting for incoming connections
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
for _, ch := range wp.ready {
ch.ch <- nil
}
wp.ready = nil
wp.mustStop = true
wp.lock.Unlock()
}
func (wp *workerPool) tryServe(c net.Conn) bool {
var ch *workerChan
createWorker := false
wp.lock.Lock()
chans := wp.ready
// stop workers, which didn't work for more than one second.
for len(chans) > 1 && time.Since(chans[0].t) > time.Second {
chans[0].ch <- nil
copy(chans, chans[1:])
chans = chans[:len(chans)-1]
wp.ready = chans
wp.workersCount--
}
n := len(chans) - 1
if n < 0 {
if wp.workersCount < wp.maxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = chans[n]
wp.ready = chans[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return false
}
vch := workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, 1),
}
}
ch = vch.(*workerChan)
go func() {
connWorker(wp, ch)
workerChanPool.Put(vch)
}()
}
ch.ch <- c
return true
}
var workerChanPool sync.Pool
func connWorker(wp *workerPool, ch *workerChan) {
defer func() {
if r := recover(); r != nil {
s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack())
wp.s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack())
}
}()
var c net.Conn
var tc *time.Timer
for {
select {
case c = <-ch:
default:
tc = initTimer(tc, time.Second)
select {
case c = <-ch:
stopTimer(tc)
case <-tc.C:
stopTimer(tc)
return
}
for c = range ch.ch {
if c == nil {
break
}
s.serveConn(c)
wp.s.serveConn(c)
c = nil
ch.t = time.Now()
wp.lock.Lock()
if wp.mustStop {
break
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
}
}