Files
fasthttp/streaming.go
T
Roman Khimov 19fcd40863 Fix chunked streaming (#1015)
* http: refactor out crlf reading function

Make it a bit simpler and make it reusable.

* streaming: fix chunked stream test

This test is supposed to check for stream unchunking, but it's not really
effective with that because chunks created by createChunkedBody() get wrapped
into another chunk by writeBodyStream(), so we end up with chunkedBody in
request handler although what we really want is plain body.

Deduplicate test and benchmark also.

* streaming: fix Read interface

It wasn't actually compatible with io.Reader as io.Reader _never_ returns n >
len(p) while this function easily did that for chunked payloads confusing its
users:

panic: runtime error: slice bounds out of range [:528] with capacity 512

goroutine 562 [running]:
io.ReadAll(0x9f4380, 0xc0003be1a0, 0xc0004fcd80, 0x0, 0x0, 0xc00086bc30, 0x46f99b)
        /usr/lib64/go/1.16/src/io/io.go:634 +0x205
io/ioutil.ReadAll(...)
        /usr/lib64/go/1.16/src/io/ioutil/ioutil.go:27
github.com/valyala/fasthttp.getChunkedTestEnv.func1(0xc001fdc680)
        /home/rik/dev/fasthttp/streaming_test.go:108 +0x6c
github.com/valyala/fasthttp.(*Server).serveConn(0xc000416d80, 0xa034e8, 0xc0004da880, 0x0, 0x0)
        /home/rik/dev/fasthttp/server.go:2219 +0x12ee
github.com/valyala/fasthttp.(*workerPool).workerFunc(0xc000148960, 0xc0003be160)
        /home/rik/dev/fasthttp/workerpool.go:223 +0xba
github.com/valyala/fasthttp.(*workerPool).getCh.func1(0xc000148960, 0xc0003be160, 0x8b4ec0, 0xc0003be160)
        /home/rik/dev/fasthttp/workerpool.go:195 +0x35
created by github.com/valyala/fasthttp.(*workerPool).getCh
        /home/rik/dev/fasthttp/workerpool.go:194 +0x11f

It also returned len(p) in some cases where it read less than that.
2021-05-04 12:55:54 +02:00

110 lines
2.1 KiB
Go

package fasthttp
import (
"bufio"
"bytes"
"io"
"sync"
"github.com/valyala/bytebufferpool"
)
type requestStream struct {
prefetchedBytes *bytes.Reader
reader *bufio.Reader
totalBytesRead int
contentLength int
chunkLeft int
}
func (rs *requestStream) Read(p []byte) (int, error) {
var (
n int
err error
)
if rs.contentLength == -1 {
if rs.chunkLeft == 0 {
chunkSize, err := parseChunkSize(rs.reader)
if err != nil {
return 0, err
}
if chunkSize == 0 {
err = readCrLf(rs.reader)
if err == nil {
err = io.EOF
}
return 0, err
}
rs.chunkLeft = chunkSize
}
bytesToRead := len(p)
if rs.chunkLeft < len(p) {
bytesToRead = rs.chunkLeft
}
n, err = rs.reader.Read(p[:bytesToRead])
rs.totalBytesRead += n
rs.chunkLeft -= n
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err == nil && rs.chunkLeft == 0 {
err = readCrLf(rs.reader)
}
return n, err
}
if rs.totalBytesRead == rs.contentLength {
return 0, io.EOF
}
prefetchedSize := int(rs.prefetchedBytes.Size())
if prefetchedSize > rs.totalBytesRead {
left := prefetchedSize - rs.totalBytesRead
if len(p) > left {
p = p[:left]
}
n, err := rs.prefetchedBytes.Read(p)
rs.totalBytesRead += n
if n == rs.contentLength {
return n, io.EOF
}
return n, err
} else {
left := rs.contentLength - rs.totalBytesRead
if len(p) > left {
p = p[:left]
}
n, err = rs.reader.Read(p)
rs.totalBytesRead += n
if err != nil {
return n, err
}
}
if rs.totalBytesRead == rs.contentLength {
err = io.EOF
}
return n, err
}
func acquireRequestStream(b *bytebufferpool.ByteBuffer, r *bufio.Reader, contentLength int) *requestStream {
rs := requestStreamPool.Get().(*requestStream)
rs.prefetchedBytes = bytes.NewReader(b.B)
rs.reader = r
rs.contentLength = contentLength
return rs
}
func releaseRequestStream(rs *requestStream) {
rs.prefetchedBytes = nil
rs.totalBytesRead = 0
rs.chunkLeft = 0
rs.reader = nil
requestStreamPool.Put(rs)
}
var requestStreamPool = sync.Pool{
New: func() interface{} {
return &requestStream{}
},
}