mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
Extracted workerPool from server.go into workerpool.go
This commit is contained in:
@@ -9,8 +9,6 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -303,30 +301,19 @@ func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error {
|
||||
var err error
|
||||
|
||||
wp := &workerPool{
|
||||
s: s,
|
||||
maxWorkersCount: concurrency,
|
||||
WorkerFunc: s.serveConn,
|
||||
MaxWorkersCount: concurrency,
|
||||
Logger: s.logger(),
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
wp.clean()
|
||||
}
|
||||
}()
|
||||
wp.Start()
|
||||
|
||||
for {
|
||||
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
|
||||
wp.stop()
|
||||
close(stopCh)
|
||||
wp.Stop()
|
||||
return err
|
||||
}
|
||||
for attempts := 4; attempts > 0; attempts-- {
|
||||
if wp.tryServe(c) {
|
||||
if wp.TryServe(c) {
|
||||
c = nil
|
||||
break
|
||||
}
|
||||
@@ -344,118 +331,6 @@ func (s *Server) ServeConcurrency(ln net.Listener, concurrency int) error {
|
||||
}
|
||||
}
|
||||
|
||||
// workerPool serves incoming connections via a pool of workers
|
||||
// in FIFO order, i.e. the most recently stopped worker will serve the next
|
||||
// incoming connection.
|
||||
type workerPool struct {
|
||||
s *Server
|
||||
maxWorkersCount int
|
||||
|
||||
lock sync.Mutex
|
||||
workersCount int
|
||||
mustStop bool
|
||||
|
||||
ready []*workerChan
|
||||
}
|
||||
|
||||
type workerChan struct {
|
||||
t time.Time
|
||||
ch chan net.Conn
|
||||
}
|
||||
|
||||
func (wp *workerPool) stop() {
|
||||
// Stop all the workers waiting for incoming connections.
|
||||
// Do not wait for busy workers - they will stop after
|
||||
// serving the connection and noticing wp.mustStop = true.
|
||||
wp.lock.Lock()
|
||||
for _, ch := range wp.ready {
|
||||
ch.ch <- nil
|
||||
}
|
||||
wp.ready = nil
|
||||
wp.mustStop = true
|
||||
wp.lock.Unlock()
|
||||
}
|
||||
|
||||
func (wp *workerPool) clean() {
|
||||
// Clean least recently used workers if they didn't serve connections
|
||||
// for more than one second.
|
||||
wp.lock.Lock()
|
||||
chans := wp.ready
|
||||
for len(chans) > 1 && time.Since(chans[0].t) > time.Second {
|
||||
chans[0].ch <- nil
|
||||
copy(chans, chans[1:])
|
||||
chans = chans[:len(chans)-1]
|
||||
wp.ready = chans
|
||||
wp.workersCount--
|
||||
}
|
||||
wp.lock.Unlock()
|
||||
}
|
||||
|
||||
func (wp *workerPool) tryServe(c net.Conn) bool {
|
||||
var ch *workerChan
|
||||
createWorker := false
|
||||
|
||||
wp.lock.Lock()
|
||||
chans := wp.ready
|
||||
n := len(chans) - 1
|
||||
if n < 0 {
|
||||
if wp.workersCount < wp.maxWorkersCount {
|
||||
createWorker = true
|
||||
wp.workersCount++
|
||||
}
|
||||
} else {
|
||||
ch = chans[n]
|
||||
wp.ready = chans[:n]
|
||||
}
|
||||
wp.lock.Unlock()
|
||||
|
||||
if ch == nil {
|
||||
if !createWorker {
|
||||
return false
|
||||
}
|
||||
vch := workerChanPool.Get()
|
||||
if vch == nil {
|
||||
vch = &workerChan{
|
||||
ch: make(chan net.Conn, 1),
|
||||
}
|
||||
}
|
||||
ch = vch.(*workerChan)
|
||||
go func() {
|
||||
connWorker(wp, ch)
|
||||
workerChanPool.Put(vch)
|
||||
}()
|
||||
}
|
||||
ch.ch <- c
|
||||
return true
|
||||
}
|
||||
|
||||
var workerChanPool sync.Pool
|
||||
|
||||
func connWorker(wp *workerPool, ch *workerChan) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
wp.s.logger().Printf("panic: %s\nStack trace:\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
|
||||
var c net.Conn
|
||||
for c = range ch.ch {
|
||||
if c == nil {
|
||||
break
|
||||
}
|
||||
wp.s.serveConn(c)
|
||||
c = nil
|
||||
|
||||
ch.t = time.Now()
|
||||
wp.lock.Lock()
|
||||
if wp.mustStop {
|
||||
break
|
||||
}
|
||||
wp.ready = append(wp.ready, ch)
|
||||
wp.lock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
|
||||
for {
|
||||
c, err := ln.Accept()
|
||||
@@ -671,10 +546,6 @@ func (s *Server) serveConn(c io.ReadWriteCloser) error {
|
||||
if bw != nil {
|
||||
releaseWriter(ctx, bw)
|
||||
}
|
||||
|
||||
if err != nil && !strings.Contains(err.Error(), "connection reset by peer") {
|
||||
ctx.Logger().Printf("Error when serving network connection: %s", err)
|
||||
}
|
||||
s.releaseCtx(ctx)
|
||||
|
||||
err1 := c.Close()
|
||||
|
||||
+165
@@ -0,0 +1,165 @@
|
||||
package fasthttp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// workerPool serves incoming connections via a pool of workers
|
||||
// in FIFO order, i.e. the most recently stopped worker will serve the next
|
||||
// incoming connection.
|
||||
type workerPool struct {
|
||||
// Function for serving incoming connections.
|
||||
// It must close c before returning.
|
||||
WorkerFunc func(c io.ReadWriteCloser) error
|
||||
|
||||
// Maximum number of workers to create.
|
||||
MaxWorkersCount int
|
||||
|
||||
// Logger used by workerPool.
|
||||
Logger Logger
|
||||
|
||||
lock sync.Mutex
|
||||
workersCount int
|
||||
mustStop bool
|
||||
|
||||
ready []*workerChan
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
type workerChan struct {
|
||||
t time.Time
|
||||
ch chan net.Conn
|
||||
}
|
||||
|
||||
func (wp *workerPool) Start() {
|
||||
if wp.stopCh != nil {
|
||||
panic("BUG: workerPool already started")
|
||||
}
|
||||
wp.stopCh = make(chan struct{})
|
||||
stopCh := wp.stopCh
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
wp.clean()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (wp *workerPool) Stop() {
|
||||
if wp.stopCh == nil {
|
||||
panic("BUG: workerPool wasn't started")
|
||||
}
|
||||
close(wp.stopCh)
|
||||
wp.stopCh = nil
|
||||
|
||||
// Stop all the workers waiting for incoming connections.
|
||||
// Do not wait for busy workers - they will stop after
|
||||
// serving the connection and noticing wp.mustStop = true.
|
||||
wp.lock.Lock()
|
||||
for _, ch := range wp.ready {
|
||||
ch.ch <- nil
|
||||
}
|
||||
wp.ready = nil
|
||||
wp.mustStop = true
|
||||
wp.lock.Unlock()
|
||||
}
|
||||
|
||||
func (wp *workerPool) clean() {
|
||||
// Clean least recently used workers if they didn't serve connections
|
||||
// for more than one second.
|
||||
wp.lock.Lock()
|
||||
chans := wp.ready
|
||||
for len(chans) > 1 && time.Since(chans[0].t) > time.Second {
|
||||
chans[0].ch <- nil
|
||||
copy(chans, chans[1:])
|
||||
chans = chans[:len(chans)-1]
|
||||
wp.ready = chans
|
||||
wp.workersCount--
|
||||
}
|
||||
wp.lock.Unlock()
|
||||
}
|
||||
|
||||
func (wp *workerPool) TryServe(c net.Conn) bool {
|
||||
var ch *workerChan
|
||||
createWorker := false
|
||||
|
||||
wp.lock.Lock()
|
||||
chans := wp.ready
|
||||
n := len(chans) - 1
|
||||
if n < 0 {
|
||||
if wp.workersCount < wp.MaxWorkersCount {
|
||||
createWorker = true
|
||||
wp.workersCount++
|
||||
}
|
||||
} else {
|
||||
ch = chans[n]
|
||||
wp.ready = chans[:n]
|
||||
}
|
||||
wp.lock.Unlock()
|
||||
|
||||
if ch == nil {
|
||||
if !createWorker {
|
||||
return false
|
||||
}
|
||||
vch := workerChanPool.Get()
|
||||
if vch == nil {
|
||||
vch = &workerChan{
|
||||
ch: make(chan net.Conn, 1),
|
||||
}
|
||||
}
|
||||
ch = vch.(*workerChan)
|
||||
go func() {
|
||||
wp.workerFunc(ch)
|
||||
workerChanPool.Put(vch)
|
||||
}()
|
||||
}
|
||||
ch.ch <- c
|
||||
return true
|
||||
}
|
||||
|
||||
func (wp *workerPool) release(ch *workerChan) bool {
|
||||
ch.t = time.Now()
|
||||
wp.lock.Lock()
|
||||
if wp.mustStop {
|
||||
return false
|
||||
}
|
||||
wp.ready = append(wp.ready, ch)
|
||||
wp.lock.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
var workerChanPool sync.Pool
|
||||
|
||||
func (wp *workerPool) workerFunc(ch *workerChan) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
wp.Logger.Printf("panic: %s\nStack trace:\n%s", r, debug.Stack())
|
||||
}
|
||||
}()
|
||||
|
||||
var c net.Conn
|
||||
var err error
|
||||
for c = range ch.ch {
|
||||
if c == nil {
|
||||
break
|
||||
}
|
||||
if err = wp.WorkerFunc(c); err != nil {
|
||||
wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
|
||||
}
|
||||
c = nil
|
||||
|
||||
if !wp.release(ch) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user