mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
stackless: use dedicated worker pool per each stackless func.
This reduces tail latencies in our prod when multiple stackless funcs are used concurrently.
This commit is contained in:
+13
-14
@@ -22,9 +22,19 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
|
||||
if f == nil {
|
||||
panic("BUG: f cannot be nil")
|
||||
}
|
||||
|
||||
funcWorkCh := make(chan *funcWork, runtime.GOMAXPROCS(-1)*2048)
|
||||
onceInit := func() {
|
||||
n := runtime.GOMAXPROCS(-1)
|
||||
for i := 0; i < n; i++ {
|
||||
go funcWorker(funcWorkCh, f)
|
||||
}
|
||||
}
|
||||
var once sync.Once
|
||||
|
||||
return func(ctx interface{}) bool {
|
||||
once.Do(onceInit)
|
||||
fw := getFuncWork()
|
||||
fw.f = f
|
||||
fw.ctx = ctx
|
||||
|
||||
select {
|
||||
@@ -39,22 +49,13 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
n := runtime.GOMAXPROCS(-1)
|
||||
for i := 0; i < n; i++ {
|
||||
go funcWorker()
|
||||
}
|
||||
}
|
||||
|
||||
func funcWorker() {
|
||||
func funcWorker(funcWorkCh <-chan *funcWork, f func(ctx interface{})) {
|
||||
for fw := range funcWorkCh {
|
||||
fw.f(fw.ctx)
|
||||
f(fw.ctx)
|
||||
fw.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024)
|
||||
|
||||
func getFuncWork() *funcWork {
|
||||
v := funcWorkPool.Get()
|
||||
if v == nil {
|
||||
@@ -66,7 +67,6 @@ func getFuncWork() *funcWork {
|
||||
}
|
||||
|
||||
func putFuncWork(fw *funcWork) {
|
||||
fw.f = nil
|
||||
fw.ctx = nil
|
||||
funcWorkPool.Put(fw)
|
||||
}
|
||||
@@ -74,7 +74,6 @@ func putFuncWork(fw *funcWork) {
|
||||
var funcWorkPool sync.Pool
|
||||
|
||||
type funcWork struct {
|
||||
f func(ctx interface{})
|
||||
ctx interface{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ func TestNewFuncSimple(t *testing.T) {
|
||||
atomic.AddUint64(&n, uint64(ctx.(int)))
|
||||
})
|
||||
|
||||
iterations := 2 * cap(funcWorkCh)
|
||||
iterations := 4 * 1024
|
||||
for i := 0; i < iterations; i++ {
|
||||
if !f(2) {
|
||||
t.Fatalf("f mustn't return false")
|
||||
@@ -33,7 +33,7 @@ func TestNewFuncMulti(t *testing.T) {
|
||||
atomic.AddUint64(&n2, uint64(ctx.(int)))
|
||||
})
|
||||
|
||||
iterations := 2 * cap(funcWorkCh)
|
||||
iterations := 4 * 1024
|
||||
|
||||
f1Done := make(chan error, 1)
|
||||
go func() {
|
||||
|
||||
Reference in New Issue
Block a user