feat(s3api): full-chunk gzip pass-through skips volume-side decompress (#9427)

Building on the io.Pipe streaming chunk copy: when a copy operation
covers an entire source chunk (the common case for Harbor's
part-size = chunk-size assemble pattern), ask the source volume for
compressed bytes via Accept-Encoding: gzip and forward them to the
destination as-is.

This trades a Range fetch (where the volume decompresses the chunk
internally to satisfy the byte range) for a full-chunk fetch that
returns whatever wire bytes the chunk is stored as. For gzipped
chunks the source volume avoids the decompression entirely; we never
allocate a chunk-sized decompress buffer.

Implementation: build the source GET directly instead of going
through ReadUrlAsStream, because that helper auto-decompresses gzip
responses (which would defeat the point). Trust the response's
Content-Encoding header over caller hints — for partial ranges the
volume always returns raw bytes regardless of how the chunk is
stored, so labeling those as gzip would corrupt subsequent reads.

End-to-end repro impact (512 MiB src, 6 parallel UploadPartCopy):
  + #9420/#9421/#9422       : 2236 MiB
  + io.Pipe streaming       : 1521 MiB
  + this commit             : 1149 MiB  (round 2 RSS, perfectly flat)

Round 3 now completes (was hitting volume-full before, since
chunks took up uncompressed space on disk; we now store the gzipped
chunks the volume gives us, which fit in the test's 8 GiB volume
budget).

Heap inuse_space (after force GC):
  before all: ~1.5 GiB
  this PR:    266 MiB

Volume-side bytes.Buffer.ReadFrom inuse:
  before:     611 MiB
  streaming:  571 MiB
  this PR:    297 MiB (now in destination-volume parseUpload's
                       size-hint decompression — separate
                       optimization opportunity for a hint header)
This commit is contained in:
Chris Lu
2026-05-10 14:55:59 -07:00
committed by GitHub
parent 4a04594826
commit 9a70bbfcc6
3 changed files with 104 additions and 57 deletions
+13 -3
View File
@@ -1055,9 +1055,11 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin
// Stream the chunk through io.Pipe when no in-transit transformation is
// required; this holds only ~32 KiB per copy in flight, vs. the
// chunk-sized buffers the buffered path needs.
// chunk-sized buffers the buffered path needs. isFullChunk=true asks
// the source volume for compressed bytes, so a gzipped chunk is
// forwarded to the destination without anyone having to decompress.
if canStreamCopyChunk(chunk) {
if err := s3a.streamCopyChunkRange(context.Background(), srcUrl, fileId, 0, int64(chunk.Size), assignResult, chunk.IsCompressed); err != nil {
if err := s3a.streamCopyChunkRange(context.Background(), srcUrl, fileId, 0, int64(chunk.Size), true /*isFullChunk*/, assignResult); err != nil {
return nil, fmt.Errorf("stream chunk: %w", err)
}
return dstChunk, nil
@@ -1103,8 +1105,16 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer
// transformation required (see canStreamCopyChunk for the eligibility
// rules); this is the dominant path for Harbor-style multipart
// assemble loads, which use UploadPartCopy with a CopySourceRange.
//
// When the requested range happens to cover the entire source chunk
// exactly, switch to the full-chunk fetch mode: that asks the source
// volume for compressed bytes (Accept-Encoding: gzip) and forwards
// them as-is, avoiding the volume-side decompression that a Range
// fetch on a gzipped chunk would otherwise pay. Harbor's typical
// part-size = chunk-size assemble pattern hits this branch.
if canStreamCopyChunk(originalChunk) {
if err := s3a.streamCopyChunkRange(context.Background(), srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size), assignResult, originalChunk.IsCompressed); err != nil {
isFullChunk := offsetInChunk == 0 && rangeChunk.Size == originalChunk.Size
if err := s3a.streamCopyChunkRange(context.Background(), srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size), isFullChunk, assignResult); err != nil {
return nil, fmt.Errorf("stream chunk range: %w", err)
}
return dstChunk, nil
@@ -160,7 +160,7 @@ func BenchmarkCopyChunk_Streamed(b *testing.B) {
for i := 0; i < b.N; i++ {
if err := s3a.streamCopyChunkRange(context.Background(),
env.srcSrv.URL, env.assign.FileId, 0, int64(size),
env.assign, false); err != nil {
true /*isFullChunk*/, env.assign); err != nil {
b.Fatalf("stream: %v", err)
}
}
+90 -53
View File
@@ -31,7 +31,8 @@ import (
// In both cases the caller falls back to the buffered copySingleChunk path,
// which already handles those transformations correctly. The streaming
// path is still safe for IsCompressed chunks — gzip is end-to-end and we
// just forward the compressed bytes with the Content-Encoding header.
// just forward the wire bytes (with the destination Content-Encoding
// header set to whatever the source actually returned).
func canStreamCopyChunk(chunk *filer_pb.FileChunk) bool {
if len(chunk.CipherKey) > 0 {
return false
@@ -48,35 +49,96 @@ func canStreamCopyChunk(chunk *filer_pb.FileChunk) bool {
// same multipart wire format that operation.upload_content writes — see
// upload_content.go for the canonical version; we mirror its part headers
// here so the volume's needle.parseUpload accepts the request.
//
// isFullChunk is the load-bearing distinction:
//
// - false (partial range): a Range header is sent to the source volume,
// which serves the requested byte range. For compressed chunks the
// volume internally decompresses to satisfy the range, then returns
// raw bytes (no Content-Encoding); we forward those raw bytes.
// - true (full chunk): we ask the source volume to send compressed
// bytes via Accept-Encoding: gzip, bypass Go's auto-decompression on
// resp.Body, and forward whatever wire bytes arrived to the
// destination. When the chunk is stored gzipped that means we never
// decompress on either the source-volume or s3-server side; the
// destination receives gzipped bytes and stores them as-is.
//
// For the Harbor-style assemble repro (where each UploadPartCopy range
// matches a source chunk boundary), the full-chunk path eliminates the
// "bytes.growSlice from util.DecompressData" cost that dominates the
// volume-side heap profile after the s3-side fixes.
//
// The destination's Content-Encoding header is set from the source's
// response Content-Encoding only — never from a caller-supplied hint.
// Trusting a hint that disagrees with the wire bytes (e.g. caller says
// "compressed" but the volume returned raw bytes for a partial range)
// would label raw bytes as gzip and corrupt reads on the destination.
func (s3a *S3ApiServer) streamCopyChunkRange(
ctx context.Context,
srcUrl, srcFileId string,
offset, size int64,
isFullChunk bool,
assignResult *filer_pb.AssignVolumeResponse,
isCompressed bool,
) error {
// ReadUrlAsStream takes int for size; reject anything that would
// truncate negative on 32-bit. Mirrors the guard in downloadChunkData.
// Range header takes int64, but mirror the int32 bound from
// downloadChunkData so the two paths reject the same inputs.
if size > int64(math.MaxInt32) {
return fmt.Errorf("chunk size %d exceeds maximum int32 size", size)
}
// Child context so a terminal error here unblocks the producer
// goroutine immediately. Without this, a failed POST closes
// pipeReader (which only fails the producer's writes), but
// ReadUrlAsStream can keep draining the source body in its read
// loop until EOF before noticing — wasting source-volume bandwidth
// and CPU. Cancelling streamCtx makes ReadUrlAsStream's per-tick
// ctx.Done() check return on the next iteration.
// Child context so a terminal error here unblocks both legs
// immediately. Without this, a failed POST closes pipeReader
// (which only fails the producer's writes), but the source GET's
// read loop would keep draining srcResp.Body in the background
// until EOF — wasting source-volume bandwidth and CPU on a copy
// that's already failed. Cancelling streamCtx tears down both the
// source GET and the destination POST when this function returns.
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
dstJwt := security.EncodedJwt(assignResult.Auth)
srcJwt := filer.JwtForVolumeServer(srcFileId)
// io.Pipe gives us a synchronous handoff: the producer goroutine
// builds the multipart body, the consumer (HTTP transport) reads it.
// Writes block until reads consume them, so the in-flight footprint
// is the pipe's internal hand-off buffer — not the chunk size.
// Source GET: built directly (not through ReadUrlAsStream) because that
// helper auto-decompresses gzipped responses, which would defeat the
// whole-chunk pass-through optimization. We want the wire bytes.
srcReq, err := http.NewRequestWithContext(streamCtx, http.MethodGet, srcUrl, nil)
if err != nil {
return fmt.Errorf("create source GET: %w", err)
}
if srcJwt != "" {
srcReq.Header.Set("Authorization", "BEARER "+srcJwt)
}
if isFullChunk {
// Manually setting Accept-Encoding tells Go's http.Transport that
// the user wants raw bytes back — Go's automatic decompression is
// only applied when it adds the header itself. The server may or
// may not actually gzip; the response Content-Encoding header is
// authoritative.
srcReq.Header.Set("Accept-Encoding", "gzip")
} else {
srcReq.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
}
srcResp, err := util_http.GetGlobalHttpClient().Do(srcReq)
if err != nil {
return fmt.Errorf("source GET: %w", err)
}
// CloseResponse drains and closes resp.Body for keepalive.
defer util_http.CloseResponse(srcResp)
if srcResp.StatusCode >= 400 {
return fmt.Errorf("source GET %s: %s", srcUrl, srcResp.Status)
}
// What's actually on the wire — trust this over caller hints. For range
// fetches the volume always returns raw bytes (no Content-Encoding); for
// full-chunk fetches it depends on whether the chunk is stored gzipped.
bodyIsGzipped := srcResp.Header.Get("Content-Encoding") == "gzip"
// io.Pipe gives us a synchronous handoff: the producer goroutine builds
// the multipart body, the consumer (HTTP transport) reads it. Writes
// block until reads consume them, so the in-flight footprint is the
// pipe's hand-off buffer — not the chunk size.
pipeReader, pipeWriter := io.Pipe()
mw := multipart.NewWriter(pipeWriter)
contentType := mw.FormDataContentType()
@@ -94,7 +156,7 @@ func (s3a *S3ApiServer) streamCopyChunkRange(
h.Set("Content-Disposition", mime.FormatMediaType("form-data",
map[string]string{"name": "file", "filename": ""}))
h.Set("Idempotency-Key", dstUrl)
if isCompressed {
if bodyIsGzipped {
h.Set("Content-Encoding", "gzip")
}
@@ -104,40 +166,12 @@ func (s3a *S3ApiServer) streamCopyChunkRange(
return
}
// ReadUrlAsStream reads in 256 KiB ticks and hands each tick to
// the callback. Forwarding directly into fw means each tick is
// flushed through the multipart writer into the pipe, where the
// HTTP transport picks it up — no per-chunk buffering on either
// side of the pipe.
var writeErr error
shouldRetry, readErr := util_http.ReadUrlAsStream(streamCtx, srcUrl, srcJwt, nil, false, false, offset, int(size), func(data []byte) {
if writeErr != nil {
return
if _, err := io.Copy(fw, srcResp.Body); err != nil {
if shouldLogStreamError(err) {
glog.V(2).Infof("stream copy %s offset=%d size=%d: %v",
srcUrl, offset, size, err)
}
if _, err := fw.Write(data); err != nil {
writeErr = err
}
})
if writeErr != nil {
producerErr = fmt.Errorf("stream write: %w", writeErr)
return
}
if readErr != nil {
if shouldRetry {
glog.V(2).Infof("stream copy %s offset=%d size=%d: retryable error: %v",
srcUrl, offset, size, readErr)
}
producerErr = fmt.Errorf("stream read: %w", readErr)
return
}
// shouldRetry can be set without an error (e.g. ReadUrlAsStream
// surfacing a partial-read condition that the buffered path
// re-fetches). Treat it as a failed copy here too — otherwise we
// would close the multipart cleanly and let the destination POST
// succeed against a possibly-truncated body. Mirrors the explicit
// check downloadChunkData makes after ReadUrlAsStream returns.
if shouldRetry {
producerErr = fmt.Errorf("stream read %s offset=%d size=%d: retry needed", srcUrl, offset, size)
producerErr = fmt.Errorf("stream copy: %w", err)
return
}
@@ -149,8 +183,6 @@ func (s3a *S3ApiServer) streamCopyChunkRange(
req, err := http.NewRequestWithContext(streamCtx, http.MethodPost, dstUrl, pipeReader)
if err != nil {
// Drain the pipe so the producer goroutine doesn't leak waiting
// on a never-read writer.
pipeReader.CloseWithError(err)
return fmt.Errorf("create POST request: %w", err)
}
@@ -166,8 +198,6 @@ func (s3a *S3ApiServer) streamCopyChunkRange(
pipeReader.CloseWithError(err)
return fmt.Errorf("POST: %w", err)
}
// CloseResponse drains and closes resp.Body for us; no manual io.Copy
// drain needed for keepalive.
defer util_http.CloseResponse(resp)
if resp.StatusCode >= 400 {
@@ -175,3 +205,10 @@ func (s3a *S3ApiServer) streamCopyChunkRange(
}
return nil
}
func shouldLogStreamError(err error) bool {
// io.ErrClosedPipe shows up when the HTTP transport closed pipeReader
// after the POST failed; the underlying error is reported via the POST
// path, no need to log twice.
return err != nil && err != io.ErrClosedPipe
}