mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-15 16:07:51 +03:00
An attempt to trigger sendfile path in FSHandler when sending big files
This commit is contained in:
+122
-19
@@ -1,6 +1,7 @@
|
||||
package fasthttp
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -72,7 +73,7 @@ type fsHandler struct {
|
||||
pendingFiles []*fsFile
|
||||
cacheLock sync.Mutex
|
||||
|
||||
fileReaderPool sync.Pool
|
||||
smallFileReaderPool sync.Pool
|
||||
}
|
||||
|
||||
type fsFile struct {
|
||||
@@ -84,6 +85,9 @@ type fsFile struct {
|
||||
|
||||
t time.Time
|
||||
readersCount int
|
||||
|
||||
bigFiles []*bigFileReader
|
||||
bigFilesLock sync.Mutex
|
||||
}
|
||||
|
||||
func (ff *fsFile) Reader(incrementReaders bool) io.Reader {
|
||||
@@ -93,53 +97,152 @@ func (ff *fsFile) Reader(incrementReaders bool) io.Reader {
|
||||
ff.h.cacheLock.Unlock()
|
||||
}
|
||||
|
||||
v := ff.h.fileReaderPool.Get()
|
||||
if ff.isBig() {
|
||||
return ff.bigFileReader()
|
||||
}
|
||||
return ff.smallFileReader()
|
||||
}
|
||||
|
||||
func (ff *fsFile) smallFileReader() io.Reader {
|
||||
v := ff.h.smallFileReaderPool.Get()
|
||||
if v == nil {
|
||||
r := &fsFileReader{
|
||||
r := &fsSmallFileReader{
|
||||
ff: ff,
|
||||
}
|
||||
r.v = r
|
||||
return r
|
||||
}
|
||||
r := v.(*fsFileReader)
|
||||
r := v.(*fsSmallFileReader)
|
||||
r.ff = ff
|
||||
if r.offset > 0 {
|
||||
panic("BUG: fsFileReader with non-nil offset found in the pool")
|
||||
panic("BUG: fsSmallFileReader with non-nil offset found in the pool")
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
const maxSmallFileSize = 4096
|
||||
|
||||
func (ff *fsFile) isBig() bool {
|
||||
return ff.contentLength > maxSmallFileSize && len(ff.dirIndex) == 0
|
||||
}
|
||||
|
||||
func (ff *fsFile) bigFileReader() io.Reader {
|
||||
if ff.f == nil {
|
||||
panic("BUG: ff.f must be non-nil in bigFileReader")
|
||||
}
|
||||
|
||||
return r
|
||||
var r io.Reader
|
||||
|
||||
ff.bigFilesLock.Lock()
|
||||
n := len(ff.bigFiles)
|
||||
if n > 0 {
|
||||
r = ff.bigFiles[n-1]
|
||||
ff.bigFiles = ff.bigFiles[:n-1]
|
||||
}
|
||||
ff.bigFilesLock.Unlock()
|
||||
|
||||
if r != nil {
|
||||
return r
|
||||
}
|
||||
|
||||
f, err := os.Open(ff.f.Name())
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("BUG: cannot open already opened file %s: %s", ff.f.Name(), err))
|
||||
}
|
||||
return &bigFileReader{
|
||||
f: f,
|
||||
ff: ff,
|
||||
}
|
||||
}
|
||||
|
||||
func (ff *fsFile) Release() {
|
||||
if ff.f != nil {
|
||||
ff.f.Close()
|
||||
|
||||
if ff.isBig() {
|
||||
ff.bigFilesLock.Lock()
|
||||
for _, r := range ff.bigFiles {
|
||||
r.f.Close()
|
||||
}
|
||||
ff.bigFilesLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type fsFileReader struct {
|
||||
ff *fsFile
|
||||
offset int64
|
||||
|
||||
v interface{}
|
||||
}
|
||||
|
||||
func (r *fsFileReader) Close() error {
|
||||
ff := r.ff
|
||||
|
||||
func (ff *fsFile) decReadersCount() {
|
||||
ff.h.cacheLock.Lock()
|
||||
ff.readersCount--
|
||||
if ff.readersCount < 0 {
|
||||
panic("BUG: negative fsFile.readersCount!")
|
||||
}
|
||||
ff.h.cacheLock.Unlock()
|
||||
}
|
||||
|
||||
// bigFileReader attempts to trigger sendfile
|
||||
// for sending big files over the wire.
|
||||
type bigFileReader struct {
|
||||
f *os.File
|
||||
ff *fsFile
|
||||
}
|
||||
|
||||
func (r *bigFileReader) Read(p []byte) (int, error) {
|
||||
return r.f.Read(p)
|
||||
}
|
||||
|
||||
func (r *bigFileReader) WriteTo(w io.Writer) (int64, error) {
|
||||
// fast path
|
||||
if rf, ok := w.(io.ReaderFrom); ok {
|
||||
// This is a hack for triggering sendfile path in bufio.Writer:
|
||||
// the buffer must be empty before calling ReadFrom.
|
||||
var n int
|
||||
if bw, ok := w.(*bufio.Writer); ok && bw.Buffered() > 0 {
|
||||
n = bw.Buffered()
|
||||
if err := bw.Flush(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
nn, err := rf.ReadFrom(r.f)
|
||||
return nn + int64(n), err
|
||||
}
|
||||
|
||||
// slow path
|
||||
return copyZeroAlloc(w, r.f)
|
||||
}
|
||||
|
||||
func (r *bigFileReader) Close() error {
|
||||
n, err := r.f.Seek(0, 0)
|
||||
if err == nil {
|
||||
if n != 0 {
|
||||
panic("BUG: File.Seek(0,0) returned (non-zero, nil)")
|
||||
}
|
||||
|
||||
ff := r.ff
|
||||
ff.bigFilesLock.Lock()
|
||||
ff.bigFiles = append(ff.bigFiles, r)
|
||||
ff.bigFilesLock.Unlock()
|
||||
} else {
|
||||
r.f.Close()
|
||||
}
|
||||
r.ff.decReadersCount()
|
||||
return err
|
||||
}
|
||||
|
||||
type fsSmallFileReader struct {
|
||||
ff *fsFile
|
||||
offset int64
|
||||
|
||||
v interface{}
|
||||
}
|
||||
|
||||
func (r *fsSmallFileReader) Close() error {
|
||||
r.ff.decReadersCount()
|
||||
r.ff = nil
|
||||
r.offset = 0
|
||||
ff.h.fileReaderPool.Put(r.v)
|
||||
r.ff.h.smallFileReaderPool.Put(r.v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fsFileReader) Read(p []byte) (int, error) {
|
||||
func (r *fsSmallFileReader) Read(p []byte) (int, error) {
|
||||
ff := r.ff
|
||||
|
||||
if ff.f != nil {
|
||||
@@ -156,7 +259,7 @@ func (r *fsFileReader) Read(p []byte) (int, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (r *fsFileReader) WriteTo(w io.Writer) (int64, error) {
|
||||
func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) {
|
||||
if r.offset != 0 {
|
||||
panic("BUG: non-zero offset! Read() mustn't be called before WriteTo()")
|
||||
}
|
||||
|
||||
@@ -541,19 +541,21 @@ func writeBodyChunked(w *bufio.Writer, r io.Reader) error {
|
||||
}
|
||||
|
||||
func writeBodyFixedSize(w *bufio.Writer, r io.Reader, size int) error {
|
||||
vbuf := copyBufPool.Get()
|
||||
buf := vbuf.([]byte)
|
||||
|
||||
n, err := io.CopyBuffer(w, r, buf)
|
||||
|
||||
copyBufPool.Put(vbuf)
|
||||
|
||||
n, err := copyZeroAlloc(w, r)
|
||||
if n != int64(size) && err == nil {
|
||||
err = fmt.Errorf("read %d bytes from BodyStream instead of %d bytes", n, size)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func copyZeroAlloc(w io.Writer, r io.Reader) (int64, error) {
|
||||
vbuf := copyBufPool.Get()
|
||||
buf := vbuf.([]byte)
|
||||
n, err := io.CopyBuffer(w, r, buf)
|
||||
copyBufPool.Put(vbuf)
|
||||
return n, err
|
||||
}
|
||||
|
||||
var copyBufPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 4096)
|
||||
|
||||
Reference in New Issue
Block a user