mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-14 15:56:44 +03:00
30e92af08f
Previously each concurrent compression could allocate a huge compression state
of up to 1MB. So 10K concurrent connections could result in
10GB of compression state in the heap.
This CL limits the number of compression states among concurrent requests
to O(GOMAXPROCS) when the {Append,Write}{Gzip,Deflate}* functions are called.
These functions are used by CompressHandler* for non-streaming responses,
so this should cover the majority of use cases.
Memory usage for 10K concurrent connections that compress responses drops
from 10GB to 200MB after this CL.
441 lines
11 KiB
Go
441 lines
11 KiB
Go
package fasthttp
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/klauspost/compress/flate"
|
|
"github.com/klauspost/compress/gzip"
|
|
"github.com/klauspost/compress/zlib"
|
|
"github.com/valyala/bytebufferpool"
|
|
"github.com/valyala/fasthttp/stackless"
|
|
)
|
|
|
|
// Supported compression levels.
|
|
const (
|
|
CompressNoCompression = flate.NoCompression
|
|
CompressBestSpeed = flate.BestSpeed
|
|
CompressBestCompression = flate.BestCompression
|
|
CompressDefaultCompression = 6 // flate.DefaultCompression
|
|
CompressHuffmanOnly = -2 // flate.HuffmanOnly
|
|
)
|
|
|
|
func acquireGzipReader(r io.Reader) (*gzip.Reader, error) {
|
|
v := gzipReaderPool.Get()
|
|
if v == nil {
|
|
return gzip.NewReader(r)
|
|
}
|
|
zr := v.(*gzip.Reader)
|
|
if err := zr.Reset(r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func releaseGzipReader(zr *gzip.Reader) {
|
|
zr.Close()
|
|
gzipReaderPool.Put(zr)
|
|
}
|
|
|
|
// gzipReaderPool caches *gzip.Reader values between releaseGzipReader and
// acquireGzipReader calls.
var gzipReaderPool sync.Pool
|
|
|
|
func acquireFlateReader(r io.Reader) (io.ReadCloser, error) {
|
|
v := flateReaderPool.Get()
|
|
if v == nil {
|
|
zr, err := zlib.NewReader(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
zr := v.(io.ReadCloser)
|
|
if err := resetFlateReader(zr, r); err != nil {
|
|
return nil, err
|
|
}
|
|
return zr, nil
|
|
}
|
|
|
|
func releaseFlateReader(zr io.ReadCloser) {
|
|
zr.Close()
|
|
flateReaderPool.Put(zr)
|
|
}
|
|
|
|
func resetFlateReader(zr io.ReadCloser, r io.Reader) error {
|
|
zrr, ok := zr.(zlib.Resetter)
|
|
if !ok {
|
|
panic("BUG: zlib.Reader doesn't implement zlib.Resetter???")
|
|
}
|
|
return zrr.Reset(r, nil)
|
|
}
|
|
|
|
// flateReaderPool caches the io.ReadCloser values created by zlib.NewReader
// between releaseFlateReader and acquireFlateReader calls.
var flateReaderPool sync.Pool
|
|
|
|
func acquireStacklessGzipWriter(w io.Writer, level int) stackless.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessGzipWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
|
|
return acquireRealGzipWriter(w, level)
|
|
})
|
|
}
|
|
sw := v.(stackless.Writer)
|
|
sw.Reset(w)
|
|
return sw
|
|
}
|
|
|
|
func releaseStacklessGzipWriter(sw stackless.Writer, level int) {
|
|
sw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessGzipWriterPoolMap[nLevel]
|
|
p.Put(sw)
|
|
}
|
|
|
|
func acquireRealGzipWriter(w io.Writer, level int) *gzip.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realGzipWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
zw, err := gzip.NewWriterLevel(w, level)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: unexpected error from gzip.NewWriterLevel(%d): %s", level, err))
|
|
}
|
|
return zw
|
|
}
|
|
zw := v.(*gzip.Writer)
|
|
zw.Reset(w)
|
|
return zw
|
|
}
|
|
|
|
func releaseRealGzipWriter(zw *gzip.Writer, level int) {
|
|
zw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realGzipWriterPoolMap[nLevel]
|
|
p.Put(zw)
|
|
}
|
|
|
|
var (
|
|
stacklessGzipWriterPoolMap = newCompressWriterPoolMap()
|
|
realGzipWriterPoolMap = newCompressWriterPoolMap()
|
|
)
|
|
|
|
// AppendGzipBytesLevel appends gzipped src to dst using the given
|
|
// compression level and returns the resulting dst.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func AppendGzipBytesLevel(dst, src []byte, level int) []byte {
|
|
w := &byteSliceWriter{dst}
|
|
WriteGzipLevel(w, src, level)
|
|
return w.b
|
|
}
|
|
|
|
// WriteGzipLevel writes gzipped p to w using the given compression level
|
|
// and returns the number of compressed bytes written to w.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func WriteGzipLevel(w io.Writer, p []byte, level int) (int, error) {
|
|
switch w.(type) {
|
|
case *byteSliceWriter,
|
|
*bytes.Buffer,
|
|
*ByteBuffer,
|
|
*bytebufferpool.ByteBuffer:
|
|
// These writers don't block, so we can just use stacklessWriteGzip
|
|
ctx := &compressCtx{
|
|
w: w,
|
|
p: p,
|
|
level: level,
|
|
}
|
|
stacklessWriteGzip(ctx)
|
|
return len(p), nil
|
|
default:
|
|
zw := acquireStacklessGzipWriter(w, level)
|
|
n, err := zw.Write(p)
|
|
releaseStacklessGzipWriter(zw, level)
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
// stacklessWriteGzip is nonblockingWriteGzip wrapped by stackless.NewFunc.
// WriteGzipLevel calls it with a *compressCtx for in-memory writers.
var stacklessWriteGzip = stackless.NewFunc(nonblockingWriteGzip)
|
|
|
|
func nonblockingWriteGzip(ctxv interface{}) {
|
|
ctx := ctxv.(*compressCtx)
|
|
zw := acquireRealGzipWriter(ctx.w, ctx.level)
|
|
|
|
_, err := zw.Write(ctx.p)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: gzip.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
|
|
}
|
|
|
|
releaseRealGzipWriter(zw, ctx.level)
|
|
}
|
|
|
|
// WriteGzip writes gzipped p to w and returns the number of compressed
|
|
// bytes written to w.
|
|
func WriteGzip(w io.Writer, p []byte) (int, error) {
|
|
return WriteGzipLevel(w, p, CompressDefaultCompression)
|
|
}
|
|
|
|
// AppendGzipBytes appends gzipped src to dst and returns the resulting dst.
|
|
func AppendGzipBytes(dst, src []byte) []byte {
|
|
return AppendGzipBytesLevel(dst, src, CompressDefaultCompression)
|
|
}
|
|
|
|
// WriteGunzip writes ungzipped p to w and returns the number of uncompressed
|
|
// bytes written to w.
|
|
func WriteGunzip(w io.Writer, p []byte) (int, error) {
|
|
r := &byteSliceReader{p}
|
|
zr, err := acquireGzipReader(r)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
n, err := copyZeroAlloc(w, zr)
|
|
releaseGzipReader(zr)
|
|
nn := int(n)
|
|
if int64(nn) != n {
|
|
return 0, fmt.Errorf("too much data gunzipped: %d", n)
|
|
}
|
|
return nn, err
|
|
}
|
|
|
|
// AppendGunzipBytes appends gunzipped src to dst and returns the resulting dst.
|
|
func AppendGunzipBytes(dst, src []byte) ([]byte, error) {
|
|
w := &byteSliceWriter{dst}
|
|
_, err := WriteGunzip(w, src)
|
|
return w.b, err
|
|
}
|
|
|
|
// AppendDeflateBytesLevel appends deflated src to dst using the given
|
|
// compression level and returns the resulting dst.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func AppendDeflateBytesLevel(dst, src []byte, level int) []byte {
|
|
w := &byteSliceWriter{dst}
|
|
WriteDeflateLevel(w, src, level)
|
|
return w.b
|
|
}
|
|
|
|
// WriteDeflateLevel writes deflated p to w using the given compression level
|
|
// and returns the number of compressed bytes written to w.
|
|
//
|
|
// Supported compression levels are:
|
|
//
|
|
// * CompressNoCompression
|
|
// * CompressBestSpeed
|
|
// * CompressBestCompression
|
|
// * CompressDefaultCompression
|
|
// * CompressHuffmanOnly
|
|
func WriteDeflateLevel(w io.Writer, p []byte, level int) (int, error) {
|
|
switch w.(type) {
|
|
case *byteSliceWriter,
|
|
*bytes.Buffer,
|
|
*ByteBuffer,
|
|
*bytebufferpool.ByteBuffer:
|
|
// These writers don't block, so we can just use stacklessWriteDeflate
|
|
ctx := &compressCtx{
|
|
w: w,
|
|
p: p,
|
|
level: level,
|
|
}
|
|
stacklessWriteDeflate(ctx)
|
|
return len(p), nil
|
|
default:
|
|
zw := acquireStacklessDeflateWriter(w, level)
|
|
n, err := zw.Write(p)
|
|
releaseStacklessDeflateWriter(zw, level)
|
|
return n, err
|
|
}
|
|
}
|
|
|
|
// stacklessWriteDeflate is nonblockingWriteDeflate wrapped by
// stackless.NewFunc. WriteDeflateLevel calls it with a *compressCtx for
// in-memory writers.
var stacklessWriteDeflate = stackless.NewFunc(nonblockingWriteDeflate)
|
|
|
|
func nonblockingWriteDeflate(ctxv interface{}) {
|
|
ctx := ctxv.(*compressCtx)
|
|
zw := acquireRealDeflateWriter(ctx.w, ctx.level)
|
|
|
|
_, err := zw.Write(ctx.p)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: zlib.Writer.Write for len(p)=%d returned unexpected error: %s", len(ctx.p), err))
|
|
}
|
|
|
|
releaseRealDeflateWriter(zw, ctx.level)
|
|
}
|
|
|
|
// compressCtx bundles the arguments of a single compression job that is
// passed to nonblockingWriteGzip or nonblockingWriteDeflate.
type compressCtx struct {
	// w is the destination for the compressed output.
	w io.Writer
	// p is the uncompressed input.
	p []byte
	// level is the requested compression level.
	level int
}
|
|
|
|
// WriteDeflate writes deflated p to w and returns the number of compressed
|
|
// bytes written to w.
|
|
func WriteDeflate(w io.Writer, p []byte) (int, error) {
|
|
return WriteDeflateLevel(w, p, CompressDefaultCompression)
|
|
}
|
|
|
|
// AppendDeflateBytes appends deflated src to dst and returns the resulting dst.
|
|
func AppendDeflateBytes(dst, src []byte) []byte {
|
|
return AppendDeflateBytesLevel(dst, src, CompressDefaultCompression)
|
|
}
|
|
|
|
// WriteInflate writes inflated p to w and returns the number of uncompressed
|
|
// bytes written to w.
|
|
func WriteInflate(w io.Writer, p []byte) (int, error) {
|
|
r := &byteSliceReader{p}
|
|
zr, err := acquireFlateReader(r)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
n, err := copyZeroAlloc(w, zr)
|
|
releaseFlateReader(zr)
|
|
nn := int(n)
|
|
if int64(nn) != n {
|
|
return 0, fmt.Errorf("too much data inflated: %d", n)
|
|
}
|
|
return nn, err
|
|
}
|
|
|
|
// AppendInflateBytes appends inflated src to dst and returns the resulting dst.
|
|
func AppendInflateBytes(dst, src []byte) ([]byte, error) {
|
|
w := &byteSliceWriter{dst}
|
|
_, err := WriteInflate(w, src)
|
|
return w.b, err
|
|
}
|
|
|
|
// byteSliceWriter is an io.Writer that appends everything written to it
// to the byte slice b.
type byteSliceWriter struct {
	b []byte
}

// Write appends p to w.b. It always reports len(p) bytes written and
// a nil error.
func (w *byteSliceWriter) Write(p []byte) (int, error) {
	n := len(p)
	if n > 0 {
		w.b = append(w.b, p...)
	}
	return n, nil
}
|
|
|
|
// byteSliceReader is an io.Reader that serves the contents of b,
// consuming it as data is read.
type byteSliceReader struct {
	b []byte
}

// Read copies as much of the remaining data as fits into p and advances past
// it. It returns io.EOF once no data is left.
func (r *byteSliceReader) Read(p []byte) (int, error) {
	remaining := r.b
	if len(remaining) == 0 {
		return 0, io.EOF
	}
	copied := copy(p, remaining)
	r.b = remaining[copied:]
	return copied, nil
}
|
|
|
|
func acquireStacklessDeflateWriter(w io.Writer, level int) stackless.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessDeflateWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
return stackless.NewWriter(w, func(w io.Writer) stackless.Writer {
|
|
return acquireRealDeflateWriter(w, level)
|
|
})
|
|
}
|
|
sw := v.(stackless.Writer)
|
|
sw.Reset(w)
|
|
return sw
|
|
}
|
|
|
|
func releaseStacklessDeflateWriter(sw stackless.Writer, level int) {
|
|
sw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := stacklessDeflateWriterPoolMap[nLevel]
|
|
p.Put(sw)
|
|
}
|
|
|
|
func acquireRealDeflateWriter(w io.Writer, level int) *zlib.Writer {
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realDeflateWriterPoolMap[nLevel]
|
|
v := p.Get()
|
|
if v == nil {
|
|
zw, err := zlib.NewWriterLevel(w, level)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("BUG: unexpected error from zlib.NewWriterLevel(%d): %s", level, err))
|
|
}
|
|
return zw
|
|
}
|
|
zw := v.(*zlib.Writer)
|
|
zw.Reset(w)
|
|
return zw
|
|
}
|
|
|
|
func releaseRealDeflateWriter(zw *zlib.Writer, level int) {
|
|
zw.Close()
|
|
nLevel := normalizeCompressLevel(level)
|
|
p := realDeflateWriterPoolMap[nLevel]
|
|
p.Put(zw)
|
|
}
|
|
|
|
var (
|
|
stacklessDeflateWriterPoolMap = newCompressWriterPoolMap()
|
|
realDeflateWriterPoolMap = newCompressWriterPoolMap()
|
|
)
|
|
|
|
// newCompressWriterPoolMap returns a slice with one empty sync.Pool per
// normalized compression level.
//
// The compression levels are those defined in
// https://golang.org/pkg/compress/flate/#pkg-constants . They are normalized
// with normalizeCompressLevel, so they fit into [0..11].
func newCompressWriterPoolMap() []*sync.Pool {
	const levels = 12
	pools := make([]*sync.Pool, levels)
	for i := range pools {
		pools[i] = new(sync.Pool)
	}
	return pools
}
|
|
|
|
func isFileCompressible(f *os.File, minCompressRatio float64) bool {
|
|
// Try compressing the first 4kb of of the file
|
|
// and see if it can be compressed by more than
|
|
// the given minCompressRatio.
|
|
b := AcquireByteBuffer()
|
|
zw := acquireStacklessGzipWriter(b, CompressDefaultCompression)
|
|
lr := &io.LimitedReader{
|
|
R: f,
|
|
N: 4096,
|
|
}
|
|
_, err := copyZeroAlloc(zw, lr)
|
|
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
|
|
f.Seek(0, 0)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
n := 4096 - lr.N
|
|
zn := len(b.B)
|
|
ReleaseByteBuffer(b)
|
|
return float64(zn) < float64(n)*minCompressRatio
|
|
}
|
|
|
|
// normalizes compression level into [0..11], so it could be used as an index
|
|
// in *PoolMap.
|
|
func normalizeCompressLevel(level int) int {
|
|
// -2 is the lowest compression level - CompressHuffmanOnly
|
|
// 9 is the highest compression level - CompressBestCompression
|
|
if level < -2 || level > 9 {
|
|
level = CompressDefaultCompression
|
|
}
|
|
return level + 2
|
|
}
|