PipelineClinet: added MaxBatchDelay option

This commit is contained in:
Aliaksandr Valialkin
2016-04-02 23:18:34 +03:00
parent ca21b21eba
commit 8ae2d3e53c
2 changed files with 48 additions and 12 deletions
+36 -6
View File
@@ -1377,6 +1377,12 @@ type PipelineClient struct {
// DefaultMaxPendingRequests is used by default.
MaxPendingRequests int
// The maximum delay before sending pipelined requests as a batch
// to the server.
//
// By default requests are sent immediately to the server.
MaxBatchDelay time.Duration
// Callback for connection establishing to the host.
//
// Default Dial is used if not set.
@@ -1649,6 +1655,7 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
writeBufferSize = defaultWriteBufferSize
}
bw := bufio.NewWriterSize(conn, writeBufferSize)
defer bw.Flush()
chR := c.chR
chW := c.chW
@@ -1656,13 +1663,20 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
if maxIdleConnDuration <= 0 {
maxIdleConnDuration = DefaultMaxIdleConnDuration
}
maxBatchDelay := c.MaxBatchDelay
stopTimer := time.NewTimer(time.Hour)
var (
stopTimer = time.NewTimer(time.Hour)
flushTimer = time.NewTimer(time.Hour)
flushTimerCh <-chan time.Time
instantTimerCh = make(chan time.Time)
w *pipelineWork
err error
)
close(instantTimerCh)
for {
againChW:
select {
case w = <-chW:
// Fast path: len(chW) > 0
@@ -1675,6 +1689,12 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
return nil
case <-stopCh:
return nil
case <-flushTimerCh:
if err = bw.Flush(); err != nil {
return err
}
flushTimerCh = nil
goto againChW
}
}
@@ -1690,14 +1710,16 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
w.done <- struct{}{}
return err
}
if len(chW) == 0 || len(chR) == cap(chR) {
if err = bw.Flush(); err != nil {
w.err = err
w.done <- struct{}{}
return err
if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
if maxBatchDelay > 0 {
flushTimer.Reset(maxBatchDelay)
flushTimerCh = flushTimer.C
} else {
flushTimerCh = instantTimerCh
}
}
againChR:
select {
case chR <- w:
// Fast path: len(chR) < cap(chR)
@@ -1709,6 +1731,14 @@ func (c *PipelineClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
w.err = errPipelineClientStopped
w.done <- struct{}{}
return nil
case <-flushTimerCh:
if err = bw.Flush(); err != nil {
w.err = err
w.done <- struct{}{}
return err
}
flushTimerCh = nil
goto againChR
}
}
}
+12 -6
View File
@@ -16,14 +16,18 @@ import (
)
func TestPipelineClientDoSerial(t *testing.T) {
testPipelineClientDoConcurrent(t, 1)
testPipelineClientDoConcurrent(t, 1, 0)
}
func TestPipelineClientDoConcurrent(t *testing.T) {
testPipelineClientDoConcurrent(t, 10)
testPipelineClientDoConcurrent(t, 10, 0)
}
func testPipelineClientDoConcurrent(t *testing.T, concurrency int) {
func TestPipelineClientDoBatchDelayConcurrent(t *testing.T) {
testPipelineClientDoConcurrent(t, 10, 5*time.Millisecond)
}
func testPipelineClientDoConcurrent(t *testing.T, concurrency int, maxBatchDelay time.Duration) {
ln := fasthttputil.NewInmemoryListener()
s := &Server{
@@ -44,8 +48,10 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) {
Dial: func(addr string) (net.Conn, error) {
return ln.Dial()
},
MaxIdleConnDuration: 5 * time.Millisecond,
MaxPendingRequests: 2,
MaxIdleConnDuration: 23 * time.Millisecond,
MaxPendingRequests: 6,
MaxBatchDelay: maxBatchDelay,
Logger: &customLogger{},
}
clientStopCh := make(chan struct{}, concurrency)
@@ -59,7 +65,7 @@ func testPipelineClientDoConcurrent(t *testing.T, concurrency int) {
for i := 0; i < concurrency; i++ {
select {
case <-clientStopCh:
case <-time.After(time.Second):
case <-time.After(3*time.Second):
t.Fatalf("timeout")
}
}