Wait for the response of pipelineWork in background and return it to pool (#1436)

This commit is contained in:
Andy Pan
2022-11-17 13:31:03 +08:00
committed by GitHub
parent c367454ffe
commit 8f434434e7
+40 -40
View File
@@ -2376,7 +2376,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
}
w := acquirePipelineWork(&c.workPool, timeout)
w := c.acquirePipelineWork(timeout)
w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
w.req = &w.reqCopy
w.resp = &w.respCopy
@@ -2394,7 +2394,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
select {
case c.chW <- w:
case <-w.t.C:
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
return ErrTimeout
}
}
@@ -2408,7 +2408,7 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
swapResponseBody(resp, &w.respCopy)
}
err = w.err
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
case <-w.t.C:
err = ErrTimeout
}
@@ -2416,6 +2416,40 @@ func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline t
return err
}
func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
v := c.workPool.Get()
if v != nil {
w = v.(*pipelineWork)
} else {
w = &pipelineWork{
done: make(chan struct{}, 1),
}
}
if timeout > 0 {
if w.t == nil {
w.t = time.NewTimer(timeout)
} else {
w.t.Reset(timeout)
}
w.deadline = time.Now().Add(timeout)
} else {
w.deadline = zeroTime
}
return w
}
func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
if w.t != nil {
w.t.Stop()
}
w.reqCopy.Reset()
w.respCopy.Reset()
w.req = nil
w.resp = nil
w.err = nil
c.workPool.Put(w)
}
// Do performs the given http request and sets the corresponding response.
//
// Request must contain at least non-zero RequestURI with full url (including
@@ -2443,7 +2477,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
}
w := acquirePipelineWork(&c.workPool, 0)
w := c.acquirePipelineWork(0)
w.req = req
if resp != nil {
resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
@@ -2466,7 +2500,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
select {
case c.chW <- w:
default:
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
return ErrPipelineOverflow
}
}
@@ -2475,7 +2509,7 @@ func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
<-w.done
err := w.err
releasePipelineWork(&c.workPool, w)
c.releasePipelineWork(w)
return err
}
@@ -2852,37 +2886,3 @@ func (c *pipelineConnClient) getClientName() []byte {
}
var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) (w *pipelineWork) {
v := pool.Get()
if v != nil {
w = v.(*pipelineWork)
} else {
w = &pipelineWork{
done: make(chan struct{}, 1),
}
}
if timeout > 0 {
if w.t == nil {
w.t = time.NewTimer(timeout)
} else {
w.t.Reset(timeout)
}
w.deadline = time.Now().Add(timeout)
} else {
w.deadline = zeroTime
}
return w
}
func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
if w.t != nil {
w.t.Stop()
}
w.reqCopy.Reset()
w.respCopy.Reset()
w.req = nil
w.resp = nil
w.err = nil
pool.Put(w)
}