From 9b8a7de66be92dd259ffae0f7b8a22b31dab4db9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 29 Dec 2015 11:51:11 +0200 Subject: [PATCH] Added transparent compression option to FS --- examples/fileserver/fileserver.go | 15 +- fs.go | 346 +++++++++++++++++++++++++----- fs_test.go | 131 ++++++++++- 3 files changed, 423 insertions(+), 69 deletions(-) diff --git a/examples/fileserver/fileserver.go b/examples/fileserver/fileserver.go index 1636d83..e91615b 100644 --- a/examples/fileserver/fileserver.go +++ b/examples/fileserver/fileserver.go @@ -9,14 +9,23 @@ import ( ) var ( - addr = flag.String("addr", ":8080", "TCP address to listen to") - dir = flag.String("dir", "/usr/share/nginx/html", "Directory to serve static files from") + addr = flag.String("addr", ":8080", "TCP address to listen to") + compress = flag.Bool("compress", false, "Enables transparent response compression if set to true") + dir = flag.String("dir", "/usr/share/nginx/html", "Directory to serve static files from") + generateIndexPages = flag.Bool("generateIndexPages", true, "Whether to generate directory index pages") ) func main() { flag.Parse() - h := fasthttp.FSHandler(*dir, 0) + fs := &fasthttp.FS{ + Root: *dir, + IndexNames: []string{"index.html"}, + GenerateIndexPages: *generateIndexPages, + Compress: *compress, + } + h := fs.NewRequestHandler() + if err := fasthttp.ListenAndServe(*addr, h); err != nil { log.Fatalf("error in ListenAndServe: %s", err) } diff --git a/fs.go b/fs.go index acafde9..1c89b6a 100644 --- a/fs.go +++ b/fs.go @@ -8,6 +8,7 @@ import ( "io" "mime" "os" + "path/filepath" "sort" "strings" "sync" @@ -65,12 +66,35 @@ type FS struct { // Path to the root directory to serve files from. Root string - // Index pages for directories without index.html are automatically - // generated if set. + // List of index file names to try opening during directory access. + // + // For example: + // + // * index.html + // * index.htm + // * my-super-index.xml + // + // By default the list is empty. + IndexNames []string + + // Index pages for directories without files matching IndexNames + // are automatically generated if set. // // By default index pages aren't generated. GenerateIndexPages bool + // Transparently compresses responses if set to true. + // + // The server tries minimizing CPU usage by caching compressed files. + // It adds FSCompressedFileSuffix suffix to the original file name and + // tries saving the resulting compressed file under the new file name. + // So it is advisable to give the server write access to Root + // and to all inner folders in order to minimze CPU usage when serving + // compressed responses. + // + // Transparent compression is disabled by default. + Compress bool + // Path rewriting function. // // By default request path is not modified. @@ -84,9 +108,13 @@ type FS struct { started bool } +// FS adds this suffix to the original file names when trying to store +// compressed file under the new file name. See FS.Compress for details. +const FSCompressedFileSuffix = ".fasthttp.gz" + // FSHandlerCacheDuration is the duration for caching open file handles // by FSHandler. -const FSHandlerCacheDuration = 5 * time.Second +const FSHandlerCacheDuration = 10 * time.Second // FSHandler returns request handler serving static files from // the given root folder. @@ -113,6 +141,7 @@ const FSHandlerCacheDuration = 5 * time.Second func FSHandler(root string, stripSlashes int) RequestHandler { fs := &FS{ Root: root, + IndexNames: []string{"index.html"}, GenerateIndexPages: true, PathRewrite: NewPathSlashesStripper(stripSlashes), } @@ -151,23 +180,22 @@ func (fs *FS) NewRequestHandler() RequestHandler { cacheDuration = FSHandlerCacheDuration } - pathRewrite := fs.PathRewrite - if pathRewrite == nil { - pathRewrite = NewPathSlashesStripper(0) - } - h := &fsHandler{ root: root, - pathRewrite: pathRewrite, + indexNames: fs.IndexNames, + pathRewrite: fs.PathRewrite, generateIndexPages: fs.GenerateIndexPages, + compress: fs.Compress, cacheDuration: cacheDuration, cache: make(map[string]*fsFile), + compressedCache: make(map[string]*fsFile), } go func() { + var pendingFiles []*fsFile for { time.Sleep(cacheDuration / 2) - h.cleanCache() + pendingFiles = h.cleanCache(pendingFiles) } }() @@ -176,13 +204,16 @@ func (fs *FS) NewRequestHandler() RequestHandler { type fsHandler struct { root string + indexNames []string pathRewrite PathRewriteFunc generateIndexPages bool + compress bool cacheDuration time.Duration - cache map[string]*fsFile - pendingFiles []*fsFile - cacheLock sync.Mutex + cache map[string]*fsFile + compressedCache map[string]*fsFile + pendingFiles []*fsFile + cacheLock sync.Mutex smallFileReaderPool sync.Pool } @@ -193,6 +224,7 @@ type fsFile struct { dirIndex []byte contentType string contentLength int + compressed bool lastModified time.Time lastModifiedStr []byte @@ -400,42 +432,60 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { return r.offset, err } -func (h *fsHandler) cleanCache() { - t := time.Now() +func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile { + var filesToRelease []*fsFile + h.cacheLock.Lock() // Close files which couldn't be closed before due to non-zero - // readers count. - var pendingFiles []*fsFile - for _, ff := range h.pendingFiles { + // readers count on the previous run. + var remainingFiles []*fsFile + for _, ff := range pendingFiles { if ff.readersCount > 0 { - pendingFiles = append(pendingFiles, ff) + remainingFiles = append(remainingFiles, ff) } else { - ff.Release() + filesToRelease = append(filesToRelease, ff) } } - h.pendingFiles = pendingFiles + pendingFiles = remainingFiles - // Close stale file handles. - for k, ff := range h.cache { - if t.Sub(ff.t) > h.cacheDuration { + pendingFiles, filesToRelease = cleanCacheNolock(h.cache, pendingFiles, filesToRelease, h.cacheDuration) + pendingFiles, filesToRelease = cleanCacheNolock(h.compressedCache, pendingFiles, filesToRelease, h.cacheDuration) + + h.cacheLock.Unlock() + + for _, ff := range filesToRelease { + ff.Release() + } + + return pendingFiles +} + +func cleanCacheNolock(cache map[string]*fsFile, pendingFiles, filesToRelease []*fsFile, cacheDuration time.Duration) ([]*fsFile, []*fsFile) { + t := time.Now() + for k, ff := range cache { + if t.Sub(ff.t) > cacheDuration { if ff.readersCount > 0 { // There are pending readers on stale file handle, // so we cannot close it. Put it into pendingFiles // so it will be closed later. - h.pendingFiles = append(h.pendingFiles, ff) + pendingFiles = append(pendingFiles, ff) } else { - ff.Release() + filesToRelease = append(filesToRelease, ff) } - delete(h.cache, k) + delete(cache, k) } } - - h.cacheLock.Unlock() + return pendingFiles, filesToRelease } func (h *fsHandler) handleRequest(ctx *RequestCtx) { - path := h.pathRewrite(ctx) + var path []byte + if h.pathRewrite != nil { + path = h.pathRewrite(ctx) + } else { + path = ctx.Path() + } path = stripTrailingSlashes(path) if n := bytes.IndexByte(path, 0); n >= 0 { @@ -444,8 +494,15 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { return } + mustCompress := false + fileCache := h.cache + if h.compress && ctx.Request.Header.HasAcceptEncodingBytes(strGzip) { + mustCompress = true + fileCache = h.compressedCache + } + h.cacheLock.Lock() - ff, ok := h.cache[string(path)] + ff, ok := fileCache[string(path)] if ok { ff.readersCount++ } @@ -455,17 +512,12 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { pathStr := string(path) filePath := h.root + pathStr var err error - ff, err = h.openFSFile(filePath) + ff, err = h.openFSFile(filePath, mustCompress) if err == errDirIndexRequired { - if !h.generateIndexPages { - ctx.Logger().Printf("An attempt to access directory without index page. Directory %q", filePath) - ctx.Error("Directory index is forbidden", StatusForbidden) - return - } - ff, err = h.createDirIndex(ctx.URI(), filePath) + ff, err = h.openIndexFile(ctx, filePath, mustCompress) if err != nil { - ctx.Logger().Printf("Cannot create index for directory %q: %s", filePath, err) - ctx.Error("Cannot create directory index", StatusNotFound) + ctx.Logger().Printf("cannot open dir index %q: %s", filePath, err) + ctx.Error("Directory index is forbidden", StatusForbidden) return } } else if err != nil { @@ -475,9 +527,9 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { } h.cacheLock.Lock() - ff1, ok := h.cache[pathStr] + ff1, ok := fileCache[pathStr] if !ok { - h.cache[pathStr] = ff + fileCache[pathStr] = ff ff.readersCount++ } else { ff1.readersCount++ @@ -506,14 +558,37 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { return } + if ff.compressed { + ctx.Response.Header.SetCanonical(strContentEncoding, strGzip) + } ctx.Response.Header.SetCanonical(strLastModified, ff.lastModifiedStr) ctx.SetBodyStream(r, ff.contentLength) ctx.SetContentType(ff.contentType) + ctx.SetStatusCode(StatusOK) +} + +func (h *fsHandler) openIndexFile(ctx *RequestCtx, dirPath string, mustCompress bool) (*fsFile, error) { + for _, indexName := range h.indexNames { + indexFilePath := dirPath + "/" + indexName + ff, err := h.openFSFile(indexFilePath, mustCompress) + if err == nil { + return ff, nil + } + if !os.IsNotExist(err) { + return nil, fmt.Errorf("cannot open file %q: %s", indexFilePath, err) + } + } + + if !h.generateIndexPages { + return nil, fmt.Errorf("cannot access directory without index page. Directory %q", dirPath) + } + + return h.createDirIndex(ctx.URI(), dirPath, mustCompress) } var errDirIndexRequired = errors.New("directory index required") -func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error) { +func (h *fsHandler) createDirIndex(base *URI, dirPath string, mustCompress bool) (*fsFile, error) { var buf bytes.Buffer w := &buf @@ -530,7 +605,7 @@ func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error) fmt.Fprintf(w, `
  • ..
  • `, parentPathEscaped) } - f, err := os.Open(filePath) + f, err := os.Open(dirPath) if err != nil { return nil, err } @@ -569,64 +644,198 @@ func (h *fsHandler) createDirIndex(base *URI, filePath string) (*fsFile, error) } fmt.Fprintf(w, "") - dirIndex := w.Bytes() + if mustCompress { + var zbuf bytes.Buffer + zw := acquireGzipWriter(&zbuf, CompressDefaultCompression) + _, err = io.Copy(zw, &buf) + releaseGzipWriter(zw) + if err != nil { + return nil, fmt.Errorf("error when compressing automatically generated index for directory %q: %s", dirPath, err) + } + w = &zbuf + } + + dirIndex := w.Bytes() lastModified := time.Now() ff := &fsFile{ h: h, dirIndex: dirIndex, contentType: "text/html; charset=utf-8", contentLength: len(dirIndex), + compressed: mustCompress, lastModified: lastModified, lastModifiedStr: AppendHTTPDate(nil, lastModified), + + t: lastModified, } return ff, nil } -func (h *fsHandler) openFSFile(filePath string) (*fsFile, error) { +const fsMinCompressRatio = 0.9 + +func (h *fsHandler) compressAndOpenFSFile(filePath string) (*fsFile, error) { f, err := os.Open(filePath) if err != nil { return nil, err } - fileStat, err := f.Stat() + fileInfo, err := f.Stat() if err != nil { f.Close() + return nil, fmt.Errorf("cannot obtain info for file %q: %s", filePath, err) + } + + if fileInfo.IsDir() { + f.Close() + return nil, errDirIndexRequired + } + + if !isFileCompressible(f, fsMinCompressRatio) { + return h.newFSFile(f, fileInfo, false) + } + + compressedFilePath := filePath + FSCompressedFileSuffix + absPath, err := filepath.Abs(compressedFilePath) + if err != nil { + f.Close() + return nil, fmt.Errorf("cannot determine absolute path for %q: %s", compressedFilePath, err) + } + + flock := getFileLock(absPath) + flock.Lock() + ff, err := h.compressFileNolock(f, fileInfo, filePath, compressedFilePath) + flock.Unlock() + + return ff, err +} + +func (h *fsHandler) compressFileNolock(f *os.File, fileInfo os.FileInfo, filePath, compressedFilePath string) (*fsFile, error) { + // Attempt to open compressed file created by another concurrent + // goroutine. + // It is safe opening such a file, since the file creation + // is guarded by file mutex - see getFileLock call. + if _, err := os.Stat(compressedFilePath); err == nil { + f.Close() + return h.newCompressedFSFile(compressedFilePath) + } + + // Create temporary file, so concurrent goroutines don't use + // it until it is created. + tmpFilePath := compressedFilePath + ".tmp" + zf, err := os.Create(tmpFilePath) + if err != nil { + if !os.IsPermission(err) { + f.Close() + return nil, fmt.Errorf("cannot create temporary file %q: %s", tmpFilePath, err) + } + + // No permission for compressed file creation. Just return uncompressed file + return h.newFSFile(f, fileInfo, false) + } + + zw := acquireGzipWriter(zf, CompressDefaultCompression) + _, err = io.Copy(zw, f) + if err1 := zw.Flush(); err == nil { + err = err1 + } + releaseGzipWriter(zw) + zf.Close() + f.Close() + if err != nil { + return nil, fmt.Errorf("error when compressing file %q to %q: %s", filePath, tmpFilePath, err) + } + if err = os.Chtimes(tmpFilePath, time.Now(), fileInfo.ModTime()); err != nil { + return nil, fmt.Errorf("cannot change modification time to %s for tmp file %q: %s", + fileInfo.ModTime(), tmpFilePath, err) + } + if err = os.Rename(tmpFilePath, compressedFilePath); err != nil { + return nil, fmt.Errorf("cannot move compressed file from %q to %q: %s", tmpFilePath, compressedFilePath, err) + } + return h.newCompressedFSFile(compressedFilePath) +} + +func (h *fsHandler) newCompressedFSFile(filePath string) (*fsFile, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("cannot open compressed file %q: %s", filePath, err) + } + fileInfo, err := f.Stat() + if err != nil { + f.Close() + return nil, fmt.Errorf("cannot obtain info for compressed file %q: %s", filePath, err) + } + return h.newFSFile(f, fileInfo, true) +} + +func (h *fsHandler) openFSFile(filePath string, mustCompress bool) (*fsFile, error) { + filePathOriginal := filePath + if mustCompress { + filePath += FSCompressedFileSuffix + } + + f, err := os.Open(filePath) + if err != nil { + if mustCompress && os.IsNotExist(err) { + return h.compressAndOpenFSFile(filePathOriginal) + } return nil, err } - if fileStat.IsDir() { + fileInfo, err := f.Stat() + if err != nil { f.Close() + return nil, fmt.Errorf("cannot obtain info for file %q: %s", filePath, err) + } - indexPath := filePath + "/index.html" - ff, err := h.openFSFile(indexPath) - if err == nil { - return ff, nil - } - if !os.IsNotExist(err) { - return nil, err + if fileInfo.IsDir() { + f.Close() + if mustCompress { + return nil, fmt.Errorf("directory with unexpected suffix found: %q. Suffix: %q", filePath, FSCompressedFileSuffix) } return nil, errDirIndexRequired } - n := fileStat.Size() + if mustCompress { + fileInfoOriginal, err := os.Stat(filePathOriginal) + if err != nil { + f.Close() + return nil, fmt.Errorf("cannot obtain info for original file %q: %s", filePathOriginal, err) + } + + if fileInfoOriginal.ModTime() != fileInfo.ModTime() { + // The compressed file became stale. Re-create it. + f.Close() + os.Remove(filePath) + return h.compressAndOpenFSFile(filePathOriginal) + } + } + + return h.newFSFile(f, fileInfo, mustCompress) +} + +func (h *fsHandler) newFSFile(f *os.File, fileInfo os.FileInfo, compressed bool) (*fsFile, error) { + n := fileInfo.Size() contentLength := int(n) if n != int64(contentLength) { f.Close() return nil, fmt.Errorf("too big file: %d bytes", n) } - ext := fileExtension(filePath) + ext := fileExtension(fileInfo.Name(), compressed) contentType := mime.TypeByExtension(ext) - lastModified := fileStat.ModTime() + lastModified := fileInfo.ModTime() ff := &fsFile{ h: h, f: f, contentType: contentType, contentLength: contentLength, + compressed: compressed, lastModified: lastModified, lastModifiedStr: AppendHTTPDate(nil, lastModified), + + t: time.Now(), } return ff, nil } @@ -654,7 +863,10 @@ func stripTrailingSlashes(path []byte) []byte { return path } -func fileExtension(path string) string { +func fileExtension(path string, compressed bool) string { + if compressed && strings.HasSuffix(path, FSCompressedFileSuffix) { + path = path[:len(path)-len(FSCompressedFileSuffix)] + } n := strings.LastIndexByte(path, '.') if n < 0 { return "" @@ -678,3 +890,19 @@ func fsLastModified(path string) (time.Time, error) { func fsModTime(t time.Time) time.Time { return t.In(gmtLocation).Truncate(time.Second) } + +var ( + filesLockMap = make(map[string]*sync.Mutex) + filesLockMapLock sync.Mutex +) + +func getFileLock(absPath string) *sync.Mutex { + filesLockMapLock.Lock() + flock := filesLockMap[absPath] + if flock == nil { + flock = &sync.Mutex{} + filesLockMap[absPath] = flock + } + filesLockMapLock.Unlock() + return flock +} diff --git a/fs_test.go b/fs_test.go index c757e35..c3712f8 100644 --- a/fs_test.go +++ b/fs_test.go @@ -1,7 +1,9 @@ package fasthttp import ( + "bufio" "bytes" + "fmt" "io/ioutil" "os" "sort" @@ -9,6 +11,115 @@ import ( "time" ) +func TestFSCompressConcurrent(t *testing.T) { + fs := &FS{ + Root: ".", + GenerateIndexPages: true, + Compress: true, + } + h := fs.NewRequestHandler() + + concurrency := 4 + ch := make(chan struct{}, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + for j := 0; j < 10; j++ { + testFSCompress(t, h, "/fs.go") + testFSCompress(t, h, "/") + testFSCompress(t, h, "/README.md") + } + ch <- struct{}{} + }() + } + + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timeout") + } + } +} + +func TestFSCompressSingleThread(t *testing.T) { + fs := &FS{ + Root: ".", + GenerateIndexPages: true, + Compress: true, + } + h := fs.NewRequestHandler() + + testFSCompress(t, h, "/fs.go") + testFSCompress(t, h, "/") + testFSCompress(t, h, "/README.md") +} + +func testFSCompress(t *testing.T, h RequestHandler, filePath string) { + var ctx RequestCtx + ctx.Init(&Request{}, nil, nil) + + // request uncompressed file + ctx.Request.Reset() + ctx.Request.SetRequestURI(filePath) + h(&ctx) + + var resp Response + s := ctx.Response.String() + br := bufio.NewReader(bytes.NewBufferString(s)) + if err := resp.Read(br); err != nil { + t.Fatalf("unexpected error: %s. filePath=%q", err, filePath) + } + if resp.StatusCode() != StatusOK { + t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", resp.StatusCode(), StatusOK, filePath) + } + ce := resp.Header.Peek("Content-Encoding") + if string(ce) != "" { + t.Fatalf("unexpected content-encoding %q. Expecting empty string. filePath=%q", ce, filePath) + } + body := string(resp.Body()) + + // request compressed file + ctx.Request.Reset() + ctx.Request.SetRequestURI(filePath) + ctx.Request.Header.Set("Accept-Encoding", "gzip") + h(&ctx) + s = ctx.Response.String() + br = bufio.NewReader(bytes.NewBufferString(s)) + if err := resp.Read(br); err != nil { + t.Fatalf("unexpected error: %s. filePath=%q", err, filePath) + } + if resp.StatusCode() != StatusOK { + t.Fatalf("unexpected status code: %d. Expecting %d. filePath=%q", resp.StatusCode(), StatusOK, filePath) + } + ce = resp.Header.Peek("Content-Encoding") + if string(ce) != "gzip" { + t.Fatalf("unexpected content-encoding %q. Expecting %q. filePath=%q", ce, "gzip", filePath) + } + zbody, err := resp.BodyGunzip() + if err != nil { + t.Fatalf("unexpected error when gunzipping response body: %s. filePath=%q", err, filePath) + } + if string(zbody) != body { + t.Fatalf("unexpected body %q. Expected %q. FilePath=%q", zbody, body, filePath) + } +} + +func TestFileLock(t *testing.T) { + for i := 0; i < 10; i++ { + filePath := fmt.Sprintf("foo/bar/%d.jpg", i) + lock := getFileLock(filePath) + lock.Lock() + lock.Unlock() + } + + for i := 0; i < 10; i++ { + filePath := fmt.Sprintf("foo/bar/%d.jpg", i) + lock := getFileLock(filePath) + lock.Lock() + lock.Unlock() + } +} + func TestFSHandlerSingleThread(t *testing.T) { requestHandler := FSHandler(".", 0) @@ -151,15 +262,21 @@ func testStripPathSlashes(t *testing.T, path string, stripSlashes int, expectedP } func TestFileExtension(t *testing.T) { - testFileExtension(t, "foo.bar", ".bar") - testFileExtension(t, "foobar", "") - testFileExtension(t, "foo.bar.baz", ".baz") - testFileExtension(t, "", "") - testFileExtension(t, "/a/b/c.d/efg.jpg", ".jpg") + testFileExtension(t, "foo.bar", false, ".bar") + testFileExtension(t, "foobar", false, "") + testFileExtension(t, "foo.bar.baz", false, ".baz") + testFileExtension(t, "", false, "") + testFileExtension(t, "/a/b/c.d/efg.jpg", false, ".jpg") + + testFileExtension(t, "foo.bar", true, ".bar") + testFileExtension(t, "foobar.fasthttp.gz", true, "") + testFileExtension(t, "foo.bar.baz.fasthttp.gz", true, ".baz") + testFileExtension(t, "", true, "") + testFileExtension(t, "/a/b/c.d/efg.jpg.fasthttp.gz", true, ".jpg") } -func testFileExtension(t *testing.T, path, expectedExt string) { - ext := fileExtension(path) +func testFileExtension(t *testing.T, path string, compressed bool, expectedExt string) { + ext := fileExtension(path, compressed) if ext != expectedExt { t.Fatalf("unexpected file extension for file %q: %q. Expecting %q", path, ext, expectedExt) }