From ca21b21eba267eac4489fb6b42260129727c4a1b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 1 Apr 2016 18:21:35 +0300 Subject: [PATCH] Return ErrPipelineOverflow from PipelineClient.Do if the pending requests' queue is overflown. This should prevent from caller's goroutines leak on stalled pipeline client --- client.go | 19 +++++++++++++++++-- client_test.go | 8 ++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index f9e123b..6de3671 100644 --- a/client.go +++ b/client.go @@ -1547,7 +1547,15 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error { w.resp = &w.respCopy } - c.chW <- w + // Put the request to outgoing queue + select { + case c.chW <- w: + default: + releasePipelineWork(&c.workPool, w) + return ErrPipelineOverflow + } + + // Wait for the response <-w.done err := w.err @@ -1556,6 +1564,10 @@ func (c *PipelineClient) Do(req *Request, resp *Response) error { return err } +// ErrPipelineOverflow may be returned from PipelineClient.Do +// if the requests' queue is overflown. +var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxPendingRequests") + // DefaultMaxPendingRequests is the default value // for PipelineClient.MaxPendingRequests. const DefaultMaxPendingRequests = 1024 @@ -1756,7 +1768,10 @@ func (c *PipelineClient) logger() Logger { func (c *PipelineClient) PendingRequests() int { c.init() - return len(c.chR) + c.chLock.Lock() + n := len(c.chR) + c.chLock.Unlock() + return n } var errPipelineClientStopped = errors.New("pipeline client has been stopped") diff --git a/client_test.go b/client_test.go index e52a0c2..e675e22 100644 --- a/client_test.go +++ b/client_test.go @@ -64,6 +64,10 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) { } } + if c.PendingRequests() != 0 { + t.Fatalf("unexpected number of pending requests: %d. Expecting zero", c.PendingRequests()) + } + if err := ln.Close(); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -86,6 +90,10 @@ func testPipelineClientDo(t *testing.T, c *PipelineClient) { err = c.Do(req, resp) } if err != nil { + if err == ErrPipelineOverflow { + time.Sleep(10 * time.Millisecond) + continue + } t.Fatalf("unexpected error on iteration %d: %s", i, err) } if resp.StatusCode() != StatusOK {