From e113a6dfce3d5f50e9529b6dbb4638402bcdb164 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 6 Feb 2017 14:23:36 +0200 Subject: [PATCH] stackless: use dedicated worker pool per each stackless func. This reduces tail latencies in our prod when multiple stackless funcs are used concurrently. --- stackless/func.go | 27 +++++++++++++-------------- stackless/func_test.go | 4 ++-- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/stackless/func.go b/stackless/func.go index 4202b49..9a49bcc 100644 --- a/stackless/func.go +++ b/stackless/func.go @@ -22,9 +22,19 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool { if f == nil { panic("BUG: f cannot be nil") } + + funcWorkCh := make(chan *funcWork, runtime.GOMAXPROCS(-1)*2048) + onceInit := func() { + n := runtime.GOMAXPROCS(-1) + for i := 0; i < n; i++ { + go funcWorker(funcWorkCh, f) + } + } + var once sync.Once + return func(ctx interface{}) bool { + once.Do(onceInit) fw := getFuncWork() - fw.f = f fw.ctx = ctx select { @@ -39,22 +49,13 @@ func NewFunc(f func(ctx interface{})) func(ctx interface{}) bool { } } -func init() { - n := runtime.GOMAXPROCS(-1) - for i := 0; i < n; i++ { - go funcWorker() - } -} - -func funcWorker() { +func funcWorker(funcWorkCh <-chan *funcWork, f func(ctx interface{})) { for fw := range funcWorkCh { - fw.f(fw.ctx) + f(fw.ctx) fw.done <- struct{}{} } } -var funcWorkCh = make(chan *funcWork, runtime.GOMAXPROCS(-1)*1024) - func getFuncWork() *funcWork { v := funcWorkPool.Get() if v == nil { @@ -66,7 +67,6 @@ func getFuncWork() *funcWork { } func putFuncWork(fw *funcWork) { - fw.f = nil fw.ctx = nil funcWorkPool.Put(fw) } @@ -74,7 +74,6 @@ func putFuncWork(fw *funcWork) { var funcWorkPool sync.Pool type funcWork struct { - f func(ctx interface{}) ctx interface{} done chan struct{} } diff --git a/stackless/func_test.go b/stackless/func_test.go index 9fb4588..4f2c492 100644 --- a/stackless/func_test.go +++ b/stackless/func_test.go @@ -13,7 +13,7 @@ func TestNewFuncSimple(t *testing.T) { atomic.AddUint64(&n, uint64(ctx.(int))) }) - iterations := 2 * cap(funcWorkCh) + iterations := 4 * 1024 for i := 0; i < iterations; i++ { if !f(2) { t.Fatalf("f mustn't return false") @@ -33,7 +33,7 @@ func TestNewFuncMulti(t *testing.T) { atomic.AddUint64(&n2, uint64(ctx.(int))) }) - iterations := 2 * cap(funcWorkCh) + iterations := 4 * 1024 f1Done := make(chan error, 1) go func() {