Files
fasthttp/streaming_test.go
T
Roman Khimov 19fcd40863 Fix chunked streaming (#1015)
* http: refactor out crlf reading function

Make it a bit simpler and make it reusable.

* streaming: fix chunked stream test

This test is supposed to check for stream unchunking, but it's not really
effective with that because chunks created by createChunkedBody() get wrapped
into another chunk by writeBodyStream(), so we end up with chunkedBody in
request handler although what we really want is plain body.

Deduplicate test and benchmark also.

* streaming: fix Read interface

It wasn't actually compatible with io.Reader, as io.Reader _never_ returns n >
len(p), while this function easily did that for chunked payloads, confusing its
users:

panic: runtime error: slice bounds out of range [:528] with capacity 512

goroutine 562 [running]:
io.ReadAll(0x9f4380, 0xc0003be1a0, 0xc0004fcd80, 0x0, 0x0, 0xc00086bc30, 0x46f99b)
        /usr/lib64/go/1.16/src/io/io.go:634 +0x205
io/ioutil.ReadAll(...)
        /usr/lib64/go/1.16/src/io/ioutil/ioutil.go:27
github.com/valyala/fasthttp.getChunkedTestEnv.func1(0xc001fdc680)
        /home/rik/dev/fasthttp/streaming_test.go:108 +0x6c
github.com/valyala/fasthttp.(*Server).serveConn(0xc000416d80, 0xa034e8, 0xc0004da880, 0x0, 0x0)
        /home/rik/dev/fasthttp/server.go:2219 +0x12ee
github.com/valyala/fasthttp.(*workerPool).workerFunc(0xc000148960, 0xc0003be160)
        /home/rik/dev/fasthttp/workerpool.go:223 +0xba
github.com/valyala/fasthttp.(*workerPool).getCh.func1(0xc000148960, 0xc0003be160, 0x8b4ec0, 0xc0003be160)
        /home/rik/dev/fasthttp/workerpool.go:195 +0x35
created by github.com/valyala/fasthttp.(*workerPool).getCh
        /home/rik/dev/fasthttp/workerpool.go:194 +0x11f

It also returned len(p) in some cases where it read less than that.
2021-05-04 12:55:54 +02:00

192 lines
4.1 KiB
Go

package fasthttp
import (
"bufio"
"bytes"
"io/ioutil"
"sync"
"testing"
"time"
"github.com/valyala/fasthttp/fasthttputil"
)
// TestStreamingPipeline writes two pipelined POST requests over a single
// in-memory connection to a server with StreamRequestBody enabled and checks
// that the handler sees the expected 10-byte body for both of them. The body
// of "/one" is read through ctx.PostBody, the body of the second request
// through ctx.RequestBodyStream. Both responses must carry StatusOK, and the
// server must stop within a second after the listener is closed.
func TestStreamingPipeline(t *testing.T) {
	t.Parallel()

	// HTTP/1.1 terminates the header section with an empty line, so every
	// request below keeps a blank line between its headers and its body.
	reqS := `POST /one HTTP/1.1
Host: example.com
Content-Length: 10

aaaaaaaaaa
POST /two HTTP/1.1
Host: example.com
Content-Length: 10

aaaaaaaaaa`

	ln := fasthttputil.NewInmemoryListener()

	s := &Server{
		StreamRequestBody: true,
		Handler: func(ctx *RequestCtx) {
			body := ""
			expected := "aaaaaaaaaa"
			if string(ctx.Path()) == "/one" {
				body = string(ctx.PostBody())
			} else {
				all, err := ioutil.ReadAll(ctx.RequestBodyStream())
				if err != nil {
					t.Error(err)
				}
				body = string(all)
			}
			if body != expected {
				t.Errorf("expected %q got %q", expected, body)
			}
		},
	}

	// ch is closed once Serve has returned, which lets the test wait for
	// the server to stop after the listener is closed.
	ch := make(chan struct{})
	go func() {
		if err := s.Serve(ln); err != nil {
			t.Errorf("unexpected error: %s", err)
		}
		close(ch)
	}()

	conn, err := ln.Dial()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if _, err = conn.Write([]byte(reqS)); err != nil {
		t.Fatalf("unexpected error: %s", err)
	}

	var resp Response
	br := bufio.NewReader(conn)
	// The responses are read in a separate goroutine so that the test can
	// bail out with a timeout instead of blocking forever on the connection.
	respCh := make(chan struct{})
	go func() {
		if err := resp.Read(br); err != nil {
			t.Errorf("error when reading response: %s", err)
		}
		if resp.StatusCode() != StatusOK {
			t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
		}

		if err := resp.Read(br); err != nil {
			t.Errorf("error when reading response: %s", err)
		}
		if resp.StatusCode() != StatusOK {
			t.Errorf("unexpected status code %d. Expecting %d", resp.StatusCode(), StatusOK)
		}
		close(respCh)
	}()

	select {
	case <-respCh:
	case <-time.After(time.Second):
		t.Fatal("timeout")
	}

	if err := ln.Close(); err != nil {
		t.Fatalf("error when closing listener: %s", err)
	}

	select {
	case <-ch:
	case <-time.After(time.Second):
		t.Fatal("timeout when waiting for the server to stop")
	}
}
// getChunkedTestEnv prepares the shared fixture for the chunked streaming
// test and benchmark. It builds a 128 KiB body, starts a Server with
// StreamRequestBody enabled on a new in-memory listener in a background
// goroutine, and returns that listener together with the raw bytes of a POST
// request: the formatted request header followed by the chunked form of the
// body.
//
// The server's handler reads the whole request body stream and reports an
// error through t when reading fails or when the bytes differ from the
// original, unchunked body.
func getChunkedTestEnv(t testing.TB) (*fasthttputil.InmemoryListener, []byte) {
	payload := createFixedBody(128 * 1024)
	chunkedPayload := createChunkedBody(payload)

	srv := &Server{
		Handler: func(ctx *RequestCtx) {
			received, readErr := ioutil.ReadAll(ctx.RequestBodyStream())
			if readErr != nil {
				t.Logf("ioutil read returned err=%s", readErr)
				t.Error("unexpected error while reading request body stream")
			}
			// The handler must observe the plain payload, not its chunked form.
			if !bytes.Equal(payload, received) {
				t.Errorf("unexpected request body, expected %q, got %q", payload, received)
			}
		},
		StreamRequestBody:  true,
		MaxRequestBodySize: 1, // easier to test with small limit
	}

	listener := fasthttputil.NewInmemoryListener()
	go func() {
		if serveErr := srv.Serve(listener); serveErr != nil {
			t.Errorf("could not serve listener: %s", serveErr)
		}
	}()

	var req Request
	req.SetHost("localhost")
	req.Header.SetMethod("POST")
	req.Header.Set("transfer-encoding", "chunked")
	req.Header.SetContentLength(-1)

	// Raw request = formatted header bytes immediately followed by the chunks.
	rawRequest := append(req.Header.Header(), chunkedPayload...)
	return listener, rawRequest
}
// TestRequestStream sends the chunked request built by getChunkedTestEnv over
// an in-memory connection and reads the response header back. The request
// body itself is verified inside the server handler installed by
// getChunkedTestEnv, which reports mismatches through t.
func TestRequestStream(t *testing.T) {
	ln, formattedRequest := getChunkedTestEnv(t)

	c, err := ln.Dial()
	if err != nil {
		// Without a connection nothing below can run; continuing would
		// dereference a nil conn.
		t.Fatalf("unexpected error while dialing: %s", err)
	}
	// Best-effort cleanup; a close error carries no information for this test.
	defer c.Close() //nolint:errcheck

	if _, err = c.Write(formattedRequest); err != nil {
		// The server never got a full request, so waiting for a response
		// could block indefinitely.
		t.Fatalf("unexpected error while writing request: %s", err)
	}

	br := bufio.NewReader(c)
	var respH ResponseHeader
	if err = respH.Read(br); err != nil {
		t.Errorf("unexpected error: %s", err)
	}
}
// BenchmarkRequestStreamE2E measures a full round trip of the chunked request
// built by getChunkedTestEnv: dial the in-memory listener, write the request,
// read the response header and close the connection. The b.N round trips are
// spread over four concurrent workers.
func BenchmarkRequestStreamE2E(b *testing.B) {
	const workers = 4

	ln, formattedRequest := getChunkedTestEnv(b)
	// Building the fixture is setup, not part of the measured work.
	b.ResetTimer()

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		// Hand the remainder of b.N/workers to the first workers so that
		// exactly b.N round trips run in total, including when b.N < workers.
		iterations := b.N / workers
		if w < b.N%workers {
			iterations++
		}

		go func(iterations int) {
			defer wg.Done()
			for i := 0; i < iterations; i++ {
				c, err := ln.Dial()
				if err != nil {
					// There is no connection to use; stop this worker
					// instead of dereferencing a nil conn.
					b.Errorf("unexpected error while dialing: %s", err)
					return
				}
				if _, err = c.Write(formattedRequest); err != nil {
					b.Errorf("unexpected error while writing request: %s", err)
					// No complete request was sent, so do not wait for a
					// response that may never arrive.
					c.Close() //nolint:errcheck
					return
				}
				br := bufio.NewReaderSize(c, 128)
				var respH ResponseHeader
				if err = respH.Read(br); err != nil {
					b.Errorf("unexpected error: %s", err)
				}
				c.Close() //nolint:errcheck
			}
		}(iterations)
	}
	wg.Wait()
}