diff --git a/client.go b/client.go index 6de3671..4e7359d 100644 --- a/client.go +++ b/client.go @@ -1377,6 +1377,12 @@ type PipelineClient struct { // DefaultMaxPendingRequests is used by default. MaxPendingRequests int + // The maximum delay before sending pipelined requests as a batch + // to the server. + // + // By default requests are sent immediately to the server. + MaxBatchDelay time.Duration + // Callback for connection establishing to the host. // // Default Dial is used if not set. @@ -1649,6 +1655,7 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error { writeBufferSize = defaultWriteBufferSize } bw := bufio.NewWriterSize(conn, writeBufferSize) + defer bw.Flush() chR := c.chR chW := c.chW @@ -1656,13 +1663,20 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error { if maxIdleConnDuration <= 0 { maxIdleConnDuration = DefaultMaxIdleConnDuration } + maxBatchDelay := c.MaxBatchDelay - stopTimer := time.NewTimer(time.Hour) var ( + stopTimer = time.NewTimer(time.Hour) + flushTimer = time.NewTimer(time.Hour) + flushTimerCh <-chan time.Time + instantTimerCh = make(chan time.Time) + w *pipelineWork err error ) + close(instantTimerCh) for { + againChW: select { case w = <-chW: // Fast path: len(chW) > 0 @@ -1675,6 +1689,12 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error { return nil case <-stopCh: return nil + case <-flushTimerCh: + if err = bw.Flush(); err != nil { + return err + } + flushTimerCh = nil + goto againChW } } @@ -1690,14 +1710,16 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error { w.done <- struct{}{} return err } - if len(chW) == 0 || len(chR) == cap(chR) { - if err = bw.Flush(); err != nil { - w.err = err - w.done <- struct{}{} - return err + if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) { + if maxBatchDelay > 0 { + flushTimer.Reset(maxBatchDelay) + flushTimerCh = flushTimer.C + } else { + flushTimerCh = instantTimerCh } } + againChR: select { case chR <- w: // Fast path: len(chR) < cap(chR) @@ -1709,6 +1731,14 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error { w.err = errPipelineClientStopped w.done <- struct{}{} return nil + case <-flushTimerCh: + if err = bw.Flush(); err != nil { + w.err = err + w.done <- struct{}{} + return err + } + flushTimerCh = nil + goto againChR } } } diff --git a/client_test.go b/client_test.go index e675e22..b1a79ce 100644 --- a/client_test.go +++ b/client_test.go @@ -16,14 +16,18 @@ import ( ) func TestPipelineClientDoSerial(t *testing.T) { - testPipelineClientDoConcurrent(t, 1) + testPipelineClientDoConcurrent(t, 1, 0) } func TestPipelineClientDoConcurrent(t *testing.T) { - testPipelineClientDoConcurrent(t, 10) + testPipelineClientDoConcurrent(t, 10, 0) } -func testPipelineClientDoConcurrent(t *testing.T, concurrency int) { +func TestPipelineClientDoBatchDelayConcurrent(t *testing.T) { + testPipelineClientDoConcurrent(t, 10, 5*time.Millisecond) +} + +func testPipelineClientDoConcurrent(t *testing.T, concurrency int, maxBatchDelay time.Duration) { ln := fasthttputil.NewInmemoryListener() s := &Server{ @@ -44,8 +48,10 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) { Dial: func(addr string) (net.Conn, error) { return ln.Dial() }, - MaxIdleConnDuration: 5 * time.Millisecond, - MaxPendingRequests: 2, + MaxIdleConnDuration: 23 * time.Millisecond, + MaxPendingRequests: 6, + MaxBatchDelay: maxBatchDelay, + Logger: &customLogger{}, } clientStopCh := make(chan struct{}, concurrency) @@ -59,7 +65,7 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) { for i := 0; i < concurrency; i++ { select { case <-clientStopCh: - case <-time.After(time.Second): + case <-time.After(3*time.Second): t.Fatalf("timeout") } }