diff --git a/server.go b/server.go index 32e1d18..f5ddfbb 100644 --- a/server.go +++ b/server.go @@ -9,8 +9,6 @@ import ( "net" "os" "runtime" - "runtime/debug" - "strings" "sync" "sync/atomic" "time" @@ -303,30 +301,19 @@ func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error { var err error wp := &workerPool{ - s: s, - maxWorkersCount: concurrency, + WorkerFunc: s.serveConn, + MaxWorkersCount: concurrency, + Logger: s.logger(), } - stopCh := make(chan struct{}) - go func() { - for { - select { - case <-stopCh: - return - default: - time.Sleep(time.Second) - } - wp.clean() - } - }() + wp.Start() for { if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { - wp.stop() - close(stopCh) + wp.Stop() return err } for attempts := 4; attempts > 0; attempts-- { - if wp.tryServe(c) { + if wp.TryServe(c) { c = nil break } @@ -344,118 +331,6 @@ func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error { } } -// workerPool serves incoming connections via a pool of workers -// in FIFO order, i.e. the most recently stopped worker will serve the next -// incoming connection. -type workerPool struct { - s *Server - maxWorkersCount int - - lock sync.Mutex - workersCount int - mustStop bool - - ready []*workerChan -} - -type workerChan struct { - t time.Time - ch chan net.Conn -} - -func (wp *workerPool) stop() { - // Stop all the workers waiting for incoming connections. - // Do not wait for busy workers - they will stop after - // serving the connection and noticing wp.mustStop = true. - wp.lock.Lock() - for _, ch := range wp.ready { - ch.ch <- nil - } - wp.ready = nil - wp.mustStop = true - wp.lock.Unlock() -} - -func (wp *workerPool) clean() { - // Clean least recently used workers if they didn't serve connections - // for more than one second. - wp.lock.Lock() - chans := wp.ready - for len(chans) > 1 && time.Since(chans[0].t) > time.Second { - chans[0].ch <- nil - copy(chans, chans[1:]) - chans = chans[:len(chans)-1] - wp.ready = chans - wp.workersCount-- - } - wp.lock.Unlock() -} - -func (wp *workerPool) tryServe(c net.Conn) bool { - var ch *workerChan - createWorker := false - - wp.lock.Lock() - chans := wp.ready - n := len(chans) - 1 - if n < 0 { - if wp.workersCount < wp.maxWorkersCount { - createWorker = true - wp.workersCount++ - } - } else { - ch = chans[n] - wp.ready = chans[:n] - } - wp.lock.Unlock() - - if ch == nil { - if !createWorker { - return false - } - vch := workerChanPool.Get() - if vch == nil { - vch = &workerChan{ - ch: make(chan net.Conn, 1), - } - } - ch = vch.(*workerChan) - go func() { - connWorker(wp, ch) - workerChanPool.Put(vch) - }() - } - ch.ch <- c - return true -} - -var workerChanPool sync.Pool - -func connWorker(wp *workerPool, ch *workerChan) { - defer func() { - if r := recover(); r != nil { - wp.s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack()) - } - }() - - var c net.Conn - for c = range ch.ch { - if c == nil { - break - } - wp.s.serveConn(c) - c = nil - - ch.t = time.Now() - wp.lock.Lock() - if wp.mustStop { - break - } - wp.ready = append(wp.ready, ch) - wp.lock.Unlock() - } -} - func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) { for { c, err := ln.Accept() @@ -671,10 +546,6 @@ func (s *Server) serveConn(c io.ReadWriteCloser) error { if bw != nil { releaseWriter(ctx, bw) } - - if err != nil && !strings.Contains(err.Error(), "connection reset by peer") { - ctx.Logger().Printf("Error when serving network connection: %s", err) - } s.releaseCtx(ctx) err1 := c.Close() diff --git a/workerpool.go b/workerpool.go new file mode 100644 index 0000000..fe5cf56 --- /dev/null +++ b/workerpool.go @@ -0,0 +1,165 @@ +package fasthttp + +import ( + "io" + "net" + "runtime/debug" + "sync" + "time" +) + +// workerPool serves incoming connections via a pool of workers +// in FIFO order, i.e. the most recently stopped worker will serve the next +// incoming connection. +type workerPool struct { + // Function for serving incoming connections. + // It must close c before returning. + WorkerFunc func(c io.ReadWriteCloser) error + + // Maximum number of workers to create. + MaxWorkersCount int + + // Logger used by workerPool. + Logger Logger + + lock sync.Mutex + workersCount int + mustStop bool + + ready []*workerChan + + stopCh chan struct{} +} + +type workerChan struct { + t time.Time + ch chan net.Conn +} + +func (wp *workerPool) Start() { + if wp.stopCh != nil { + panic("BUG: workerPool already started") + } + wp.stopCh = make(chan struct{}) + stopCh := wp.stopCh + go func() { + for { + select { + case <-stopCh: + return + default: + time.Sleep(time.Second) + } + wp.clean() + } + }() +} + +func (wp *workerPool) Stop() { + if wp.stopCh == nil { + panic("BUG: workerPool wasn't started") + } + close(wp.stopCh) + wp.stopCh = nil + + // Stop all the workers waiting for incoming connections. + // Do not wait for busy workers - they will stop after + // serving the connection and noticing wp.mustStop = true. + wp.lock.Lock() + for _, ch := range wp.ready { + ch.ch <- nil + } + wp.ready = nil + wp.mustStop = true + wp.lock.Unlock() +} + +func (wp *workerPool) clean() { + // Clean least recently used workers if they didn't serve connections + // for more than one second. + wp.lock.Lock() + chans := wp.ready + for len(chans) > 1 && time.Since(chans[0].t) > time.Second { + chans[0].ch <- nil + copy(chans, chans[1:]) + chans = chans[:len(chans)-1] + wp.ready = chans + wp.workersCount-- + } + wp.lock.Unlock() +} + +func (wp *workerPool) TryServe(c net.Conn) bool { + var ch *workerChan + createWorker := false + + wp.lock.Lock() + chans := wp.ready + n := len(chans) - 1 + if n < 0 { + if wp.workersCount < wp.MaxWorkersCount { + createWorker = true + wp.workersCount++ + } + } else { + ch = chans[n] + wp.ready = chans[:n] + } + wp.lock.Unlock() + + if ch == nil { + if !createWorker { + return false + } + vch := workerChanPool.Get() + if vch == nil { + vch = &workerChan{ + ch: make(chan net.Conn, 1), + } + } + ch = vch.(*workerChan) + go func() { + wp.workerFunc(ch) + workerChanPool.Put(vch) + }() + } + ch.ch <- c + return true +} + +func (wp *workerPool) release(ch *workerChan) bool { + ch.t = time.Now() + wp.lock.Lock() + if wp.mustStop { + return false + } + wp.ready = append(wp.ready, ch) + wp.lock.Unlock() + return true +} + +var workerChanPool sync.Pool + +func (wp *workerPool) workerFunc(ch *workerChan) { + defer func() { + if r := recover(); r != nil { + wp.Logger.Printf("panic: %s\nStack trace:\n%s", r, debug.Stack()) + } + }() + + var c net.Conn + var err error + for c = range ch.ch { + if c == nil { + break + } + if err = wp.WorkerFunc(c); err != nil { + wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) + } + c = nil + + if !wp.release(ch) { + break + } + } +}