From fdbe5bdec44adc349faa1c26cedc2fec8df4d5ed Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Sat, 13 Jun 2026 07:20:02 +0200 Subject: [PATCH] Fix flaky race tests --- client.go | 12 +++++++++++- compress_test.go | 3 ++- fs_fs_test.go | 2 +- race_disabled_test.go | 5 +++++ race_enabled_test.go | 5 +++++ test_timeout_test.go | 16 ++++++++++++++++ 6 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 race_disabled_test.go create mode 100644 race_enabled_test.go create mode 100644 test_timeout_test.go diff --git a/client.go b/client.go index c209794..3ade63a 100644 --- a/client.go +++ b/client.go @@ -3033,7 +3033,10 @@ func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}, chs * select { case w = <-chW: case <-stopTimer.C: - return nil + if c.canStopPipelineConn(chs) { + return nil + } + goto againChW case <-stopCh: return nil case <-flushTimerCh: @@ -3102,6 +3105,13 @@ func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}, chs * } } +func (c *pipelineConnClient) canStopPipelineConn(chs *pipelineConnChannels) bool { + c.chLock.Lock() + canStop := c.chs == chs && chs.users == 0 + c.chLock.Unlock() + return canStop && len(chs.chR) == 0 && len(chs.chW) == 0 +} + func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}, chs *pipelineConnChannels) error { readBufferSize := c.ReadBufferSize if readBufferSize <= 0 { diff --git a/compress_test.go b/compress_test.go index 53e8b11..2e778fc 100644 --- a/compress_test.go +++ b/compress_test.go @@ -216,6 +216,7 @@ func testConcurrent(concurrency int, f func() error) error { err := f() if err != nil { ch <- fmt.Errorf("error in goroutine %d: %w", idx, err) + return } ch <- nil }(i) @@ -226,7 +227,7 @@ func testConcurrent(concurrency int, f func() error) error { if err != nil { return err } - case <-time.After(time.Second): + case <-time.After(testTimeout(time.Second)): return errors.New("timeout") } } diff --git a/fs_fs_test.go b/fs_fs_test.go index e17767c..f2ee0e8 100644 --- a/fs_fs_test.go +++ b/fs_fs_test.go @@ -852,7 +852,7 @@ func TestDirFSFSCompressConcurrent(t *testing.T) { for range concurrency { select { case <-ch: - case <-time.After(time.Second * 2): + case <-time.After(testTimeout(time.Second * 2)): t.Fatalf("timeout") } } diff --git a/race_disabled_test.go b/race_disabled_test.go new file mode 100644 index 0000000..751f3e8 --- /dev/null +++ b/race_disabled_test.go @@ -0,0 +1,5 @@ +//go:build !race + +package fasthttp + +const raceEnabled = false diff --git a/race_enabled_test.go b/race_enabled_test.go new file mode 100644 index 0000000..62e0a67 --- /dev/null +++ b/race_enabled_test.go @@ -0,0 +1,5 @@ +//go:build race + +package fasthttp + +const raceEnabled = true diff --git a/test_timeout_test.go b/test_timeout_test.go new file mode 100644 index 0000000..36a949a --- /dev/null +++ b/test_timeout_test.go @@ -0,0 +1,16 @@ +package fasthttp + +import ( + "runtime" + "time" +) + +func testTimeout(timeout time.Duration) time.Duration { + if raceEnabled { + timeout *= 5 + } + if runtime.GOOS == "windows" { + timeout *= 2 + } + return timeout +}