Files
fasthttp/compress_test.go
T
Aliaksandr Valialkin 30e92af08f Limit heap memory usage when compressing high number of concurrent responses
Previously each concurrent compression could allocate huge compression state
with the size up to 1Mb each. So 10K concurrent connections could result in
10Gb of compression state in the heap.

This CL limits the number of compression states among concurrent requests
when {Append,Write}{Gzip,Deflate}* functions are called to O(GOMAXPROCS).
These functions are used by CompressHandler* for non-streaming responses,
i.e. it should cover the majority of use cases.

Memory usage for 10K concurrent connections that compress responses drops
from 10Gb to 200Mb after this CL.
2017-05-17 14:45:31 +03:00

217 lines
5.2 KiB
Go

package fasthttp
import (
"bytes"
"fmt"
"io/ioutil"
"testing"
"time"
)
var compressTestcases = func() []string {
a := []string{
"",
"foobar",
"выфаодлодл одлфываыв sd2 k34",
}
bigS := createFixedBody(1e4)
a = append(a, string(bigS))
return a
}()
func TestGzipBytesSerial(t *testing.T) {
if err := testGzipBytes(); err != nil {
t.Fatal(err)
}
}
func TestGzipBytesConcurrent(t *testing.T) {
if err := testConcurrent(10, testGzipBytes); err != nil {
t.Fatal(err)
}
}
func TestDeflateBytesSerial(t *testing.T) {
if err := testDeflateBytes(); err != nil {
t.Fatal(err)
}
}
func TestDeflateBytesConcurrent(t *testing.T) {
if err := testConcurrent(10, testDeflateBytes); err != nil {
t.Fatal(err)
}
}
func testGzipBytes() error {
for _, s := range compressTestcases {
if err := testGzipBytesSingleCase(s); err != nil {
return err
}
}
return nil
}
func testDeflateBytes() error {
for _, s := range compressTestcases {
if err := testDeflateBytesSingleCase(s); err != nil {
return err
}
}
return nil
}
func testGzipBytesSingleCase(s string) error {
prefix := []byte("foobar")
gzippedS := AppendGzipBytes(prefix, []byte(s))
if !bytes.Equal(gzippedS[:len(prefix)], prefix) {
return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, gzippedS[:len(prefix)], prefix)
}
gunzippedS, err := AppendGunzipBytes(prefix, gzippedS[len(prefix):])
if err != nil {
return fmt.Errorf("unexpected error when uncompressing %q: %s", s, err)
}
if !bytes.Equal(gunzippedS[:len(prefix)], prefix) {
return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, gunzippedS[:len(prefix)], prefix)
}
gunzippedS = gunzippedS[len(prefix):]
if string(gunzippedS) != s {
return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", gunzippedS, s)
}
return nil
}
func testDeflateBytesSingleCase(s string) error {
prefix := []byte("foobar")
deflatedS := AppendDeflateBytes(prefix, []byte(s))
if !bytes.Equal(deflatedS[:len(prefix)], prefix) {
return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, deflatedS[:len(prefix)], prefix)
}
inflatedS, err := AppendInflateBytes(prefix, deflatedS[len(prefix):])
if err != nil {
return fmt.Errorf("unexpected error when uncompressing %q: %s", s, err)
}
if !bytes.Equal(inflatedS[:len(prefix)], prefix) {
return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, inflatedS[:len(prefix)], prefix)
}
inflatedS = inflatedS[len(prefix):]
if string(inflatedS) != s {
return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", inflatedS, s)
}
return nil
}
func TestGzipCompressSerial(t *testing.T) {
if err := testGzipCompress(); err != nil {
t.Fatal(err)
}
}
func TestGzipCompressConcurrent(t *testing.T) {
if err := testConcurrent(10, testGzipCompress); err != nil {
t.Fatal(err)
}
}
func TestFlateCompressSerial(t *testing.T) {
if err := testFlateCompress(); err != nil {
t.Fatal(err)
}
}
func TestFlateCompressConcurrent(t *testing.T) {
if err := testConcurrent(10, testFlateCompress); err != nil {
t.Fatal(err)
}
}
func testGzipCompress() error {
for _, s := range compressTestcases {
if err := testGzipCompressSingleCase(s); err != nil {
return err
}
}
return nil
}
func testFlateCompress() error {
for _, s := range compressTestcases {
if err := testFlateCompressSingleCase(s); err != nil {
return err
}
}
return nil
}
func testGzipCompressSingleCase(s string) error {
var buf bytes.Buffer
zw := acquireStacklessGzipWriter(&buf, CompressDefaultCompression)
if _, err := zw.Write([]byte(s)); err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
zr, err := acquireGzipReader(&buf)
if err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
body, err := ioutil.ReadAll(zr)
if err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
if string(body) != s {
return fmt.Errorf("unexpected string after decompression: %q. Expecting %q", body, s)
}
releaseGzipReader(zr)
return nil
}
func testFlateCompressSingleCase(s string) error {
var buf bytes.Buffer
zw := acquireStacklessDeflateWriter(&buf, CompressDefaultCompression)
if _, err := zw.Write([]byte(s)); err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
releaseStacklessDeflateWriter(zw, CompressDefaultCompression)
zr, err := acquireFlateReader(&buf)
if err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
body, err := ioutil.ReadAll(zr)
if err != nil {
return fmt.Errorf("unexpected error: %s. s=%q", err, s)
}
if string(body) != s {
return fmt.Errorf("unexpected string after decompression: %q. Expecting %q", body, s)
}
releaseFlateReader(zr)
return nil
}
func testConcurrent(concurrency int, f func() error) error {
ch := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func(idx int) {
err := f()
if err != nil {
ch <- fmt.Errorf("error in goroutine %d: %s", idx, err)
}
ch <- nil
}(i)
}
for i := 0; i < concurrency; i++ {
select {
case err := <-ch:
if err != nil {
return err
}
case <-time.After(time.Second):
return fmt.Errorf("timeout")
}
}
return nil
}