diff --git a/server.go b/server.go index 2d7de4d..81d7532 100644 --- a/server.go +++ b/server.go @@ -277,7 +277,7 @@ func (ctx *RequestCtx) TimeoutError(msg string) { } // Default concurrency used by Server.Serve(). -const DefaultConcurrency = 64 * 1024 +const DefaultConcurrency = 256 * 1024 // Serve serves incoming connections from the given listener. // @@ -293,82 +293,144 @@ func (s *Server) Serve(ln net.Listener) error { // ServeConcurrency blocks until the given listener returns permanent error. // This error is returned from ServeConcurrency. func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error { - ch := make(chan net.Conn, 2*concurrency) - stopCh := make(chan struct{}) - go connWorkersMonitor(s, ch, concurrency, stopCh) var lastOverflowErrorTime time.Time var lastPerIPErrorTime time.Time + var c net.Conn + var err error + + wp := &workerPool{ + s: s, + maxWorkersCount: concurrency, + } for { - c, err := acceptConn(s, ln, &lastPerIPErrorTime) - if err != nil { - close(stopCh) + if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { + wp.stop() return err } - select { - case ch <- c: - default: + for attempts := 4; attempts > 0; attempts-- { + if wp.tryServe(c) { + c = nil + break + } + runtime.Gosched() + } + if c != nil { c.Close() + c = nil if time.Since(lastOverflowErrorTime) > time.Minute { s.logger().Printf("The incoming connection cannot be served, because all %d workers are busy. "+ - "Try increasing concurrency in Server.ServeWorkers()", concurrency) + "Try increasing concurrency in Server.ServeConcurrency()", concurrency) lastOverflowErrorTime = time.Now() } } } } -func connWorkersMonitor(s *Server, ch <-chan net.Conn, maxWorkers int, stopCh <-chan struct{}) { - workersCount := uint32(0) - var tc *time.Timer - for { - n := int(atomic.LoadUint32(&workersCount)) - pendingConns := len(ch) - if n < maxWorkers && pendingConns > 0 { - for i := 0; i < pendingConns; i++ { - atomic.AddUint32(&workersCount, 1) - go func() { - connWorker(s, ch) - atomic.AddUint32(&workersCount, ^uint32(0)) - }() - } - runtime.Gosched() - } else { - tc = initTimer(tc, 100*time.Millisecond) - select { - case <-stopCh: - stopTimer(tc) - return - case <-tc.C: - stopTimer(tc) - } - } - } +// workerPool serves incoming connections via a pool of workers +// in FIFO order, i.e. the most recently stopped worker will serve the next +// incoming connection. +type workerPool struct { + s *Server + maxWorkersCount int + + lock sync.Mutex + workersCount int + mustStop bool + + ready []*workerChan } -func connWorker(s *Server, ch <-chan net.Conn) { +type workerChan struct { + t time.Time + ch chan net.Conn +} + +func (wp *workerPool) stop() { + // Stop all the workers waiting for incoming connections + // Do not wait for busy workers - they will stop after + // serving the connection and noticing wp.mustStop = true. + wp.lock.Lock() + for _, ch := range wp.ready { + ch.ch <- nil + } + wp.ready = nil + wp.mustStop = true + wp.lock.Unlock() +} + +func (wp *workerPool) tryServe(c net.Conn) bool { + var ch *workerChan + createWorker := false + + wp.lock.Lock() + chans := wp.ready + + // stop workers, which didn't work for more than one second. + for len(chans) > 1 && time.Since(chans[0].t) > time.Second { + chans[0].ch <- nil + copy(chans, chans[1:]) + chans = chans[:len(chans)-1] + wp.ready = chans + wp.workersCount-- + } + + n := len(chans) - 1 + if n < 0 { + if wp.workersCount < wp.maxWorkersCount { + createWorker = true + wp.workersCount++ + } + } else { + ch = chans[n] + wp.ready = chans[:n] + } + + wp.lock.Unlock() + + if ch == nil { + if !createWorker { + return false + } + vch := workerChanPool.Get() + if vch == nil { + vch = &workerChan{ + ch: make(chan net.Conn, 1), + } + } + ch = vch.(*workerChan) + go func() { + connWorker(wp, ch) + workerChanPool.Put(vch) + }() + } + ch.ch <- c + return true +} + +var workerChanPool sync.Pool + +func connWorker(wp *workerPool, ch *workerChan) { defer func() { if r := recover(); r != nil { - s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack()) + wp.s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack()) } }() var c net.Conn - var tc *time.Timer - for { - select { - case c = <-ch: - default: - tc = initTimer(tc, time.Second) - select { - case c = <-ch: - stopTimer(tc) - case <-tc.C: - stopTimer(tc) - return - } + for c = range ch.ch { + if c == nil { + break } - s.serveConn(c) + wp.s.serveConn(c) c = nil + + ch.t = time.Now() + wp.lock.Lock() + if wp.mustStop { + break + } + wp.ready = append(wp.ready, ch) + wp.lock.Unlock() } }