From 8f1c8f7bbee834e568b24eaf3ef1fa3ecc8c426d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 4 Feb 2016 14:30:09 +0200 Subject: [PATCH] InmemoryListener: re-use byteBuffer channels --- fasthttputil/inmemory_listener_test.go | 17 ++- fasthttputil/pipe.go | 160 ++++++++++++++++++------- 2 files changed, 129 insertions(+), 48 deletions(-) diff --git a/fasthttputil/inmemory_listener_test.go b/fasthttputil/inmemory_listener_test.go index 03dcf73..86aab68 100644 --- a/fasthttputil/inmemory_listener_test.go +++ b/fasthttputil/inmemory_listener_test.go @@ -19,16 +19,23 @@ func TestInmemoryListener(t *testing.T) { } defer conn.Close() req := fmt.Sprintf("request_%d", n) - if _, err = conn.Write([]byte(req)); err != nil { + nn, err := conn.Write([]byte(req)) + if err != nil { t.Fatalf("unexpected error: %s", err) } + if nn != len(req) { + t.Fatalf("unexpected number of bytes written: %d. Expecting %d", nn, len(req)) + } buf := make([]byte, 30) - nn, err := conn.Read(buf) + nn, err = conn.Read(buf) if err != nil { t.Fatalf("unexpected error: %s", err) } buf = buf[:nn] resp := fmt.Sprintf("response_%d", n) + if nn != len(resp) { + t.Fatalf("unexpected number of bytes read: %d. Expecting %d", nn, len(resp)) + } if string(buf) != resp { t.Fatalf("unexpected response %q. Expecting %q", buf, resp) } @@ -55,9 +62,13 @@ func TestInmemoryListener(t *testing.T) { t.Fatalf("unexpected request prefix %q. Expecting %q", buf, "request_") } resp := fmt.Sprintf("response_%s", buf[len("request_"):]) - if _, err = conn.Write([]byte(resp)); err != nil { + n, err = conn.Write([]byte(resp)) + if err != nil { t.Fatalf("unexpected error: %s", err) } + if n != len(resp) { + t.Fatalf("unexpected number of bytes written: %d. Expecting %d", n, len(resp)) + } } }() diff --git a/fasthttputil/pipe.go b/fasthttputil/pipe.go index bef1481..1b62a73 100644 --- a/fasthttputil/pipe.go +++ b/fasthttputil/pipe.go @@ -9,11 +9,14 @@ import ( ) func newPipeConns() *pipeConns { + ch1 := acquirePipeChan() + ch2 := acquirePipeChan() + pc := &pipeConns{} - pc.c1.r = make(chan *byteBuffer, 1024) - pc.c1.w = make(chan *byteBuffer, 1024) - pc.c2.r = pc.c1.w - pc.c2.w = pc.c1.r + pc.c1.r = ch1 + pc.c1.w = ch2 + pc.c2.r = ch2 + pc.c2.w = ch1 pc.c1.parent = pc pc.c2.parent = pc return pc @@ -24,40 +27,92 @@ type pipeConns struct { c2 pipeConn } +func (pc *pipeConns) release() { + pc.c1.wlock.Lock() + pc.c2.wlock.Lock() + mustRelease := pc.c1.wclosed && pc.c2.wclosed + pc.c1.wlock.Unlock() + pc.c2.wlock.Unlock() + + if mustRelease { + pc.c1.release() + pc.c2.release() + } +} + type pipeConn struct { - r chan *byteBuffer - w chan *byteBuffer - b *byteBuffer - bb []byte - lock sync.RWMutex - closed bool + r *pipeChan + w *pipeChan + b *byteBuffer + bb []byte + + rclosed bool + + wlock sync.Mutex + wclosed bool + parent *pipeConns } func (c *pipeConn) Write(p []byte) (int, error) { - c.lock.RLock() - if c.closed { - c.lock.RUnlock() - return 0, errors.New("connection closed") - } - b := acquireByteBuffer() b.b = append(b.b[:0], p...) - c.w <- b - c.lock.RUnlock() + c.wlock.Lock() + if c.wclosed { + c.wlock.Unlock() + releaseByteBuffer(b) + return 0, errors.New("connection closed for writing") + } + c.w.ch <- b + c.wlock.Unlock() + return len(p), nil } func (c *pipeConn) Read(p []byte) (int, error) { + mayBlock := true + nn := 0 + for len(p) > 0 { + n, err := c.read(p, mayBlock) + nn += n + if err != nil { + if !mayBlock && err == errWouldBlock { + err = nil + } else { + } + return nn, err + } + p = p[n:] + mayBlock = false + } + + return nn, nil +} + +func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) { if len(c.bb) == 0 { releaseByteBuffer(c.b) c.b = nil - b, ok := <-c.r - if !ok { + + if c.rclosed { + return 0, io.EOF + } + + if mayBlock { + c.b = <-c.r.ch + } else { + select { + case c.b = <-c.r.ch: + default: + return 0, errWouldBlock + } + } + + if c.b == nil { + c.rclosed = true return 0, io.EOF } - c.b = b c.bb = c.b.b } n := copy(p, c.bb) @@ -66,17 +121,26 @@ func (c *pipeConn) Read(p []byte) (int, error) { return n, nil } +var errWouldBlock = errors.New("would block") + func (c *pipeConn) Close() error { - c.lock.Lock() - if !c.closed { - close(c.w) - c.closed = true - c.lock.Unlock() - freeBuffers(c.parent) - return nil - } - c.lock.Unlock() - return errors.New("already closed") + c.wlock.Lock() + c.wclosed = true + c.w.ch <- nil + c.wlock.Unlock() + + c.parent.release() + return nil +} + +func (c *pipeConn) release() { + releaseByteBuffer(c.b) + releasePipeChan(c.r) + + c.r = nil + c.w = nil + c.b = nil + c.bb = nil } func (p *pipeConn) LocalAddr() net.Addr { @@ -129,23 +193,29 @@ var byteBufferPool = &sync.Pool{ }, } -func freeBuffers(pc *pipeConns) { - pc.c1.lock.RLock() - pc.c2.lock.RLock() - - mustFree := pc.c1.closed && pc.c2.closed - - pc.c1.lock.RUnlock() - pc.c2.lock.RUnlock() - - if mustFree { - freeBufs(pc.c1.r) - freeBufs(pc.c2.r) +func acquirePipeChan() *pipeChan { + ch := pipeChanPool.Get().(*pipeChan) + if len(ch.ch) > 0 { + panic("BUG: non-empty pipeChan acquired") } + return ch } -func freeBufs(ch chan *byteBuffer) { - for b := range ch { +func releasePipeChan(ch *pipeChan) { + for b := range ch.ch { releaseByteBuffer(b) } + pipeChanPool.Put(ch) +} + +var pipeChanPool = &sync.Pool{ + New: func() interface{} { + return &pipeChan{ + ch: make(chan *byteBuffer, 4), + } + }, +} + +type pipeChan struct { + ch chan *byteBuffer }