Return ErrPipelineOverflow from PipelineClient.Do if the pending requests' queue is overflown. This should prevent from caller's goroutines leak on stalled pipeline client

This commit is contained in:
Aliaksandr Valialkin
2016-04-01 18:21:35 +03:00
parent 22c9594090
commit ca21b21eba
2 changed files with 25 additions and 2 deletions
+17 -2
View File
@@ -1547,7 +1547,15 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error {
w.resp = &w.respCopy
}
c.chW <- w
// Put the request to outgoing queue
select {
case c.chW <- w:
default:
releasePipelineWork(&c.workPool, w)
return ErrPipelineOverflow
}
// Wait for the response
<-w.done
err := w.err
@@ -1556,6 +1564,10 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error {
return err
}
// ErrPipelineOverflow may be returned from PipelineClient.Do
// if the requests' queue is overflown.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxPendingRequests")
// DefaultMaxPendingRequests is the default value
// for PipelineClient.MaxPendingRequests.
const DefaultMaxPendingRequests = 1024
@@ -1756,7 +1768,10 @@ func (c *PipelineClient) logger() Logger {
func (c *PipelineClient) PendingRequests() int {
c.init()
return len(c.chR)
c.chLock.Lock()
n := len(c.chR)
c.chLock.Unlock()
return n
}
var errPipelineClientStopped = errors.New("pipeline client has been stopped")
+8
View File
@@ -64,6 +64,10 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) {
}
}
if c.PendingRequests() != 0 {
t.Fatalf("unexpected number of pending requests: %d. Expecting zero", c.PendingRequests())
}
if err := ln.Close(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
@@ -86,6 +90,10 @@ func testPipelineClientDo(t *testing.T, c *PipelineClient) {
err = c.Do(req, resp)
}
if err != nil {
if err == ErrPipelineOverflow {
time.Sleep(10 * time.Millisecond)
continue
}
t.Fatalf("unexpected error on iteration %d: %s", i, err)
}
if resp.StatusCode() != StatusOK {