Skip fs cache based on config (#1644)

* add cache manager struce

* refactor cache by adding interface

* generalize ctor

* implement feature add unit tests

* fix code

* rename fs field as filesystem
This commit is contained in:
Tiago Peczenyj
2023-11-05 19:31:04 +01:00
committed by GitHub
parent 3ead307ea9
commit dfce853067
2 changed files with 298 additions and 102 deletions
+193 -94
View File
@@ -326,6 +326,11 @@ type FS struct {
// "Cannot open requested path"
PathNotFound RequestHandler
// SkipCache if true, will cache no file handler.
//
// By default is false.
SkipCache bool
// Expiration duration for inactive file handlers.
//
// FSHandlerCacheDuration is used by default.
@@ -453,11 +458,6 @@ func (fs *FS) initRequestHandler() {
compressRoot = fs.normalizeRoot(compressRoot)
}
cacheDuration := fs.CacheDuration
if cacheDuration <= 0 {
cacheDuration = FSHandlerCacheDuration
}
compressedFileSuffixes := fs.CompressedFileSuffixes
if len(compressedFileSuffixes["br"]) == 0 || len(compressedFileSuffixes["gzip"]) == 0 ||
compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] {
@@ -474,7 +474,7 @@ func (fs *FS) initRequestHandler() {
}
h := &fsHandler{
fs: fs.FS,
filesystem: fs.FS,
root: root,
indexNames: fs.IndexNames,
pathRewrite: fs.PathRewrite,
@@ -484,50 +484,20 @@ func (fs *FS) initRequestHandler() {
compressRoot: compressRoot,
pathNotFound: fs.PathNotFound,
acceptByteRange: fs.AcceptByteRange,
cacheDuration: cacheDuration,
compressedFileSuffixes: compressedFileSuffixes,
cache: make(map[string]*fsFile),
cacheBrotli: make(map[string]*fsFile),
cacheGzip: make(map[string]*fsFile),
}
if h.fs == nil {
h.fs = &osFS{} // It provides os.Open and os.Stat
h.cacheManager = newCacheManager(fs)
if h.filesystem == nil {
h.filesystem = &osFS{} // It provides os.Open and os.Stat
}
go func() {
var pendingFiles []*fsFile
clean := func() {
pendingFiles = h.cleanCache(pendingFiles)
}
if fs.CleanStop != nil {
t := time.NewTicker(cacheDuration / 2)
for {
select {
case <-t.C:
clean()
case _, stillOpen := <-fs.CleanStop:
// Ignore values send on the channel, only stop when it is closed.
if !stillOpen {
t.Stop()
return
}
}
}
}
for {
time.Sleep(cacheDuration / 2)
clean()
}
}()
fs.h = h.handleRequest
}
type fsHandler struct {
fs fs.FS
filesystem fs.FS
root string
indexNames []string
pathRewrite PathRewriteFunc
@@ -537,13 +507,9 @@ type fsHandler struct {
compressBrotli bool
compressRoot string
acceptByteRange bool
cacheDuration time.Duration
compressedFileSuffixes map[string]string
cache map[string]*fsFile
cacheBrotli map[string]*fsFile
cacheGzip map[string]*fsFile
cacheLock sync.Mutex
cacheManager cacheManager
smallFileReaderPool sync.Pool
}
@@ -596,7 +562,7 @@ func (ff *fsFile) smallFileReader() (io.Reader, error) {
const maxSmallFileSize = 2 * 4096
func (ff *fsFile) isBig() bool {
if _, ok := ff.h.fs.(*osFS); !ok { // fs.FS only uses bigFileReader, memory cache uses fsSmallFileReader
if _, ok := ff.h.filesystem.(*osFS); !ok { // fs.FS only uses bigFileReader, memory cache uses fsSmallFileReader
return ff.f != nil
}
return ff.contentLength > maxSmallFileSize && len(ff.dirIndex) == 0
@@ -621,7 +587,7 @@ func (ff *fsFile) bigFileReader() (io.Reader, error) {
return r, nil
}
f, err := ff.h.fs.Open(ff.filename)
f, err := ff.h.filesystem.Open(ff.filename)
if err != nil {
return nil, fmt.Errorf("cannot open already opened file: %w", err)
}
@@ -647,12 +613,12 @@ func (ff *fsFile) Release() {
}
func (ff *fsFile) decReadersCount() {
ff.h.cacheLock.Lock()
ff.readersCount--
if ff.readersCount < 0 {
ff.readersCount = 0
}
ff.h.cacheLock.Unlock()
ff.h.cacheManager.WithLock(func() {
ff.readersCount--
if ff.readersCount < 0 {
ff.readersCount = 0
}
})
}
// bigFileReader attempts to trigger sendfile
@@ -811,10 +777,164 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) {
return int64(curPos - r.startPos), err
}
func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile {
type cacheManager interface {
WithLock(work func())
GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool)
SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile
}
var (
_ cacheManager = (*inMemoryCacheManager)(nil)
_ cacheManager = (*noopCacheManager)(nil)
)
type CacheKind uint8
const (
defaultCacheKind CacheKind = iota
brotliCacheKind
gzipCacheKind
)
func newCacheManager(fs *FS) cacheManager {
if fs.SkipCache {
return &noopCacheManager{}
}
cacheDuration := fs.CacheDuration
if cacheDuration <= 0 {
cacheDuration = FSHandlerCacheDuration
}
instance := &inMemoryCacheManager{
cacheDuration: cacheDuration,
cache: make(map[string]*fsFile),
cacheBrotli: make(map[string]*fsFile),
cacheGzip: make(map[string]*fsFile),
}
go instance.handleCleanCache(fs.CleanStop)
return instance
}
type noopCacheManager struct {
cacheLock sync.Mutex
}
func (n *noopCacheManager) WithLock(work func()) {
n.cacheLock.Lock()
work()
n.cacheLock.Unlock()
}
func (*noopCacheManager) GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool) {
return nil, false
}
func (*noopCacheManager) SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile {
return ff
}
type inMemoryCacheManager struct {
cacheDuration time.Duration
cache map[string]*fsFile
cacheBrotli map[string]*fsFile
cacheGzip map[string]*fsFile
cacheLock sync.Mutex
}
func (cm *inMemoryCacheManager) WithLock(work func()) {
cm.cacheLock.Lock()
work()
cm.cacheLock.Unlock()
}
func (cm *inMemoryCacheManager) getFsCache(cacheKind CacheKind) map[string]*fsFile {
fileCache := cm.cache
switch cacheKind {
case brotliCacheKind:
fileCache = cm.cacheBrotli
case gzipCacheKind:
fileCache = cm.cacheGzip
}
return fileCache
}
func (cm *inMemoryCacheManager) GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool) {
fileCache := cm.getFsCache(cacheKind)
cm.cacheLock.Lock()
ff, ok := fileCache[string(path)]
if ok {
ff.readersCount++
}
cm.cacheLock.Unlock()
return ff, ok
}
func (cm *inMemoryCacheManager) SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile {
fileCache := cm.getFsCache(cacheKind)
cm.cacheLock.Lock()
ff1, ok := fileCache[path]
if !ok {
fileCache[path] = ff
ff.readersCount++
} else {
ff1.readersCount++
}
cm.cacheLock.Unlock()
if ok {
// The file has been already opened by another
// goroutine, so close the current file and use
// the file opened by another goroutine instead.
ff.Release()
ff = ff1
}
return ff
}
func (cm *inMemoryCacheManager) handleCleanCache(cleanStop chan struct{}) {
var pendingFiles []*fsFile
clean := func() {
pendingFiles = cm.cleanCache(pendingFiles)
}
if cleanStop != nil {
t := time.NewTicker(cm.cacheDuration / 2)
for {
select {
case <-t.C:
clean()
case _, stillOpen := <-cleanStop:
// Ignore values send on the channel, only stop when it is closed.
if !stillOpen {
t.Stop()
return
}
}
}
}
for {
time.Sleep(cm.cacheDuration / 2)
clean()
}
}
func (cm *inMemoryCacheManager) cleanCache(pendingFiles []*fsFile) []*fsFile {
var filesToRelease []*fsFile
h.cacheLock.Lock()
cm.cacheLock.Lock()
// Close files which couldn't be closed before due to non-zero
// readers count on the previous run.
@@ -828,11 +948,11 @@ func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile {
}
pendingFiles = remainingFiles
pendingFiles, filesToRelease = cleanCacheNolock(h.cache, pendingFiles, filesToRelease, h.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(h.cacheBrotli, pendingFiles, filesToRelease, h.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(h.cacheGzip, pendingFiles, filesToRelease, h.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(cm.cache, pendingFiles, filesToRelease, cm.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(cm.cacheBrotli, pendingFiles, filesToRelease, cm.cacheDuration)
pendingFiles, filesToRelease = cleanCacheNolock(cm.cacheGzip, pendingFiles, filesToRelease, cm.cacheDuration)
h.cacheLock.Unlock()
cm.cacheLock.Unlock()
for _, ff := range filesToRelease {
ff.Release()
@@ -860,7 +980,7 @@ func cleanCacheNolock(cache map[string]*fsFile, pendingFiles, filesToRelease []*
}
func (h *fsHandler) pathToFilePath(path string) string {
if _, ok := h.fs.(*osFS); !ok {
if _, ok := h.filesystem.(*osFS); !ok {
if len(path) < 1 {
return path
}
@@ -906,30 +1026,25 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
}
mustCompress := false
fileCache := h.cache
fileCacheKind := defaultCacheKind
fileEncoding := ""
byteRange := ctx.Request.Header.peek(strRange)
if len(byteRange) == 0 && h.compress {
if h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr) {
mustCompress = true
fileCache = h.cacheBrotli
fileCacheKind = brotliCacheKind
fileEncoding = "br"
} else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
mustCompress = true
fileCache = h.cacheGzip
fileCacheKind = gzipCacheKind
fileEncoding = "gzip"
}
}
h.cacheLock.Lock()
ff, ok := fileCache[string(path)]
if ok {
ff.readersCount++
}
h.cacheLock.Unlock()
pathStr := string(path)
ff, ok := h.cacheManager.GetFileFromCache(fileCacheKind, pathStr)
if !ok {
pathStr := string(path)
filePath := h.pathToFilePath(pathStr)
var err error
@@ -962,23 +1077,7 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
return
}
h.cacheLock.Lock()
ff1, ok := fileCache[pathStr]
if !ok {
fileCache[pathStr] = ff
ff.readersCount++
} else {
ff1.readersCount++
}
h.cacheLock.Unlock()
if ok {
// The file has been already opened by another
// goroutine, so close the current file and use
// the file opened by another goroutine instead.
ff.Release()
ff = ff1
}
ff = h.cacheManager.SetFileToCache(fileCacheKind, pathStr, ff)
}
if !ctx.IfModifiedSince(ff.lastModified) {
@@ -1153,7 +1252,7 @@ func (h *fsHandler) createDirIndex(ctx *RequestCtx, dirPath string, mustCompress
_, _ = fmt.Fprintf(w, `<li><a href="%s" class="dir">..</a></li>`, parentPathEscaped)
}
dirEntries, err := fs.ReadDir(h.fs, dirPath)
dirEntries, err := fs.ReadDir(h.filesystem, dirPath)
if err != nil {
return nil, err
}
@@ -1233,7 +1332,7 @@ const (
)
func (h *fsHandler) compressAndOpenFSFile(filePath string, fileEncoding string) (*fsFile, error) {
f, err := h.fs.Open(filePath)
f, err := h.filesystem.Open(filePath)
if err != nil {
return nil, err
}
@@ -1257,7 +1356,7 @@ func (h *fsHandler) compressAndOpenFSFile(filePath string, fileEncoding string)
compressedFilePath := h.filePathToCompressed(filePath)
if _, ok := h.fs.(*osFS); !ok {
if _, ok := h.filesystem.(*osFS); !ok {
return h.newCompressedFSFileCache(f, fileInfo, compressedFilePath, fileEncoding)
}
@@ -1397,7 +1496,7 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi
}
func (h *fsHandler) newCompressedFSFile(filePath string, fileEncoding string) (*fsFile, error) {
f, err := h.fs.Open(filePath)
f, err := h.filesystem.Open(filePath)
if err != nil {
return nil, fmt.Errorf("cannot open compressed file %q: %w", filePath, err)
}
@@ -1415,7 +1514,7 @@ func (h *fsHandler) openFSFile(filePath string, mustCompress bool, fileEncoding
filePath += h.compressedFileSuffixes[fileEncoding]
}
f, err := h.fs.Open(filePath)
f, err := h.filesystem.Open(filePath)
if err != nil {
if mustCompress && errors.Is(err, fs.ErrNotExist) {
return h.compressAndOpenFSFile(filePathOriginal, fileEncoding)
@@ -1439,7 +1538,7 @@ func (h *fsHandler) openFSFile(filePath string, mustCompress bool, fileEncoding
}
if mustCompress {
fileInfoOriginal, err := fs.Stat(h.fs, filePathOriginal)
fileInfoOriginal, err := fs.Stat(h.filesystem, filePathOriginal)
if err != nil {
_ = f.Close()
return nil, fmt.Errorf("cannot obtain info for original file %q: %w", filePathOriginal, err)
+105 -8
View File
@@ -66,6 +66,8 @@ func TestNewVHostPathRewriterMaliciousHost(t *testing.T) {
}
func testPathNotFound(t *testing.T, pathNotFoundFunc RequestHandler) {
t.Helper()
var ctx RequestCtx
var req Request
req.SetRequestURI("http//some.url/file")
@@ -302,11 +304,30 @@ func TestFSByteRangeConcurrent(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
fs := &FS{
runFSByteRangeConcurrent(t, &FS{
Root: ".",
AcceptByteRange: true,
CleanStop: stop,
}
})
}
func TestFSByteRangeConcurrentSkipCache(t *testing.T) {
// This test can't run parallel as files in / might be changed by other tests.
stop := make(chan struct{})
defer close(stop)
runFSByteRangeConcurrent(t, &FS{
Root: ".",
SkipCache: true,
AcceptByteRange: true,
CleanStop: stop,
})
}
func runFSByteRangeConcurrent(t *testing.T, fs *FS) {
t.Helper()
h := fs.NewRequestHandler()
concurrency := 10
@@ -336,11 +357,30 @@ func TestFSByteRangeSingleThread(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
fs := &FS{
runFSByteRangeSingleThread(t, &FS{
Root: ".",
AcceptByteRange: true,
CleanStop: stop,
}
})
}
func TestFSByteRangeSingleThreadSkipCache(t *testing.T) {
// This test can't run parallel as files in / might be changed by other tests.
stop := make(chan struct{})
defer close(stop)
runFSByteRangeSingleThread(t, &FS{
Root: ".",
AcceptByteRange: true,
SkipCache: true,
CleanStop: stop,
})
}
func runFSByteRangeSingleThread(t *testing.T, fs *FS) {
t.Helper()
h := fs.NewRequestHandler()
testFSByteRange(t, h, "/fs.go")
@@ -348,6 +388,8 @@ func TestFSByteRangeSingleThread(t *testing.T) {
}
func testFSByteRange(t *testing.T, h RequestHandler, filePath string) {
t.Helper()
var ctx RequestCtx
ctx.Init(&Request{}, nil, nil)
@@ -427,6 +469,8 @@ func TestParseByteRangeSuccess(t *testing.T) {
}
func testParseByteRangeSuccess(t *testing.T, v string, contentLength, startPos, endPos int) {
t.Helper()
startPos1, endPos1, err := ParseByteRange([]byte(v), contentLength)
if err != nil {
t.Fatalf("unexpected error: %v. v=%q, contentLength=%d", err, v, contentLength)
@@ -467,6 +511,8 @@ func TestParseByteRangeError(t *testing.T) {
}
func testParseByteRangeError(t *testing.T, v string, contentLength int) {
t.Helper()
_, _, err := ParseByteRange([]byte(v), contentLength)
if err == nil {
t.Fatalf("expecting error when parsing byte range %q", v)
@@ -480,17 +526,41 @@ func TestFSCompressConcurrent(t *testing.T) {
}
// This test can't run parallel as files in / might be changed by other tests.
stop := make(chan struct{})
defer close(stop)
fs := &FS{
runFSCompressConcurrent(t, &FS{
Root: ".",
GenerateIndexPages: true,
Compress: true,
CompressBrotli: true,
CleanStop: stop,
})
}
func TestFSCompressConcurrentSkipCache(t *testing.T) {
// Don't run this test on Windows, the Windows GitHub actions are too slow and timeout too often.
if runtime.GOOS == "windows" {
t.SkipNow()
}
// This test can't run parallel as files in / might be changed by other tests.
stop := make(chan struct{})
defer close(stop)
runFSCompressConcurrent(t, &FS{
Root: ".",
GenerateIndexPages: true,
SkipCache: true,
Compress: true,
CompressBrotli: true,
CleanStop: stop,
})
}
func runFSCompressConcurrent(t *testing.T, fs *FS) {
t.Helper()
h := fs.NewRequestHandler()
concurrency := 4
@@ -521,13 +591,34 @@ func TestFSCompressSingleThread(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
fs := &FS{
runFSCompressSingleThread(t, &FS{
Root: ".",
GenerateIndexPages: true,
Compress: true,
CompressBrotli: true,
CleanStop: stop,
}
})
}
func TestFSCompressSingleThreadSkipCache(t *testing.T) {
// This test can't run parallel as files in / might be changed by other tests.
stop := make(chan struct{})
defer close(stop)
runFSCompressSingleThread(t, &FS{
Root: ".",
GenerateIndexPages: true,
SkipCache: true,
Compress: true,
CompressBrotli: true,
CleanStop: stop,
})
}
func runFSCompressSingleThread(t *testing.T, fs *FS) {
t.Helper()
h := fs.NewRequestHandler()
testFSCompress(t, h, "/fs.go")
@@ -536,6 +627,8 @@ func TestFSCompressSingleThread(t *testing.T) {
}
func testFSCompress(t *testing.T, h RequestHandler, filePath string) {
t.Helper()
// File locking is flaky on Windows.
if runtime.GOOS == "windows" {
t.SkipNow()
@@ -755,6 +848,8 @@ func TestStripPathSlashes(t *testing.T) {
}
func testStripPathSlashes(t *testing.T, path string, stripSlashes int, expectedPath string) {
t.Helper()
s := stripLeadingSlashes([]byte(path), stripSlashes)
s = stripTrailingSlashes(s)
if string(s) != expectedPath {
@@ -779,6 +874,8 @@ func TestFileExtension(t *testing.T) {
}
func testFileExtension(t *testing.T, path string, compressed bool, compressedFileSuffix, expectedExt string) {
t.Helper()
ext := fileExtension(path, compressed, compressedFileSuffix)
if ext != expectedExt {
t.Fatalf("unexpected file extension for file %q: %q. Expecting %q", path, ext, expectedExt)