Files
seaweedfs/weed/s3api/s3api_object_handlers_copy_bench_test.go
Chris Lu 9a70bbfcc6 feat(s3api): full-chunk gzip pass-through skips volume-side decompress (#9427)
Building on the io.Pipe streaming chunk copy: when a copy operation
covers an entire source chunk (the common case for Harbor's
part-size = chunk-size assemble pattern), ask the source volume for
compressed bytes via Accept-Encoding: gzip and forward them to the
destination as-is.

This trades a Range fetch (where the volume decompresses the chunk
internally to satisfy the byte range) for a full-chunk fetch that
returns whatever wire bytes the chunk is stored as. For gzipped
chunks the source volume avoids the decompression entirely; we never
allocate a chunk-sized decompress buffer.

Implementation: build the source GET directly instead of going
through ReadUrlAsStream, because that helper auto-decompresses gzip
responses (which would defeat the point). Trust the response's
Content-Encoding header over caller hints — for partial ranges the
volume always returns raw bytes regardless of how the chunk is
stored, so labeling those as gzip would corrupt subsequent reads.

End-to-end repro impact (512 MiB src, 6 parallel UploadPartCopy):
  + #9420/#9421/#9422       : 2236 MiB
  + io.Pipe streaming       : 1521 MiB
  + this commit             : 1149 MiB  (round 2 RSS, perfectly flat)

Round 3 now completes (was hitting volume-full before, since
chunks took up uncompressed space on disk; we now store the gzipped
chunks the volume gives us, which fit in the test's 8 GiB volume
budget).

Heap inuse_space (after force GC):
  before all: ~1.5 GiB
  this PR:    266 MiB

Volume-side bytes.Buffer.ReadFrom inuse:
  before:     611 MiB
  streaming:  571 MiB
  this PR:    297 MiB (now in destination-volume parseUpload's
                       size-hint decompression — separate
                       optimization opportunity for a hint header)
2026-05-10 14:55:59 -07:00

187 lines
4.9 KiB
Go

package s3api
import (
"context"
"crypto/sha256"
"fmt"
"io"
"mime/multipart"
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// benchEnv bundles the fixtures shared by the chunk-copy benchmarks: a fake
// source volume server, a fake destination volume server, the payload the
// source serves, and an assignment that points uploads at the destination.
// Instances are built by newBenchEnv and released with close.
type benchEnv struct {
	// srcSrv serves the payload bytes on any request.
	srcSrv *httptest.Server
	// dstSrv accepts a multipart upload and checks its SHA-256 against the payload.
	dstSrv *httptest.Server
	// payload is the deterministic byte pattern served by srcSrv.
	payload []byte
	// dstHash is the SHA-256 digest of payload that dstSrv expects to receive.
	dstHash [32]byte
	// assign carries the destination server's host:port and a fixed file id.
	assign *filer_pb.AssignVolumeResponse
}
// newBenchEnv builds the fixture for one benchmark run.
//
// It fills a payload of payloadSize bytes with a deterministic pattern and
// starts two httptest servers:
//
//   - a source server that answers every request with the payload, or with
//     the inclusive byte range named by a "Range: bytes=S-E" header;
//   - a destination server that reads the first part of a multipart upload,
//     hashes it with SHA-256 and replies 201 only when the digest matches the
//     payload's digest (400 otherwise).
//
// The returned benchEnv owns both servers; the caller must call close on it.
// Any failure while assembling the fixture aborts the benchmark via b.Fatalf.
func newBenchEnv(b *testing.B, payloadSize int) *benchEnv {
	b.Helper()

	payload := make([]byte, payloadSize)
	for i := range payload {
		payload[i] = byte(i*31 + 7) // arbitrary repeatable pattern
	}
	wantHash := sha256.Sum256(payload)

	srcSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Honor Range header if present (UploadPartCopy passes one).
		start, end := int64(0), int64(len(payload)-1)
		if rng := r.Header.Get("Range"); rng != "" {
			var s, e int64
			// Headers that do not parse as "bytes=S-E" are ignored and the
			// full payload is served, as before.
			if _, err := fmt.Sscanf(rng, "bytes=%d-%d", &s, &e); err == nil {
				// An end past the payload is clamped to the last byte; a
				// start that is negative or beyond the (clamped) end cannot
				// be served. Without these checks the slice expression
				// below would panic inside the handler.
				if e > end {
					e = end
				}
				if s < 0 || s > e {
					http.Error(w, "range not satisfiable: "+rng, http.StatusRequestedRangeNotSatisfiable)
					return
				}
				start, end = s, e
			}
		}
		w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
		// The status stays 200 even for ranged requests, matching what this
		// fixture has always sent.
		w.WriteHeader(http.StatusOK)
		// A write error only means the client went away; the client side
		// reports that failure itself.
		_, _ = w.Write(payload[start : end+1])
	}))

	dstSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		mr, err := r.MultipartReader()
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		part, err := mr.NextPart()
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Stream the part through the hash so the body is never buffered.
		h := sha256.New()
		if _, err := io.Copy(h, part); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var got [32]byte
		copy(got[:], h.Sum(nil))
		if got != wantHash {
			http.Error(w, "hash mismatch", http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusCreated)
		_, _ = io.WriteString(w, `{"size":`+strconv.Itoa(payloadSize)+`}`)
	}))

	dstURL, err := neturl(dstSrv.URL)
	if err != nil {
		// Do not leave listeners behind when the fixture cannot be built.
		srcSrv.Close()
		dstSrv.Close()
		b.Fatalf("destination server URL: %v", err)
	}

	assign := &filer_pb.AssignVolumeResponse{
		FileId: "1,benchmark",
		Location: &filer_pb.Location{
			Url:       dstURL,
			PublicUrl: dstURL,
		},
		Auth: "",
	}
	return &benchEnv{
		srcSrv:  srcSrv,
		dstSrv:  dstSrv,
		payload: payload,
		dstHash: wantHash,
		assign:  assign,
	}
}
// close shuts down the source server first and then the destination server.
func (e *benchEnv) close() {
	for _, srv := range []*httptest.Server{e.srcSrv, e.dstSrv} {
		srv.Close()
	}
}
// neturl strips the leading "http://" from rawURL and returns what follows,
// which must split cleanly as host:port. It returns an error when the scheme
// prefix is missing or when net.SplitHostPort rejects the remainder.
func neturl(rawURL string) (string, error) {
	const scheme = "http://"
	if len(rawURL) >= len(scheme) && rawURL[:len(scheme)] == scheme {
		hostPort := rawURL[len(scheme):]
		_, _, splitErr := net.SplitHostPort(hostPort)
		if splitErr != nil {
			return "", splitErr
		}
		return hostPort, nil
	}
	return "", fmt.Errorf("unexpected URL: %q", rawURL)
}
// BenchmarkCopyChunk_Buffered times the existing buffered chunk-copy path:
// downloadChunkData followed by uploadChunkData. Per call, that path
// allocates one chunk-sized []byte and one multipart-encoded buffer.
// Sub-benchmarks run at 1 MiB, 8 MiB and 64 MiB chunk sizes.
func BenchmarkCopyChunk_Buffered(b *testing.B) {
	chunkSizes := []int{1 << 20, 8 << 20, 64 << 20}
	for _, size := range chunkSizes {
		b.Run(humanByteName(size), func(b *testing.B) {
			util_http.InitGlobalHttpClient()
			env := newBenchEnv(b, size)
			defer env.close()

			server := &S3ApiServer{}
			length := int64(size)

			// Fixture setup above is excluded from the measurement.
			b.SetBytes(length)
			b.ReportAllocs()
			b.ResetTimer()

			for iter := 0; iter < b.N; iter++ {
				chunk, dlErr := server.downloadChunkData(env.srcSrv.URL, env.assign.FileId, 0, length, nil)
				if dlErr != nil {
					b.Fatalf("download: %v", dlErr)
				}
				upErr := server.uploadChunkData(chunk, env.assign, false)
				if upErr != nil {
					b.Fatalf("upload: %v", upErr)
				}
			}
		})
	}
}
// BenchmarkCopyChunk_Streamed times the io.Pipe streaming chunk-copy path
// (streamCopyChunkRange, invoked as a full-chunk copy). Per call, that path
// holds only the pipe's hand-off buffer (~32 KiB) plus http transport
// buffers, independent of chunk size. Sub-benchmarks run at 1 MiB, 8 MiB and
// 64 MiB chunk sizes.
func BenchmarkCopyChunk_Streamed(b *testing.B) {
	chunkSizes := []int{1 << 20, 8 << 20, 64 << 20}
	for _, size := range chunkSizes {
		b.Run(humanByteName(size), func(b *testing.B) {
			util_http.InitGlobalHttpClient()
			env := newBenchEnv(b, size)
			defer env.close()

			server := &S3ApiServer{}
			length := int64(size)
			const isFullChunk = true

			// Fixture setup above is excluded from the measurement.
			b.SetBytes(length)
			b.ReportAllocs()
			b.ResetTimer()

			for iter := 0; iter < b.N; iter++ {
				copyErr := server.streamCopyChunkRange(context.Background(),
					env.srcSrv.URL, env.assign.FileId, 0, length,
					isFullChunk, env.assign)
				if copyErr != nil {
					b.Fatalf("stream: %v", copyErr)
				}
			}
		})
	}
}
// humanByteName renders n as a short size label for sub-benchmark names. The
// largest binary unit (GiB, MiB, KiB) that n reaches is used and the value is
// truncated to a whole number of that unit; anything below 1 KiB is printed
// as plain bytes with a "B" suffix.
func humanByteName(n int) string {
	units := [...]struct {
		shift  uint
		suffix string
	}{
		{30, "GiB"},
		{20, "MiB"},
		{10, "KiB"},
	}
	for _, u := range units {
		if n >= 1<<u.shift {
			return strconv.Itoa(n>>u.shift) + u.suffix
		}
	}
	return strconv.Itoa(n) + "B"
}
// Blank reference to mime/multipart. In the code visible here it is the only
// use of the package, so it is what keeps the import compiling; it does not
// check the library version or the multipart framing in any way.
// NOTE(review): presumably kept as a pointer to the multipart framing that
// streamCopyChunkRange mirrors — confirm against that function.
var _ = multipart.NewWriter