diff --git a/fshandler.go b/fshandler.go index cc88571..cfbb2dd 100644 --- a/fshandler.go +++ b/fshandler.go @@ -1,6 +1,7 @@ package fasthttp import ( + "bufio" "bytes" "errors" "fmt" @@ -72,7 +73,7 @@ type fsHandler struct { pendingFiles []*fsFile cacheLock sync.Mutex - fileReaderPool sync.Pool + smallFileReaderPool sync.Pool } type fsFile struct { @@ -84,6 +85,9 @@ type fsFile struct { t time.Time readersCount int + + bigFiles []*bigFileReader + bigFilesLock sync.Mutex } func (ff *fsFile) Reader(incrementReaders bool) io.Reader { @@ -93,53 +97,152 @@ func (ff *fsFile) Reader(incrementReaders bool) io.Reader { ff.h.cacheLock.Unlock() } - v := ff.h.fileReaderPool.Get() + if ff.isBig() { + return ff.bigFileReader() + } + return ff.smallFileReader() +} + +func (ff *fsFile) smallFileReader() io.Reader { + v := ff.h.smallFileReaderPool.Get() if v == nil { - r := &fsFileReader{ + r := &fsSmallFileReader{ ff: ff, } r.v = r return r } - r := v.(*fsFileReader) + r := v.(*fsSmallFileReader) r.ff = ff if r.offset > 0 { - panic("BUG: fsFileReader with non-nil offset found in the pool") + panic("BUG: fsSmallFileReader with non-nil offset found in the pool") + } + return r +} + +const maxSmallFileSize = 4096 + +func (ff *fsFile) isBig() bool { + return ff.contentLength > maxSmallFileSize && len(ff.dirIndex) == 0 +} + +func (ff *fsFile) bigFileReader() io.Reader { + if ff.f == nil { + panic("BUG: ff.f must be non-nil in bigFileReader") } - return r + var r io.Reader + + ff.bigFilesLock.Lock() + n := len(ff.bigFiles) + if n > 0 { + r = ff.bigFiles[n-1] + ff.bigFiles = ff.bigFiles[:n-1] + } + ff.bigFilesLock.Unlock() + + if r != nil { + return r + } + + f, err := os.Open(ff.f.Name()) + if err != nil { + panic(fmt.Sprintf("BUG: cannot open already opened file %s: %s", ff.f.Name(), err)) + } + return &bigFileReader{ + f: f, + ff: ff, + } } func (ff *fsFile) Release() { if ff.f != nil { ff.f.Close() + + if ff.isBig() { + ff.bigFilesLock.Lock() + for _, r := range ff.bigFiles { + r.f.Close() + } + ff.bigFilesLock.Unlock() + } } } -type fsFileReader struct { - ff *fsFile - offset int64 - - v interface{} -} - -func (r *fsFileReader) Close() error { - ff := r.ff - +func (ff *fsFile) decReadersCount() { ff.h.cacheLock.Lock() ff.readersCount-- if ff.readersCount < 0 { panic("BUG: negative fsFile.readersCount!") } ff.h.cacheLock.Unlock() +} +// bigFileReader attempts to trigger sendfile +// for sending big files over the wire. +type bigFileReader struct { + f *os.File + ff *fsFile +} + +func (r *bigFileReader) Read(p []byte) (int, error) { + return r.f.Read(p) +} + +func (r *bigFileReader) WriteTo(w io.Writer) (int64, error) { + // fast path + if rf, ok := w.(io.ReaderFrom); ok { + // This is a hack for triggering sendfile path in bufio.Writer: + // the buffer must be empty before calling ReadFrom. + var n int + if bw, ok := w.(*bufio.Writer); ok && bw.Buffered() > 0 { + n = bw.Buffered() + if err := bw.Flush(); err != nil { + return 0, err + } + } + nn, err := rf.ReadFrom(r.f) + return nn + int64(n), err + } + + // slow path + return copyZeroAlloc(w, r.f) +} + +func (r *bigFileReader) Close() error { + n, err := r.f.Seek(0, 0) + if err == nil { + if n != 0 { + panic("BUG: File.Seek(0,0) returned (non-zero, nil)") + } + + ff := r.ff + ff.bigFilesLock.Lock() + ff.bigFiles = append(ff.bigFiles, r) + ff.bigFilesLock.Unlock() + } else { + r.f.Close() + } + r.ff.decReadersCount() + return err +} + +type fsSmallFileReader struct { + ff *fsFile + offset int64 + + v interface{} +} + +func (r *fsSmallFileReader) Close() error { + r.ff.decReadersCount() r.ff = nil r.offset = 0 - ff.h.fileReaderPool.Put(r.v) + r.ff.h.smallFileReaderPool.Put(r.v) return nil } -func (r *fsFileReader) Read(p []byte) (int, error) { +func (r *fsSmallFileReader) Read(p []byte) (int, error) { ff := r.ff if ff.f != nil { @@ -156,7 +259,7 @@ func (r *fsFileReader) Read(p []byte) (int, error) { return n, nil } -func (r *fsFileReader) WriteTo(w io.Writer) (int64, error) { +func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { if r.offset != 0 { panic("BUG: non-zero offset! Read() mustn't be called before WriteTo()") } diff --git a/http.go b/http.go index d2404f5..24faff8 100644 --- a/http.go +++ b/http.go @@ -541,19 +541,21 @@ func writeBodyChunked(w *bufio.Writer, r io.Reader) error { } func writeBodyFixedSize(w *bufio.Writer, r io.Reader, size int) error { - vbuf := copyBufPool.Get() - buf := vbuf.([]byte) - - n, err := io.CopyBuffer(w, r, buf) - - copyBufPool.Put(vbuf) - + n, err := copyZeroAlloc(w, r) if n != int64(size) && err == nil { err = fmt.Errorf("read %d bytes from BodyStream instead of %d bytes", n, size) } return err } +func copyZeroAlloc(w io.Writer, r io.Reader) (int64, error) { + vbuf := copyBufPool.Get() + buf := vbuf.([]byte) + n, err := io.CopyBuffer(w, r, buf) + copyBufPool.Put(vbuf) + return n, err +} + var copyBufPool = sync.Pool{ New: func() interface{} { return make([]byte, 4096)