InmemoryListener: re-use byteBuffer channels

This commit is contained in:
Aliaksandr Valialkin
2016-02-04 14:30:09 +02:00
parent 484b819453
commit 8f1c8f7bbe
2 changed files with 129 additions and 48 deletions
+14 -3
View File
@@ -19,16 +19,23 @@ func TestInmemoryListener(t *testing.T) {
}
defer conn.Close()
req := fmt.Sprintf("request_%d", n)
if _, err = conn.Write([]byte(req)); err != nil {
nn, err := conn.Write([]byte(req))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if nn != len(req) {
t.Fatalf("unexpected number of bytes written: %d. Expecting %d", nn, len(req))
}
buf := make([]byte, 30)
nn, err := conn.Read(buf)
nn, err = conn.Read(buf)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
buf = buf[:nn]
resp := fmt.Sprintf("response_%d", n)
if nn != len(resp) {
t.Fatalf("unexpected number of bytes read: %d. Expecting %d", nn, len(resp))
}
if string(buf) != resp {
t.Fatalf("unexpected response %q. Expecting %q", buf, resp)
}
@@ -55,9 +62,13 @@ func TestInmemoryListener(t *testing.T) {
t.Fatalf("unexpected request prefix %q. Expecting %q", buf, "request_")
}
resp := fmt.Sprintf("response_%s", buf[len("request_"):])
if _, err = conn.Write([]byte(resp)); err != nil {
n, err = conn.Write([]byte(resp))
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != len(resp) {
t.Fatalf("unexpected number of bytes written: %d. Expecting %d", n, len(resp))
}
}
}()
+115 -45
View File
@@ -9,11 +9,14 @@ import (
)
func newPipeConns() *pipeConns {
ch1 := acquirePipeChan()
ch2 := acquirePipeChan()
pc := &pipeConns{}
pc.c1.r = make(chan *byteBuffer, 1024)
pc.c1.w = make(chan *byteBuffer, 1024)
pc.c2.r = pc.c1.w
pc.c2.w = pc.c1.r
pc.c1.r = ch1
pc.c1.w = ch2
pc.c2.r = ch2
pc.c2.w = ch1
pc.c1.parent = pc
pc.c2.parent = pc
return pc
@@ -24,40 +27,92 @@ type pipeConns struct {
c2 pipeConn
}
func (pc *pipeConns) release() {
pc.c1.wlock.Lock()
pc.c2.wlock.Lock()
mustRelease := pc.c1.wclosed && pc.c2.wclosed
pc.c1.wlock.Unlock()
pc.c2.wlock.Unlock()
if mustRelease {
pc.c1.release()
pc.c2.release()
}
}
type pipeConn struct {
r chan *byteBuffer
w chan *byteBuffer
b *byteBuffer
bb []byte
lock sync.RWMutex
closed bool
r *pipeChan
w *pipeChan
b *byteBuffer
bb []byte
rclosed bool
wlock sync.Mutex
wclosed bool
parent *pipeConns
}
func (c *pipeConn) Write(p []byte) (int, error) {
c.lock.RLock()
if c.closed {
c.lock.RUnlock()
return 0, errors.New("connection closed")
}
b := acquireByteBuffer()
b.b = append(b.b[:0], p...)
c.w <- b
c.lock.RUnlock()
c.wlock.Lock()
if c.wclosed {
c.wlock.Unlock()
releaseByteBuffer(b)
return 0, errors.New("connection closed for writing")
}
c.w.ch <- b
c.wlock.Unlock()
return len(p), nil
}
func (c *pipeConn) Read(p []byte) (int, error) {
mayBlock := true
nn := 0
for len(p) > 0 {
n, err := c.read(p, mayBlock)
nn += n
if err != nil {
if !mayBlock && err == errWouldBlock {
err = nil
} else {
}
return nn, err
}
p = p[n:]
mayBlock = false
}
return nn, nil
}
func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
if len(c.bb) == 0 {
releaseByteBuffer(c.b)
c.b = nil
b, ok := <-c.r
if !ok {
if c.rclosed {
return 0, io.EOF
}
if mayBlock {
c.b = <-c.r.ch
} else {
select {
case c.b = <-c.r.ch:
default:
return 0, errWouldBlock
}
}
if c.b == nil {
c.rclosed = true
return 0, io.EOF
}
c.b = b
c.bb = c.b.b
}
n := copy(p, c.bb)
@@ -66,17 +121,26 @@ func (c *pipeConn) Read(p []byte) (int, error) {
return n, nil
}
var errWouldBlock = errors.New("would block")
func (c *pipeConn) Close() error {
c.lock.Lock()
if !c.closed {
close(c.w)
c.closed = true
c.lock.Unlock()
freeBuffers(c.parent)
return nil
}
c.lock.Unlock()
return errors.New("already closed")
c.wlock.Lock()
c.wclosed = true
c.w.ch <- nil
c.wlock.Unlock()
c.parent.release()
return nil
}
func (c *pipeConn) release() {
releaseByteBuffer(c.b)
releasePipeChan(c.r)
c.r = nil
c.w = nil
c.b = nil
c.bb = nil
}
func (p *pipeConn) LocalAddr() net.Addr {
@@ -129,23 +193,29 @@ var byteBufferPool = &sync.Pool{
},
}
func freeBuffers(pc *pipeConns) {
pc.c1.lock.RLock()
pc.c2.lock.RLock()
mustFree := pc.c1.closed && pc.c2.closed
pc.c1.lock.RUnlock()
pc.c2.lock.RUnlock()
if mustFree {
freeBufs(pc.c1.r)
freeBufs(pc.c2.r)
func acquirePipeChan() *pipeChan {
ch := pipeChanPool.Get().(*pipeChan)
if len(ch.ch) > 0 {
panic("BUG: non-empty pipeChan acquired")
}
return ch
}
func freeBufs(ch chan *byteBuffer) {
for b := range ch {
func releasePipeChan(ch *pipeChan) {
for b := range ch.ch {
releaseByteBuffer(b)
}
pipeChanPool.Put(ch)
}
var pipeChanPool = &sync.Pool{
New: func() interface{} {
return &pipeChan{
ch: make(chan *byteBuffer, 4),
}
},
}
type pipeChan struct {
ch chan *byteBuffer
}