Added transparent compression option to FS

This commit is contained in:
Aliaksandr Valialkin
2015-12-29 11:51:11 +02:00
parent c5b85a6124
commit 9b8a7de66b
3 changed files with 423 additions and 69 deletions
+12 -3
View File
@@ -9,14 +9,23 @@ import (
)
var (
addr = flag.String("addr", ":8080", "TCP address to listen to")
dir = flag.String("dir", "/usr/share/nginx/html", "Directory to serve static files from")
addr = flag.String("addr", ":8080", "TCP address to listen to")
compress = flag.Bool("compress", false, "Enables transparent response compression if set to true")
dir = flag.String("dir", "/usr/share/nginx/html", "Directory to serve static files from")
generateIndexPages = flag.Bool("generateIndexPages", true, "Whether to generate directory index pages")
)
func main() {
flag.Parse()
h := fasthttp.FSHandler(*dir, 0)
fs := &fasthttp.FS{
Root: *dir,
IndexNames: []string{"index.html"},
GenerateIndexPages: *generateIndexPages,
Compress: *compress,
}
h := fs.NewRequestHandler()
if err := fasthttp.ListenAndServe(*addr, h); err != nil {
log.Fatalf("error in ListenAndServe: %s", err)
}
+287 -59
View File
@@ -8,6 +8,7 @@ import (
"io"
"mime"
"os"
"path/filepath"
"sort"
"strings"
"sync"
@@ -65,12 +66,35 @@ type FS struct {
// Path to the root directory to serve files from.
Root string
// Index pages for directories without index.html are automatically
// generated if set.
// List of index file names to try opening during directory access.
//
// For example:
//
// * index.html
// * index.htm
// * my-super-index.xml
//
// By default the list is empty.
IndexNames []string
// Index pages for directories without files matching IndexNames
// are automatically generated if set.
//
// By default index pages aren't generated.
GenerateIndexPages bool
// Transparently compresses responses if set to true.
//
// The server tries minimizing CPU usage by caching compressed files.
// It adds FSCompressedFileSuffix suffix to the original file name and
// tries saving the resulting compressed file under the new file name.
// So it is advisable to give the server write access to Root
// and to all inner folders in order to minimze CPU usage when serving
// compressed responses.
//
// Transparent compression is disabled by default.
Compress bool
// Path rewriting function.
//
// By default request path is not modified.
@@ -84,9 +108,13 @@ type FS struct {
started bool
}
// FS adds this suffix to the original file names when trying to store
// compressed file under the new file name. See FS.Compress for details.
const FSCompressedFileSuffix = ".fasthttp.gz"
// FSHandlerCacheDuration is the duration for caching open file handles
// by FSHandler.
const FSHandlerCacheDuration = 5 * time.Second
const FSHandlerCacheDuration = 10 * time.Second
// FSHandler returns request handler serving static files from
// the given root folder.
@@ -113,6 +141,7 @@ const FSHandlerCacheDuration = 5 * time.Second
func FSHandler(root string, stripSlashes int) RequestHandler {
fs := &FS{
Root: root,
IndexNames: []string{"index.html"},
GenerateIndexPages: true,
PathRewrite: NewPathSlashesStripper(stripSlashes),
}
@@ -151,23 +180,22 @@ func (fs *FS) NewRequestHandler() RequestHandler {
cacheDuration = FSHandlerCacheDuration
}
pathRewrite := fs.PathRewrite
if pathRewrite == nil {
pathRewrite = NewPathSlashesStripper(0)
}
h := &fsHandler{
root: root,
pathRewrite: pathRewrite,
indexNames: fs.IndexNames,
pathRewrite: fs.PathRewrite,
generateIndexPages: fs.GenerateIndexPages,
compress: fs.Compress,
cacheDuration: cacheDuration,
cache: make(map[string]*fsFile),
compressedCache: make(map[string]*fsFile),
}
go func() {
var pendingFiles []*fsFile
for {
time.Sleep(cacheDuration / 2)
h.cleanCache()
pendingFiles = h.cleanCache(pendingFiles)
}
}()
@@ -176,13 +204,16 @@ func (fs *FS) NewRequestHandler() RequestHandler {
type fsHandler struct {
root string
indexNames []string
pathRewrite PathRewriteFunc
generateIndexPages bool
compress bool
cacheDuration time.Duration
cache map[string]*fsFile
pendingFiles []*fsFile
cacheLock sync.Mutex
cache map[string]*fsFile
compressedCache map[string]*fsFile
pendingFiles []*fsFile
cacheLock sync.Mutex
smallFileReaderPool sync.Pool
}
@@ -193,6 +224,7 @@ type fsFile struct {
dirIndex []byte
contentType string
contentLength int
compressed bool
lastModified time.Time
lastModifiedStr []byte
@@ -400,42 +432,60 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) {
return r.offset, err
}
func (h *fsHandler) cleanCache() {
t := time.Now()
func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile {
var filesToRelease []*fsFile
h.cacheLock.Lock()
// Close files which couldn't be closed before due to non-zero
// readers count.
var pendingFiles []*fsFile
for _, ff := range h.pendingFiles {
// readers count on the previous run.
var remainingFiles []*fsFile
for _, ff := range pendingFiles {
if ff.readersCount > 0 {
pendingFiles = append(pendingFiles, ff)
remainingFiles = append(remainingFiles, ff)
} else {
ff.Release()
filesToRelease = append(filesToRelease, ff)
}
}
h.pendingFiles = pendingFiles
pendingFiles = remainingFiles
// Close stale file handles.
for k, ff := range h.cache {
if t.Sub(ff.t) > h.cacheDuration {
pendingFiles, filesToRelease = cleanCacheNolock(h.cache, pendingFiles, filesToRelease, h.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(h.compressedCache, pendingFiles, filesToRelease, h.cacheDuration)
h.cacheLock.Unlock()
for _, ff := range filesToRelease {
ff.Release()
}
return pendingFiles
}
func cleanCacheNolock(cache map[string]*fsFile, pendingFiles, filesToRelease []*fsFile, cacheDuration time.Duration) ([]*fsFile, []*fsFile) {
t := time.Now()
for k, ff := range cache {
if t.Sub(ff.t) > cacheDuration {
if ff.readersCount > 0 {
// There are pending readers on stale file handle,
// so we cannot close it. Put it into pendingFiles
// so it will be closed later.
h.pendingFiles = append(h.pendingFiles, ff)
pendingFiles = append(pendingFiles, ff)
} else {
ff.Release()
filesToRelease = append(filesToRelease, ff)
}
delete(h.cache, k)
delete(cache, k)
}
}
h.cacheLock.Unlock()
return pendingFiles, filesToRelease
}
func (h *fsHandler) handleRequest(ctx *RequestCtx) {
path := h.pathRewrite(ctx)
var path []byte
if h.pathRewrite != nil {
path = h.pathRewrite(ctx)
} else {
path = ctx.Path()
}
path = stripTrailingSlashes(path)
if n := bytes.IndexByte(path, 0); n >= 0 {
@@ -444,8 +494,15 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
return
}
mustCompress := false
fileCache := h.cache
if h.compress && ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
mustCompress = true
fileCache = h.compressedCache
}
h.cacheLock.Lock()
ff, ok := h.cache[string(path)]
ff, ok := fileCache[string(path)]
if ok {
ff.readersCount++
}
@@ -455,17 +512,12 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
pathStr := string(path)
filePath := h.root + pathStr
var err error
ff, err = h.openFSFile(filePath)
ff, err = h.openFSFile(filePath, mustCompress)
if err == errDirIndexRequired {
if !h.generateIndexPages {
ctx.Logger().Printf("An attempt to access directory without index page. Directory %q", filePath)
ctx.Error("Directory index is forbidden", StatusForbidden)
return
}
ff, err = h.createDirIndex(ctx.URI(), filePath)
ff, err = h.openIndexFile(ctx, filePath, mustCompress)
if err != nil {
ctx.Logger().Printf("Cannot create index for directory %q: %s", filePath, err)
ctx.Error("Cannot create directory index", StatusNotFound)
ctx.Logger().Printf("cannot open dir index %q: %s", filePath, err)
ctx.Error("Directory index is forbidden", StatusForbidden)
return
}
} else if err != nil {
@@ -475,9 +527,9 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
}
h.cacheLock.Lock()
ff1, ok := h.cache[pathStr]
ff1, ok := fileCache[pathStr]
if !ok {
h.cache[pathStr] = ff
fileCache[pathStr] = ff
ff.readersCount++
} else {
ff1.readersCount++
@@ -506,14 +558,37 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
return
}
if ff.compressed {
ctx.Response.Header.SetCanonical(strContentEncoding, strGzip)
}
ctx.Response.Header.SetCanonical(strLastModified, ff.lastModifiedStr)
ctx.SetBodyStream(r, ff.contentLength)
ctx.SetContentType(ff.contentType)
ctx.SetStatusCode(StatusOK)
}
func (h *fsHandler) openIndexFile(ctx *RequestCtx, dirPath string, mustCompress bool) (*fsFile, error) {
for _, indexName := range h.indexNames {
indexFilePath := dirPath + "/" + indexName
ff, err := h.openFSFile(indexFilePath, mustCompress)
if err == nil {
return ff, nil
}
if !os.IsNotExist(err) {
return nil, fmt.Errorf("cannot open file %q: %s", indexFilePath, err)
}
}
if !h.generateIndexPages {
return nil, fmt.Errorf("cannot access directory without index page. Directory %q", dirPath)
}
return h.createDirIndex(ctx.URI(), dirPath, mustCompress)
}
var errDirIndexRequired = errors.New("directory index required")
func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error) {
func (h *fsHandler) createDirIndex(base *URI, dirPath string, mustCompress bool) (*fsFile, error) {
var buf bytes.Buffer
w := &buf
@@ -530,7 +605,7 @@ func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error)
fmt.Fprintf(w, `<li><a href="%s" class="dir">..</a></li>`, parentPathEscaped)
}
f, err := os.Open(filePath)
f, err := os.Open(dirPath)
if err != nil {
return nil, err
}
@@ -569,64 +644,198 @@ func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error)
}
fmt.Fprintf(w, "</ul></body></html>")
dirIndex := w.Bytes()
if mustCompress {
var zbuf bytes.Buffer
zw := acquireGzipWriter(&zbuf, CompressDefaultCompression)
_, err = io.Copy(zw, &buf)
releaseGzipWriter(zw)
if err != nil {
return nil, fmt.Errorf("error when compressing automatically generated index for directory %q: %s", dirPath, err)
}
w = &zbuf
}
dirIndex := w.Bytes()
lastModified := time.Now()
ff := &fsFile{
h: h,
dirIndex: dirIndex,
contentType: "text/html; charset=utf-8",
contentLength: len(dirIndex),
compressed: mustCompress,
lastModified: lastModified,
lastModifiedStr: AppendHTTPDate(nil, lastModified),
t: lastModified,
}
return ff, nil
}
func (h *fsHandler) openFSFile(filePath string) (*fsFile, error) {
const fsMinCompressRatio = 0.9
func (h *fsHandler) compressAndOpenFSFile(filePath string) (*fsFile, error) {
f, err := os.Open(filePath)
if err != nil {
return nil, err
}
fileStat, err := f.Stat()
fileInfo, err := f.Stat()
if err != nil {
f.Close()
return nil, fmt.Errorf("cannot obtain info for file %q: %s", filePath, err)
}
if fileInfo.IsDir() {
f.Close()
return nil, errDirIndexRequired
}
if !isFileCompressible(f, fsMinCompressRatio) {
return h.newFSFile(f, fileInfo, false)
}
compressedFilePath := filePath + FSCompressedFileSuffix
absPath, err := filepath.Abs(compressedFilePath)
if err != nil {
f.Close()
return nil, fmt.Errorf("cannot determine absolute path for %q: %s", compressedFilePath, err)
}
flock := getFileLock(absPath)
flock.Lock()
ff, err := h.compressFileNolock(f, fileInfo, filePath, compressedFilePath)
flock.Unlock()
return ff, err
}
func (h *fsHandler) compressFileNolock(f *os.File, fileInfo os.FileInfo, filePath, compressedFilePath string) (*fsFile, error) {
// Attempt to open compressed file created by another concurrent
// goroutine.
// It is safe opening such a file, since the file creation
// is guarded by file mutex - see getFileLock call.
if _, err := os.Stat(compressedFilePath); err == nil {
f.Close()
return h.newCompressedFSFile(compressedFilePath)
}
// Create temporary file, so concurrent goroutines don't use
// it until it is created.
tmpFilePath := compressedFilePath + ".tmp"
zf, err := os.Create(tmpFilePath)
if err != nil {
if !os.IsPermission(err) {
f.Close()
return nil, fmt.Errorf("cannot create temporary file %q: %s", tmpFilePath, err)
}
// No permission for compressed file creation. Just return uncompressed file
return h.newFSFile(f, fileInfo, false)
}
zw := acquireGzipWriter(zf, CompressDefaultCompression)
_, err = io.Copy(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseGzipWriter(zw)
zf.Close()
f.Close()
if err != nil {
return nil, fmt.Errorf("error when compressing file %q to %q: %s", filePath, tmpFilePath, err)
}
if err = os.Chtimes(tmpFilePath, time.Now(), fileInfo.ModTime()); err != nil {
return nil, fmt.Errorf("cannot change modification time to %s for tmp file %q: %s",
fileInfo.ModTime(), tmpFilePath, err)
}
if err = os.Rename(tmpFilePath, compressedFilePath); err != nil {
return nil, fmt.Errorf("cannot move compressed file from %q to %q: %s", tmpFilePath, compressedFilePath, err)
}
return h.newCompressedFSFile(compressedFilePath)
}
func (h *fsHandler) newCompressedFSFile(filePath string) (*fsFile, error) {
f, err := os.Open(filePath)
if err != nil {
return nil, fmt.Errorf("cannot open compressed file %q: %s", filePath, err)
}
fileInfo, err := f.Stat()
if err != nil {
f.Close()
return nil, fmt.Errorf("cannot obtain info for compressed file %q: %s", filePath, err)
}
return h.newFSFile(f, fileInfo, true)
}
func (h *fsHandler) openFSFile(filePath string, mustCompress bool) (*fsFile, error) {
filePathOriginal := filePath
if mustCompress {
filePath += FSCompressedFileSuffix
}
f, err := os.Open(filePath)
if err != nil {
if mustCompress && os.IsNotExist(err) {
return h.compressAndOpenFSFile(filePathOriginal)
}
return nil, err
}
if fileStat.IsDir() {
fileInfo, err := f.Stat()
if err != nil {
f.Close()
return nil, fmt.Errorf("cannot obtain info for file %q: %s", filePath, err)
}
indexPath := filePath + "/index.html"
ff, err := h.openFSFile(indexPath)
if err == nil {
return ff, nil
}
if !os.IsNotExist(err) {
return nil, err
if fileInfo.IsDir() {
f.Close()
if mustCompress {
return nil, fmt.Errorf("directory with unexpected suffix found: %q. Suffix: %q", filePath, FSCompressedFileSuffix)
}
return nil, errDirIndexRequired
}
n := fileStat.Size()
if mustCompress {
fileInfoOriginal, err := os.Stat(filePathOriginal)
if err != nil {
f.Close()
return nil, fmt.Errorf("cannot obtain info for original file %q: %s", filePathOriginal, err)
}
if fileInfoOriginal.ModTime() != fileInfo.ModTime() {
// The compressed file became stale. Re-create it.
f.Close()
os.Remove(filePath)
return h.compressAndOpenFSFile(filePathOriginal)
}
}
return h.newFSFile(f, fileInfo, mustCompress)
}
func (h *fsHandler) newFSFile(f *os.File, fileInfo os.FileInfo, compressed bool) (*fsFile, error) {
n := fileInfo.Size()
contentLength := int(n)
if n != int64(contentLength) {
f.Close()
return nil, fmt.Errorf("too big file: %d bytes", n)
}
ext := fileExtension(filePath)
ext := fileExtension(fileInfo.Name(), compressed)
contentType := mime.TypeByExtension(ext)
lastModified := fileStat.ModTime()
lastModified := fileInfo.ModTime()
ff := &fsFile{
h: h,
f: f,
contentType: contentType,
contentLength: contentLength,
compressed: compressed,
lastModified: lastModified,
lastModifiedStr: AppendHTTPDate(nil, lastModified),
t: time.Now(),
}
return ff, nil
}
@@ -654,7 +863,10 @@ func stripTrailingSlashes(path []byte) []byte {
return path
}
func fileExtension(path string) string {
func fileExtension(path string, compressed bool) string {
if compressed && strings.HasSuffix(path, FSCompressedFileSuffix) {
path = path[:len(path)-len(FSCompressedFileSuffix)]
}
n := strings.LastIndexByte(path, '.')
if n < 0 {
return ""
@@ -678,3 +890,19 @@ func fsLastModified(path string) (time.Time, error) {
func fsModTime(t time.Time) time.Time {
return t.In(gmtLocation).Truncate(time.Second)
}
var (
filesLockMap = make(map[string]*sync.Mutex)
filesLockMapLock sync.Mutex
)
func getFileLock(absPath string) *sync.Mutex {
filesLockMapLock.Lock()
flock := filesLockMap[absPath]
if flock == nil {
flock = &sync.Mutex{}
filesLockMap[absPath] = flock
}
filesLockMapLock.Unlock()
return flock
}
+124 -7
View File
@@ -1,7 +1,9 @@
package fasthttp
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"os"
"sort"
@@ -9,6 +11,115 @@ import (
"time"
)
func TestFSCompressConcurrent(t *testing.T) {
fs := &FS{
Root: ".",
GenerateIndexPages: true,
Compress: true,
}
h := fs.NewRequestHandler()
concurrency := 4
ch := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for j := 0; j < 10; j++ {
testFSCompress(t, h, "/fs.go")
testFSCompress(t, h, "/")
testFSCompress(t, h, "/README.md")
}
ch <- struct{}{}
}()
}
for i := 0; i < concurrency; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
}
func TestFSCompressSingleThread(t *testing.T) {
fs := &FS{
Root: ".",
GenerateIndexPages: true,
Compress: true,
}
h := fs.NewRequestHandler()
testFSCompress(t, h, "/fs.go")
testFSCompress(t, h, "/")
testFSCompress(t, h, "/README.md")
}
func testFSCompress(t *testing.T, h RequestHandler, filePath string) {
var ctx RequestCtx
ctx.Init(&Request{}, nil, nil)
// request uncompressed file
ctx.Request.Reset()
ctx.Request.SetRequestURI(filePath)
h(&ctx)
var resp Response
s := ctx.Response.String()
br := bufio.NewReader(bytes.NewBufferString(s))
if err := resp.Read(br); err != nil {
t.Fatalf("unexpected error: %s. filePath=%q", err, filePath)
}
if resp.StatusCode() != StatusOK {
t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", resp.StatusCode(), StatusOK, filePath)
}
ce := resp.Header.Peek("Content-Encoding")
if string(ce) != "" {
t.Fatalf("unexpected content-encoding %q. Expecting empty string. filePath=%q", ce, filePath)
}
body := string(resp.Body())
// request compressed file
ctx.Request.Reset()
ctx.Request.SetRequestURI(filePath)
ctx.Request.Header.Set("Accept-Encoding", "gzip")
h(&ctx)
s = ctx.Response.String()
br = bufio.NewReader(bytes.NewBufferString(s))
if err := resp.Read(br); err != nil {
t.Fatalf("unexpected error: %s. filePath=%q", err, filePath)
}
if resp.StatusCode() != StatusOK {
t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", resp.StatusCode(), StatusOK, filePath)
}
ce = resp.Header.Peek("Content-Encoding")
if string(ce) != "gzip" {
t.Fatalf("unexpected content-encoding %q. Expecting %q. filePath=%q", ce, "gzip", filePath)
}
zbody, err := resp.BodyGunzip()
if err != nil {
t.Fatalf("unexpected error when gunzipping response body: %s. filePath=%q", err, filePath)
}
if string(zbody) != body {
t.Fatalf("unexpected body %q. Expected %q. FilePath=%q", zbody, body, filePath)
}
}
func TestFileLock(t *testing.T) {
for i := 0; i < 10; i++ {
filePath := fmt.Sprintf("foo/bar/%d.jpg", i)
lock := getFileLock(filePath)
lock.Lock()
lock.Unlock()
}
for i := 0; i < 10; i++ {
filePath := fmt.Sprintf("foo/bar/%d.jpg", i)
lock := getFileLock(filePath)
lock.Lock()
lock.Unlock()
}
}
func TestFSHandlerSingleThread(t *testing.T) {
requestHandler := FSHandler(".", 0)
@@ -151,15 +262,21 @@ func testStripPathSlashes(t *testing.T, path string, stripSlashes int, expectedP
}
func TestFileExtension(t *testing.T) {
testFileExtension(t, "foo.bar", ".bar")
testFileExtension(t, "foobar", "")
testFileExtension(t, "foo.bar.baz", ".baz")
testFileExtension(t, "", "")
testFileExtension(t, "/a/b/c.d/efg.jpg", ".jpg")
testFileExtension(t, "foo.bar", false, ".bar")
testFileExtension(t, "foobar", false, "")
testFileExtension(t, "foo.bar.baz", false, ".baz")
testFileExtension(t, "", false, "")
testFileExtension(t, "/a/b/c.d/efg.jpg", false, ".jpg")
testFileExtension(t, "foo.bar", true, ".bar")
testFileExtension(t, "foobar.fasthttp.gz", true, "")
testFileExtension(t, "foo.bar.baz.fasthttp.gz", true, ".baz")
testFileExtension(t, "", true, "")
testFileExtension(t, "/a/b/c.d/efg.jpg.fasthttp.gz", true, ".jpg")
}
func testFileExtension(t *testing.T, path, expectedExt string) {
ext := fileExtension(path)
func testFileExtension(t *testing.T, path string, compressed bool, expectedExt string) {
ext := fileExtension(path, compressed)
if ext != expectedExt {
t.Fatalf("unexpected file extension for file %q: %q. Expecting %q", path, ext, expectedExt)
}