mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(s3api): validate SSE-S3 chunk IV length; add multipart direct reader tests (#9218)
* fix(s3api): validate SSE-S3 chunk IV length; add multipart direct reader tests DeserializeSSES3Metadata does not require an IV, and a corrupted or legacy chunk without one would have flowed into cipher.NewCTR and panicked. Validate that each per-chunk IV is exactly AESBlockSize bytes before decryption, closing the current and any already-appended chunk readers on error. Factor the per-chunk decryption loop out of createMultipartSSES3DecryptedReaderDirect into buildMultipartSSES3Reader so it can be driven with a mock chunk fetcher, and add tests covering: the happy path with two parts (distinct per-chunk DEKs/IVs, out-of-order chunks) to lock in the fix from #9211; missing-IV and short-IV metadata rejection without panic; and reader cleanup when a later chunk fails. * address review: sort chunks copy; close encryptedStream on error - buildMultipartSSES3Reader now sorts a copy of the chunks slice so callers do not observe entry.Chunks reordered (other code paths, e.g. ETag computation, can rely on the original order). - createMultipartSSES3DecryptedReaderDirect now closes encryptedStream on the error path from buildMultipartSSES3Reader. All current callers pass nil, but this keeps cleanup symmetric with the success path. - Extend TestBuildMultipartSSES3Reader_PerChunkKeys to assert the input slice is not mutated. * address review: defer single close; extend chunk-copy + IV-guard pattern - createMultipartSSES3DecryptedReaderDirect: collapse the duplicated encryptedStream.Close() calls into a single nil-guarded defer so the error and success paths share cleanup. - createMultipartSSECDecryptedReaderDirect, createMultipartSSEKMSDecryptedReaderDirect: sort a copy of entry.Chunks instead of mutating the caller's slice, matching the SSE-S3 helper. - createMultipartSSECDecryptedReaderDirect: validate per-chunk IV length before handing it to cipher.NewCTR; a base64-decoded empty or short IV from malformed/corrupt metadata would otherwise panic. - SSE-KMS needs no IV guard: CreateSSEKMSDecryptedReader already calls ValidateIV before cipher.NewCTR. Note recorded in the sort comment. * address review: close appended readers on SSE-C/SSE-KMS error paths createMultipartSSECDecryptedReaderDirect and createMultipartSSEKMSDecryptedReaderDirect only closed the current chunk reader on error and leaked any chunk readers already appended to the local readers slice, mirroring the leak previously fixed in the SSE-S3 helper. Add the same closeAppendedReaders() closure pattern to both functions and invoke it on every error return inside the loop so failed requests do not leak volume-server HTTP connections. * address review: defer encryptedStream close in SSE-C/SSE-KMS; drop chunks reassignment - Move encryptedStream.Close() to a nil-guarded defer at the top of createMultipartSSECDecryptedReaderDirect and createMultipartSSEKMSDecryptedReaderDirect so the stream is closed on every return path (including error returns from inside the per-chunk loop), mirroring the SSE-S3 helper. - In buildMultipartSSES3Reader, iterate sortedChunks directly instead of reassigning chunks = sortedChunks.
This commit is contained in:
@@ -2,7 +2,11 @@ package s3api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -323,3 +327,255 @@ func TestSSES3EndToEndWithDetectPrimaryType(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// initSSES3KeyManagerForTest resets the global SSE-S3 key manager and seeds it
|
||||
// with a deterministic super key suitable for envelope-encrypt/decrypt cycles
|
||||
// inside tests. Returns the freshly initialized manager.
|
||||
func initSSES3KeyManagerForTest(t *testing.T) *SSES3KeyManager {
|
||||
t.Helper()
|
||||
globalSSES3KeyManager = NewSSES3KeyManager()
|
||||
t.Cleanup(func() { globalSSES3KeyManager = NewSSES3KeyManager() })
|
||||
km := GetSSES3KeyManager()
|
||||
km.superKey = make([]byte, 32)
|
||||
for i := range km.superKey {
|
||||
km.superKey[i] = byte(i)
|
||||
}
|
||||
return km
|
||||
}
|
||||
|
||||
// encryptSSES3Part encrypts data with a freshly generated DEK and returns the
|
||||
// ciphertext plus serialized per-chunk metadata (DEK + IV). Mirrors what the
|
||||
// multipart upload path writes into each part's chunk metadata.
|
||||
func encryptSSES3Part(t *testing.T, data []byte) (ciphertext, metadata []byte) {
|
||||
t.Helper()
|
||||
key, err := GenerateSSES3Key()
|
||||
if err != nil {
|
||||
t.Fatalf("GenerateSSES3Key: %v", err)
|
||||
}
|
||||
encReader, iv, err := CreateSSES3EncryptedReader(bytes.NewReader(data), key)
|
||||
if err != nil {
|
||||
t.Fatalf("CreateSSES3EncryptedReader: %v", err)
|
||||
}
|
||||
ciphertext, err = io.ReadAll(encReader)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadAll ciphertext: %v", err)
|
||||
}
|
||||
key.IV = iv
|
||||
metadata, err = SerializeSSES3Metadata(key)
|
||||
if err != nil {
|
||||
t.Fatalf("SerializeSSES3Metadata: %v", err)
|
||||
}
|
||||
return ciphertext, metadata
|
||||
}
|
||||
|
||||
// TestBuildMultipartSSES3Reader_PerChunkKeys locks in the fix from PR #9211:
|
||||
// each multipart part has its own DEK and IV, and the direct multipart reader
|
||||
// must decrypt each chunk with its own per-chunk metadata (not the entry-level
|
||||
// key). Before the fix, using a shared entry key produced garbled output.
|
||||
func TestBuildMultipartSSES3Reader_PerChunkKeys(t *testing.T) {
|
||||
keyManager := initSSES3KeyManagerForTest(t)
|
||||
|
||||
// Two parts with distinct sizes (including a short final part) and
|
||||
// independent DEKs/IVs to exercise the per-chunk key plumbing.
|
||||
part1Plaintext := bytes.Repeat([]byte("ABCDEFGHIJKLMNOP"), 16) // 256 bytes
|
||||
part2Plaintext := []byte("short tail part") // 15 bytes
|
||||
|
||||
cipher1, meta1 := encryptSSES3Part(t, part1Plaintext)
|
||||
cipher2, meta2 := encryptSSES3Part(t, part2Plaintext)
|
||||
|
||||
chunks := []*filer_pb.FileChunk{
|
||||
{
|
||||
FileId: "1,aaa",
|
||||
Offset: 0,
|
||||
Size: uint64(len(cipher1)),
|
||||
SseType: filer_pb.SSEType_SSE_S3,
|
||||
SseMetadata: meta1,
|
||||
},
|
||||
{
|
||||
FileId: "2,bbb",
|
||||
Offset: int64(len(part1Plaintext)),
|
||||
Size: uint64(len(cipher2)),
|
||||
SseType: filer_pb.SSEType_SSE_S3,
|
||||
SseMetadata: meta2,
|
||||
},
|
||||
}
|
||||
// Pass chunks out of order to verify offset-based sort.
|
||||
shuffled := []*filer_pb.FileChunk{chunks[1], chunks[0]}
|
||||
// Snapshot the input ordering; the helper must not mutate the caller's
|
||||
// slice, which is backed by entry.Chunks and relied on elsewhere (e.g.
|
||||
// ETag computation).
|
||||
shuffledOrderBefore := []*filer_pb.FileChunk{shuffled[0], shuffled[1]}
|
||||
|
||||
fetched := map[string]int{}
|
||||
chunkData := map[string][]byte{
|
||||
"1,aaa": cipher1,
|
||||
"2,bbb": cipher2,
|
||||
}
|
||||
fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
|
||||
fetched[c.GetFileIdString()]++
|
||||
data, ok := chunkData[c.GetFileIdString()]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected chunk %s", c.GetFileIdString())
|
||||
}
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
reader, err := buildMultipartSSES3Reader(shuffled, keyManager, fetch)
|
||||
if err != nil {
|
||||
t.Fatalf("buildMultipartSSES3Reader: %v", err)
|
||||
}
|
||||
|
||||
got, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadAll reader: %v", err)
|
||||
}
|
||||
|
||||
want := append(append([]byte{}, part1Plaintext...), part2Plaintext...)
|
||||
if !bytes.Equal(got, want) {
|
||||
t.Fatalf("decrypted output mismatch\n want (len=%d): %q\n got (len=%d): %q",
|
||||
len(want), want, len(got), got)
|
||||
}
|
||||
if fetched["1,aaa"] != 1 || fetched["2,bbb"] != 1 {
|
||||
t.Errorf("expected each chunk fetched once, got %v", fetched)
|
||||
}
|
||||
for i := range shuffledOrderBefore {
|
||||
if shuffled[i] != shuffledOrderBefore[i] {
|
||||
t.Errorf("caller's chunk slice was reordered at index %d: before=%s after=%s",
|
||||
i, shuffledOrderBefore[i].GetFileIdString(), shuffled[i].GetFileIdString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildMultipartSSES3Reader_InvalidIVLength verifies that per-chunk metadata
|
||||
// with a missing or short IV is rejected with a clear error instead of
|
||||
// panicking inside cipher.NewCTR. The short-IV case is crafted by encoding the
|
||||
// metadata JSON directly, since SerializeSSES3Metadata refuses to emit a bad
|
||||
// IV; this simulates corrupted or legacy on-disk metadata.
|
||||
func TestBuildMultipartSSES3Reader_InvalidIVLength(t *testing.T) {
|
||||
keyManager := initSSES3KeyManagerForTest(t)
|
||||
|
||||
// Pre-encrypt the DEK with the current super key so Deserialize can unwrap
|
||||
// it successfully and we reach the IV-length check.
|
||||
dek := bytes.Repeat([]byte{0x42}, s3_constants.AESKeySize)
|
||||
encryptedDEK, nonce, err := keyManager.encryptKeyWithSuperKey(dek)
|
||||
if err != nil {
|
||||
t.Fatalf("encryptKeyWithSuperKey: %v", err)
|
||||
}
|
||||
|
||||
makeMetadata := func(iv []byte) []byte {
|
||||
meta := map[string]string{
|
||||
"algorithm": s3_constants.SSEAlgorithmAES256,
|
||||
"keyId": "test-key",
|
||||
"encryptedDEK": base64.StdEncoding.EncodeToString(encryptedDEK),
|
||||
"nonce": base64.StdEncoding.EncodeToString(nonce),
|
||||
}
|
||||
if iv != nil {
|
||||
meta["iv"] = base64.StdEncoding.EncodeToString(iv)
|
||||
}
|
||||
out, err := json.Marshal(meta)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal metadata: %v", err)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
iv []byte
|
||||
}{
|
||||
{"missing IV", nil},
|
||||
{"short IV", []byte("too-short")}, // 9 bytes, not 16
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
closed := false
|
||||
fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
|
||||
return &closeTrackingReadCloser{Reader: bytes.NewReader([]byte("whatever")), closed: &closed}, nil
|
||||
}
|
||||
|
||||
chunks := []*filer_pb.FileChunk{
|
||||
{
|
||||
FileId: "1,bad",
|
||||
Offset: 0,
|
||||
Size: 8,
|
||||
SseType: filer_pb.SSEType_SSE_S3,
|
||||
SseMetadata: makeMetadata(tc.iv),
|
||||
},
|
||||
}
|
||||
|
||||
_, err := buildMultipartSSES3Reader(chunks, keyManager, fetch)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for invalid IV length, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid IV length") {
|
||||
t.Errorf("expected 'invalid IV length' in error, got: %v", err)
|
||||
}
|
||||
if !closed {
|
||||
t.Error("chunk reader for the bad chunk was not closed on error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildMultipartSSES3Reader_ClosesAppendedOnError verifies that when a
|
||||
// later chunk fails (e.g., malformed metadata), readers already appended for
|
||||
// earlier valid chunks are closed so volume-server HTTP connections do not leak.
|
||||
func TestBuildMultipartSSES3Reader_ClosesAppendedOnError(t *testing.T) {
|
||||
keyManager := initSSES3KeyManagerForTest(t)
|
||||
|
||||
// First chunk: valid SSE-S3 chunk.
|
||||
cipher1, meta1 := encryptSSES3Part(t, []byte("first chunk plaintext"))
|
||||
|
||||
// Second chunk: missing per-chunk metadata, triggers error after first is
|
||||
// already appended.
|
||||
chunks := []*filer_pb.FileChunk{
|
||||
{
|
||||
FileId: "1,good",
|
||||
Offset: 0,
|
||||
Size: uint64(len(cipher1)),
|
||||
SseType: filer_pb.SSEType_SSE_S3,
|
||||
SseMetadata: meta1,
|
||||
},
|
||||
{
|
||||
FileId: "2,bad",
|
||||
Offset: int64(len(cipher1)),
|
||||
Size: 1,
|
||||
SseType: filer_pb.SSEType_SSE_S3,
|
||||
SseMetadata: nil, // triggers "missing per-chunk metadata"
|
||||
},
|
||||
}
|
||||
|
||||
firstClosed := false
|
||||
secondClosed := false
|
||||
fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
|
||||
switch c.GetFileIdString() {
|
||||
case "1,good":
|
||||
return &closeTrackingReadCloser{Reader: bytes.NewReader(cipher1), closed: &firstClosed}, nil
|
||||
case "2,bad":
|
||||
return &closeTrackingReadCloser{Reader: bytes.NewReader([]byte("x")), closed: &secondClosed}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected chunk %s", c.GetFileIdString())
|
||||
}
|
||||
|
||||
_, err := buildMultipartSSES3Reader(chunks, keyManager, fetch)
|
||||
if err == nil {
|
||||
t.Fatal("expected error from missing chunk metadata, got nil")
|
||||
}
|
||||
if !firstClosed {
|
||||
t.Error("previously appended chunk reader was not closed on error")
|
||||
}
|
||||
if !secondClosed {
|
||||
t.Error("chunk reader for the failing chunk was not closed on error")
|
||||
}
|
||||
}
|
||||
|
||||
type closeTrackingReadCloser struct {
|
||||
io.Reader
|
||||
closed *bool
|
||||
}
|
||||
|
||||
func (r *closeTrackingReadCloser) Close() error {
|
||||
*r.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2593,19 +2593,40 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
|
||||
// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
|
||||
// It's kept in the signature for API consistency with non-Direct versions.
|
||||
func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
|
||||
// Sort chunks by offset to ensure correct order
|
||||
chunks := entry.GetChunks()
|
||||
// Close the original encrypted stream since chunks are fetched individually.
|
||||
// Defer so the stream is closed on every return path (including error
|
||||
// returns from inside the per-chunk loop), matching the SSE-S3 helper.
|
||||
if encryptedStream != nil {
|
||||
defer encryptedStream.Close()
|
||||
}
|
||||
|
||||
// Sort a copy of the slice so entry.Chunks is not reordered (other code
|
||||
// paths, e.g. ETag computation, can rely on the original chunk order).
|
||||
originalChunks := entry.GetChunks()
|
||||
chunks := make([]*filer_pb.FileChunk, len(originalChunks))
|
||||
copy(chunks, originalChunks)
|
||||
sort.Slice(chunks, func(i, j int) bool {
|
||||
return chunks[i].GetOffset() < chunks[j].GetOffset()
|
||||
})
|
||||
|
||||
// Create readers for each chunk, decrypting them independently
|
||||
var readers []io.Reader
|
||||
readers := make([]io.Reader, 0, len(chunks))
|
||||
|
||||
// Close any readers already appended to `readers` on error paths, to avoid
|
||||
// leaking volume-server HTTP connections.
|
||||
closeAppendedReaders := func() {
|
||||
for _, r := range readers {
|
||||
if closer, ok := r.(io.Closer); ok {
|
||||
closer.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, chunk := range chunks {
|
||||
// Get this chunk's encrypted data
|
||||
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
|
||||
if err != nil {
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
|
||||
}
|
||||
|
||||
@@ -2614,6 +2635,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
// Check if this chunk has per-chunk SSE-C metadata
|
||||
if len(chunk.GetSseMetadata()) == 0 {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString())
|
||||
}
|
||||
|
||||
@@ -2621,6 +2643,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
|
||||
if err != nil {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err)
|
||||
}
|
||||
|
||||
@@ -2628,8 +2651,17 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
|
||||
if err != nil {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
|
||||
}
|
||||
// Guard cipher.NewCTR against a missing/short IV (base64 decode of
|
||||
// an empty or malformed field would otherwise reach it and panic).
|
||||
if len(chunkIV) != s3_constants.AESBlockSize {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("SSE-C chunk %s has invalid IV length %d (expected %d)",
|
||||
chunk.GetFileIdString(), len(chunkIV), s3_constants.AESBlockSize)
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d",
|
||||
chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
|
||||
@@ -2647,11 +2679,13 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
partOffset := ssecMetadata.PartOffset
|
||||
if partOffset < 0 {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("invalid SSE-C part offset %d for chunk %s", partOffset, chunk.GetFileIdString())
|
||||
}
|
||||
decryptedChunkReader, decErr := CreateSSECDecryptedReaderWithOffset(chunkReader, customerKey, chunkIV, uint64(partOffset))
|
||||
if decErr != nil {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
|
||||
}
|
||||
|
||||
@@ -2671,11 +2705,6 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
}
|
||||
}
|
||||
|
||||
// Close the original encrypted stream since we're reading chunks individually
|
||||
if encryptedStream != nil {
|
||||
encryptedStream.Close()
|
||||
}
|
||||
|
||||
return NewMultipartSSEReader(readers), nil
|
||||
}
|
||||
|
||||
@@ -2683,19 +2712,41 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con
|
||||
// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
|
||||
// It's kept in the signature for API consistency with non-Direct versions.
|
||||
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
|
||||
// Sort chunks by offset to ensure correct order
|
||||
chunks := entry.GetChunks()
|
||||
// Close the original encrypted stream since chunks are fetched individually.
|
||||
// Defer so the stream is closed on every return path (including error
|
||||
// returns from inside the per-chunk loop), matching the SSE-S3 helper.
|
||||
if encryptedStream != nil {
|
||||
defer encryptedStream.Close()
|
||||
}
|
||||
|
||||
// Sort a copy of the slice so entry.Chunks is not reordered (other code
|
||||
// paths, e.g. ETag computation, can rely on the original chunk order).
|
||||
// IV length is validated inside CreateSSEKMSDecryptedReader via ValidateIV.
|
||||
originalChunks := entry.GetChunks()
|
||||
chunks := make([]*filer_pb.FileChunk, len(originalChunks))
|
||||
copy(chunks, originalChunks)
|
||||
sort.Slice(chunks, func(i, j int) bool {
|
||||
return chunks[i].GetOffset() < chunks[j].GetOffset()
|
||||
})
|
||||
|
||||
// Create readers for each chunk, decrypting them independently
|
||||
var readers []io.Reader
|
||||
readers := make([]io.Reader, 0, len(chunks))
|
||||
|
||||
// Close any readers already appended to `readers` on error paths, to avoid
|
||||
// leaking volume-server HTTP connections.
|
||||
closeAppendedReaders := func() {
|
||||
for _, r := range readers {
|
||||
if closer, ok := r.(io.Closer); ok {
|
||||
closer.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, chunk := range chunks {
|
||||
// Get this chunk's encrypted data
|
||||
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
|
||||
if err != nil {
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
|
||||
}
|
||||
|
||||
@@ -2704,6 +2755,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
|
||||
// Check if this chunk has per-chunk SSE-KMS metadata
|
||||
if len(chunk.GetSseMetadata()) == 0 {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString())
|
||||
}
|
||||
|
||||
@@ -2711,6 +2763,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
|
||||
kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
|
||||
if err != nil {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
|
||||
}
|
||||
|
||||
@@ -2721,6 +2774,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
|
||||
decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey)
|
||||
if decErr != nil {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
|
||||
}
|
||||
|
||||
@@ -2740,11 +2794,6 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
|
||||
}
|
||||
}
|
||||
|
||||
// Close the original encrypted stream since we're reading chunks individually
|
||||
if encryptedStream != nil {
|
||||
encryptedStream.Close()
|
||||
}
|
||||
|
||||
return NewMultipartSSEReader(readers), nil
|
||||
}
|
||||
|
||||
@@ -2752,14 +2801,32 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.C
|
||||
// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
|
||||
// It's kept in the signature for API consistency with non-Direct versions.
|
||||
func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
|
||||
// Sort chunks by offset to ensure correct order
|
||||
chunks := entry.GetChunks()
|
||||
sort.Slice(chunks, func(i, j int) bool {
|
||||
return chunks[i].GetOffset() < chunks[j].GetOffset()
|
||||
// Close the original encrypted stream since chunks are fetched individually.
|
||||
if encryptedStream != nil {
|
||||
defer encryptedStream.Close()
|
||||
}
|
||||
|
||||
return buildMultipartSSES3Reader(entry.GetChunks(), GetSSES3KeyManager(), func(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
|
||||
return s3a.createEncryptedChunkReader(ctx, chunk)
|
||||
})
|
||||
}
|
||||
|
||||
// buildMultipartSSES3Reader composes a decrypted reader from a set of multipart
|
||||
// SSE-S3 chunks. Chunks are fetched via fetchChunk and decrypted using their
|
||||
// per-chunk metadata (each multipart part has its own DEK and IV). Exposed as a
|
||||
// standalone helper so tests can inject a mock chunk fetcher.
|
||||
func buildMultipartSSES3Reader(chunks []*filer_pb.FileChunk, keyManager *SSES3KeyManager, fetchChunk func(*filer_pb.FileChunk) (io.ReadCloser, error)) (io.Reader, error) {
|
||||
// Sort a copy of the slice so callers do not observe their input chunks
|
||||
// reordered (the backing array is shared with entry.Chunks, which other
|
||||
// code may rely on being in its original order, e.g. for ETag computation).
|
||||
sortedChunks := make([]*filer_pb.FileChunk, len(chunks))
|
||||
copy(sortedChunks, chunks)
|
||||
sort.Slice(sortedChunks, func(i, j int) bool {
|
||||
return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset()
|
||||
})
|
||||
|
||||
// Create readers for each chunk, decrypting them independently
|
||||
readers := make([]io.Reader, 0, len(chunks))
|
||||
readers := make([]io.Reader, 0, len(sortedChunks))
|
||||
|
||||
// Close any readers already appended to `readers` on error paths, to avoid
|
||||
// leaking volume-server HTTP connections.
|
||||
@@ -2771,12 +2838,9 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co
|
||||
}
|
||||
}
|
||||
|
||||
// Get key manager for deserializing per-chunk SSE-S3 metadata
|
||||
keyManager := GetSSES3KeyManager()
|
||||
|
||||
for _, chunk := range chunks {
|
||||
for _, chunk := range sortedChunks {
|
||||
// Get this chunk's encrypted data
|
||||
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
|
||||
chunkReader, err := fetchChunk(chunk)
|
||||
if err != nil {
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
|
||||
@@ -2799,8 +2863,16 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co
|
||||
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
|
||||
}
|
||||
|
||||
// Use the IV from the chunk metadata
|
||||
// Use the IV from the chunk metadata. DeserializeSSES3Metadata does
|
||||
// not require an IV, so validate the length here before it reaches
|
||||
// cipher.NewCTR, which would otherwise panic on a nil or short IV.
|
||||
iv := chunkSSES3Metadata.IV
|
||||
if len(iv) != s3_constants.AESBlockSize {
|
||||
chunkReader.Close()
|
||||
closeAppendedReaders()
|
||||
return nil, fmt.Errorf("SSE-S3 chunk %s has invalid IV length %d (expected %d)",
|
||||
chunk.GetFileIdString(), len(iv), s3_constants.AESBlockSize)
|
||||
}
|
||||
glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
|
||||
chunk.GetFileIdString(), chunkSSES3Metadata.KeyID, len(iv))
|
||||
|
||||
@@ -2828,11 +2900,6 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co
|
||||
}
|
||||
}
|
||||
|
||||
// Close the original encrypted stream since we're reading chunks individually
|
||||
if encryptedStream != nil {
|
||||
encryptedStream.Close()
|
||||
}
|
||||
|
||||
return NewMultipartSSEReader(readers), nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user