seaweedfs/weed/s3api/filer_multipart.go
Chris Lu 525900dfe4 fix(s3api): backfill multipart SSE-S3 metadata at completion (#9224)
* fix(s3api): backfill missing per-chunk SSE-S3 metadata at completion

When a part of an SSE-S3 multipart upload lands with SseType=NONE on
its chunks (e.g. a transient failure to apply SSE-S3 setup in
PutObjectPart), the completed object inherits NONE-tagged chunks and
detectPrimarySSEType then misses the chunked SSE-S3 encryption. The
read path falls through to the unencrypted serving path and GET returns
ciphertext, producing the SHA mismatch reported in #8908.

Recover at completion using the base IV and key data the upload
directory recorded at CreateMultipartUpload:

  - extractMultipartSSES3Info validates upload-entry metadata up
    front and hard-fails completion if the base IV or key data are
    malformed; serializing chunk metadata we then could not decrypt
    is worse than rejecting the upload.
  - completedMultipartChunk re-derives a per-chunk IV from baseIV +
    chunk.Offset (matching what putToFiler would have written) and
    serializes per-chunk SSE-S3 metadata when the chunk has no tag.
    Existing per-chunk metadata is left alone; we cannot recover an
    already-derived IV from the upload-entry alone.
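The two recovery rules above can be sketched in isolation. This is a simplified illustration, not the seaweedfs code: decodeBaseIV, needsBackfill, and the sseType constants are hypothetical stand-ins for the checks in extractMultipartSSES3Info and completedMultipartChunk, and the 16-byte length assumes the AES block size.

```go
package main

import (
	"encoding/base64"
	"errors"
	"fmt"
)

// aesBlockSize is the 16-byte AES block size the base IV must match.
const aesBlockSize = 16

// sseType is a hypothetical stand-in for filer_pb.SSEType.
type sseType int

const (
	sseNone sseType = iota
	sseS3
	sseKMS
)

// decodeBaseIV validates the upload-entry base IV up front, so completion
// fails instead of writing chunk metadata that could never be decrypted.
func decodeBaseIV(encoded string) ([]byte, error) {
	if encoded == "" {
		return nil, errors.New("missing SSE-S3 multipart base IV")
	}
	iv, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("decode SSE-S3 multipart base IV: %w", err)
	}
	if len(iv) != aesBlockSize {
		return nil, fmt.Errorf("invalid SSE-S3 multipart base IV length %d", len(iv))
	}
	return iv, nil
}

// needsBackfill reports whether a chunk should get derived SSE-S3 metadata:
// untagged chunks and SSE-S3 chunks without metadata do; chunks of other SSE
// types and SSE-S3 chunks that already carry metadata are left alone.
func needsBackfill(t sseType, metadataLen int) bool {
	switch t {
	case sseNone:
		return true
	case sseS3:
		return metadataLen == 0
	default:
		return false
	}
}

func main() {
	_, err := decodeBaseIV(base64.StdEncoding.EncodeToString(make([]byte, aesBlockSize)))
	fmt.Println("16-byte IV:", err)
	_, err = decodeBaseIV(base64.StdEncoding.EncodeToString([]byte("short")))
	fmt.Println("5-byte IV:", err)
	fmt.Println(needsBackfill(sseNone, 0), needsBackfill(sseS3, 32), needsBackfill(sseKMS, 0))
	// Output:
	// 16-byte IV: <nil>
	// 5-byte IV: invalid SSE-S3 multipart base IV length 5
	// true false false
}
```

Rejecting a malformed IV before any chunk is touched is what makes the hard failure cheap: nothing has been serialized yet.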

The IV formula intentionally has no partNumber term: putToFiler
hardcodes partOffset=0 when it calls handleSSES3MultipartEncryption
for every part, so each chunk's encryption IV is
calculateIVWithOffset(baseIV, chunk.Offset_part_local).
PartOffsetMultiplier is defined in s3_constants but is not consumed
by the encryption path. Adopting (partNumber-1)*PartOffsetMultiplier
+ chunk.Offset would produce IVs that fail to decrypt the bytes on
disk, a more severe failure than the bug being fixed. Two tests pin
this down:

  - TestCompletedMultipartChunkBackfilledIVDecryptsActualCiphertext
    runs the round trip across the encryption boundary: encrypt
    parts with CreateSSES3EncryptedReaderWithBaseIV (the call
    putToFiler uses), drop chunk metadata to reproduce #8908,
    backfill, decrypt with backfilled IV, assert plaintext intact.
  - TestCompletedMultipartChunkRejectsPartNumberMultiplierFormula
    constructs the IV the partNumber formula would produce and
    shows it does not decrypt the actual ciphertext.
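The IV argument can be checked with a small AES-CTR round trip. The sketch assumes calculateIVWithOffset treats the IV as a 128-bit big-endian counter advanced by offset/16 blocks (the same counter convention Go's cipher.NewCTR uses); ivWithOffset, the 64-byte chunkSize, and hypotheticalPartOffsetMultiplier are illustrative assumptions, not the real helpers or values. Each part is encrypted as its own stream from the base IV, as putToFiler does with partOffset=0.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
)

const (
	// chunkSize stands in for the internal chunk size; it only needs to be
	// a multiple of the AES block size for this illustration.
	chunkSize = 64
	// hypotheticalPartOffsetMultiplier is an arbitrary illustrative value,
	// not the real s3_constants.PartOffsetMultiplier.
	hypotheticalPartOffsetMultiplier = 1 << 30
)

// ivWithOffset advances a 128-bit big-endian counter IV by offset/16 blocks
// without modifying base.
func ivWithOffset(base []byte, offset int64) []byte {
	iv := make([]byte, len(base))
	copy(iv, base)
	carry := uint64(offset / aes.BlockSize)
	for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return iv
}

func randomBytes(n int) []byte {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return b
}

// ctrXOR runs AES-CTR from iv over in; encryption and decryption are the same operation.
func ctrXOR(key, iv, in []byte) []byte {
	block, err := aes.NewCipher(key)
	if err != nil {
		panic(err)
	}
	out := make([]byte, len(in))
	cipher.NewCTR(block, iv).XORKeyStream(out, in)
	return out
}

// roundTrip encrypts one part as a single stream from baseIV, then decrypts
// each chunk independently with an IV derived from either the part-local
// chunk offset or the rejected partNumber formula.
func roundTrip(partNumber int, usePartNumberTerm bool) bool {
	key := randomBytes(32)
	baseIV := randomBytes(aes.BlockSize)
	plaintext := randomBytes(3 * chunkSize)
	ciphertext := ctrXOR(key, baseIV, plaintext)

	for off := 0; off < len(ciphertext); off += chunkSize {
		ivOffset := int64(off) // chunk offset within the part
		if usePartNumberTerm {
			ivOffset += int64(partNumber-1) * hypotheticalPartOffsetMultiplier
		}
		got := ctrXOR(key, ivWithOffset(baseIV, ivOffset), ciphertext[off:off+chunkSize])
		if !bytes.Equal(got, plaintext[off:off+chunkSize]) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println("part-local IV, part 2:", roundTrip(2, false))
	fmt.Println("partNumber IV, part 2:", roundTrip(2, true))
	// Output:
	// part-local IV, part 2: true
	// partNumber IV, part 2: false
}
```

For part 1 the two formulas coincide because (partNumber-1) is zero, so the difference only surfaces from part 2 onward.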

This commit covers the chunk-level recovery only. The companion
fix for the object-level Extended attributes (SeaweedFSSSES3Key /
X-Amz-Server-Side-Encryption) follows separately.

* fix(s3api): backfill canonical SSE-S3 attributes onto multipart object

The previous commit ensures every chunk of an SSE-S3 multipart upload
carries SseType=SSE_S3 with a per-chunk IV, so the multipart-direct
read path can decrypt. The completed object's Extended map can still
lack the canonical pair of keys that detectPrimarySSEType and
IsSSES3EncryptedInternal look at:

  - X-Amz-Server-Side-Encryption (the AmzServerSideEncryption header
    detectPrimarySSEType reads on inline / small-object reads)
  - x-seaweedfs-sse-s3-key (SeaweedFSSSES3Key, required by
    IsSSES3EncryptedInternal and by the read-path key lookup)

When a part of the upload was written by a path that did not set
those (the same #8908 race that produced the NONE chunks),
copySSEHeadersFromFirstPart finds nothing to copy and the final entry
ends up with only the multipart-init keys (SeaweedFSSSES3Encryption /
BaseIV / KeyData). The read path then mis-detects the object as
unencrypted.

applyMultipartSSES3HeadersFromUploadEntry writes the canonical pair
from the multipart-init metadata in all three completion paths
(versioned, suspended, non-versioned), but only when the keys are
missing, so values copied from a healthy first part still win.
extractMultipartSSES3Info already
ran in prepareMultipartCompletionState, so the data is reused without
re-decoding.

Tests: TestApplyMultipartSSES3HeadersFromUploadEntry covers backfill,
do-not-clobber, and nil-info no-op cases.
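A minimal sketch of the fill-only-if-missing rule, assuming a stripped-down entry type in place of filer_pb.Entry and a hypothetical backfillSSES3Headers helper. The key names are the ones listed above; "AES256" is the standard S3 algorithm value.

```go
package main

import "fmt"

// Key names as listed in the commit message.
const (
	sseS3KeyAttr = "x-seaweedfs-sse-s3-key"
	sseAlgoAttr  = "X-Amz-Server-Side-Encryption"
)

// entry is a hypothetical stand-in for filer_pb.Entry.
type entry struct {
	Extended map[string][]byte
}

// uploadSSES3Info is a hypothetical stand-in for the key data recorded on
// the upload entry at CreateMultipartUpload.
type uploadSSES3Info struct {
	keyData []byte
}

// backfillSSES3Headers fills the canonical pair only where it is missing, so
// values copied from a healthy first part are never overwritten.
func backfillSSES3Headers(dst *entry, info *uploadSSES3Info) {
	if dst == nil || info == nil {
		return
	}
	if dst.Extended == nil {
		dst.Extended = make(map[string][]byte)
	}
	if _, ok := dst.Extended[sseS3KeyAttr]; !ok {
		dst.Extended[sseS3KeyAttr] = info.keyData
	}
	if _, ok := dst.Extended[sseAlgoAttr]; !ok {
		dst.Extended[sseAlgoAttr] = []byte("AES256")
	}
}

func main() {
	info := &uploadSSES3Info{keyData: []byte("upload-key")}

	missing := &entry{}
	backfillSSES3Headers(missing, info)
	fmt.Println(string(missing.Extended[sseS3KeyAttr]), string(missing.Extended[sseAlgoAttr]))

	healthy := &entry{Extended: map[string][]byte{sseS3KeyAttr: []byte("first-part-key")}}
	backfillSSES3Headers(healthy, info)
	fmt.Println(string(healthy.Extended[sseS3KeyAttr]))
	// Output:
	// upload-key AES256
	// first-part-key
}
```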

* fix(s3api): drop double IV adjustment in SSE-KMS chunk view decrypt

decryptSSEKMSChunkView was pre-adjusting the SSE-KMS chunk IV
(calculateIVWithOffset(baseIV, ChunkOffset)) and then handing the
adjusted IV to CreateSSEKMSDecryptedReader, which itself runs
calculateIVWithOffset(IV, ChunkOffset) on whatever it receives. The
offset was being applied twice for any chunk with a non-zero
ChunkOffset, corrupting the keystream for range reads that cross
multipart chunk boundaries.

Pass the raw SSE-KMS key (with base IV and the original ChunkOffset
field) into CreateSSEKMSDecryptedReader so the offset is applied
exactly once, and remove the now-dead intra-block skip that was
compensating for the double adjustment.

Add an anti-test inside TestSSEKMSDecryptChunkView_RequiresOffsetAdjustment
that decrypts the same ciphertext with a deliberately double-adjusted
IV and asserts the output is corrupted, so any regression that
re-introduces the double application fails the unit test.
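The double-adjustment bug reduces to a few lines of AES-CTR. In this sketch (same counter assumption as before: base IV plus offset/16 blocks, with offset%16 keystream bytes discarded), decryptFrom plays the role of a reader that applies the chunk offset itself, so handing it a pre-adjusted IV moves the counter twice. All names here are hypothetical.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// ivWithOffset advances a 128-bit big-endian counter IV by offset/16 blocks.
func ivWithOffset(base []byte, offset int64) []byte {
	iv := make([]byte, len(base))
	copy(iv, base)
	carry := uint64(offset / aes.BlockSize)
	for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return iv
}

// ctrXOR runs AES-CTR from iv over in.
func ctrXOR(key, iv, in []byte) []byte {
	block, err := aes.NewCipher(key)
	if err != nil {
		panic(err)
	}
	out := make([]byte, len(in))
	cipher.NewCTR(block, iv).XORKeyStream(out, in)
	return out
}

// decryptFrom decrypts ct, which starts at byte offset off of a stream
// encrypted from iv. Like the reader it models, it applies the offset itself.
func decryptFrom(key, iv []byte, off int64, ct []byte) []byte {
	skip := int(off % aes.BlockSize)
	buf := make([]byte, skip+len(ct))
	copy(buf[skip:], ct)
	return ctrXOR(key, ivWithOffset(iv, off), buf)[skip:]
}

// chunkViewOK decrypts a 64-byte view at off, passing either the base IV
// (offset applied once) or a pre-adjusted IV (offset applied twice).
func chunkViewOK(off int64, preAdjust bool) bool {
	key := bytes.Repeat([]byte{0x2a}, 32)
	baseIV := bytes.Repeat([]byte{0x07}, aes.BlockSize)
	plaintext := make([]byte, 256)
	for i := range plaintext {
		plaintext[i] = byte(i)
	}
	ciphertext := ctrXOR(key, baseIV, plaintext)

	iv := baseIV
	if preAdjust {
		iv = ivWithOffset(baseIV, off) // the extra caller-side adjustment
	}
	view := ciphertext[off : off+64]
	return bytes.Equal(decryptFrom(key, iv, off, view), plaintext[off:off+64])
}

func main() {
	fmt.Println("applied once:", chunkViewOK(100, false))
	fmt.Println("applied twice:", chunkViewOK(100, true))
	// Output:
	// applied once: true
	// applied twice: false
}
```

Offsets below 16 bytes survive the bug because offset/16 is zero; any chunk starting at or beyond the first block boundary decrypts to garbage.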

* test(s3): cover multipart SSE across chunk-spanning parts and ranges

Adds an integration subtest "Multipart Parts Larger Than Internal
Chunks Across SSE Types" to TestSSEMultipartUploadIntegration that
exercises the end-to-end S3 path for the bugs fixed in this branch:

  - Two-part multipart upload with each part larger than the 8MB
    internal SeaweedFS chunk, so each part itself spans multiple
    underlying chunks.
  - Subtests for SSE-C, SSE-KMS, explicit SSE-S3, and bucket-default
    SSE-S3: the four paths multipart parts can take through the SSE
    pipeline.
  - Each subtest does a full GET (verifying every byte and the
    response Content-Length / SSE response headers) plus a 129-byte
    range read straddling the 8MB internal chunk boundary, which is
    the path that produced the SSE-KMS double-IV corruption (fixed in
    the previous commit) and the SSE-S3 chunk-tag loss (fixed in the
    earlier commits).

Factored the request shape behind multipartSSEOptions /
uploadAndVerifyMultipartSSEObject so all four SSE flavors share the
same upload+verify code; only the SSE-specific input/output
configuration differs per subtest.
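One way to place a 129-byte window across the internal chunk boundary is to center it on the boundary offset. This sketch assumes 8MB means 8 MiB (8 * 1024 * 1024 bytes); the exact offsets the integration test uses are not shown here, and straddlingRange is a hypothetical helper.

```go
package main

import "fmt"

// internalChunkSize assumes "8MB" means 8 MiB.
const internalChunkSize int64 = 8 * 1024 * 1024

// straddlingRange centers an inclusive byte window of the given length on
// boundary and returns it with the matching HTTP Range header value.
func straddlingRange(boundary, length int64) (start, end int64, header string) {
	start = boundary - length/2
	end = start + length - 1
	return start, end, fmt.Sprintf("bytes=%d-%d", start, end)
}

func main() {
	start, end, header := straddlingRange(internalChunkSize, 129)
	fmt.Println(header)
	fmt.Println("bytes before boundary:", internalChunkSize-start)
	fmt.Println("bytes from boundary on:", end-internalChunkSize+1)
	// Output:
	// bytes=8388544-8388672
	// bytes before boundary: 64
	// bytes from boundary on: 65
}
```

With 64 bytes in the first chunk and 65 in the second, the read has to stitch two chunk views together, which is exactly where a per-chunk IV error shows up.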

* test(s3): abort orphan multipart uploads on test failure

Address a CodeRabbit nitpick on uploadAndVerifyMultipartSSEObject. The
helper used require.NoError after CreateMultipartUpload, UploadPart
and CompleteMultipartUpload, so a failure in any of those (or in the
later GET / range read on a still-incomplete upload) called t.Fatal
without aborting the in-flight MPU, leaving an orphan upload in the
bucket. Harmless in CI where the data dir is wiped on shutdown, but a
real annoyance when iterating locally and a textbook AWS S3 caveat in
production.

Register a t.Cleanup that calls AbortMultipartUpload unless a
"completed" flag was set right after a successful
CompleteMultipartUpload. Use context.Background for the abort call
since the parent ctx may already be cancelled at cleanup time, and
t.Logf the abort error rather than failing the test so the original
failure remains visible in the run output.
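The cleanup pattern can be sketched without an S3 client. guardUpload, fakeUpload, and runScenario are hypothetical; the register parameter has the same shape as testing.T.Cleanup, and the abort callback receives context.Background() because the test context may already be cancelled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// guardUpload registers a cleanup that aborts the upload unless the returned
// markCompleted func was called after CompleteMultipartUpload succeeded.
func guardUpload(register func(func()), abort func(context.Context) error, logf func(string, ...any)) (markCompleted func()) {
	completed := false
	register(func() {
		if completed {
			return
		}
		// The test's own context may already be cancelled at cleanup time.
		if err := abort(context.Background()); err != nil {
			logf("abort multipart upload: %v", err) // log only; keep the original failure visible
		}
	})
	return func() { completed = true }
}

// fakeUpload is a hypothetical stand-in for an S3 client's abort call.
type fakeUpload struct {
	aborts   int
	abortErr error
}

func (u *fakeUpload) Abort(_ context.Context) error {
	u.aborts++
	return u.abortErr
}

// runScenario simulates one test run and then executes its cleanups.
func runScenario(complete, abortFails bool) (aborts int, logs []string) {
	var cleanups []func()
	register := func(f func()) { cleanups = append(cleanups, f) }
	logf := func(format string, args ...any) { logs = append(logs, fmt.Sprintf(format, args...)) }

	u := &fakeUpload{}
	if abortFails {
		u.abortErr = errors.New("NoSuchUpload")
	}
	markCompleted := guardUpload(register, u.Abort, logf)
	if complete {
		markCompleted()
	}
	// testing.T runs cleanups last-in, first-out.
	for i := len(cleanups) - 1; i >= 0; i-- {
		cleanups[i]()
	}
	return u.aborts, logs
}

func main() {
	aborts, logs := runScenario(false, false)
	fmt.Println("failed before completion:", aborts, len(logs))
	aborts, _ = runScenario(true, false)
	fmt.Println("completed:", aborts)
	aborts, logs = runScenario(false, true)
	fmt.Println("abort failed:", aborts, logs)
	// Output:
	// failed before completion: 1 0
	// completed: 0
	// abort failed: 1 [abort multipart upload: NoSuchUpload]
}
```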
2026-04-25 23:06:37 -07:00


package s3api

import (
	"cmp"
	"crypto/md5"
	"crypto/rand"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"encoding/xml"
	"errors"
	"fmt"
	"math"
	"net/http"
	"net/url"
	"path"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/google/uuid"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"github.com/seaweedfs/seaweedfs/weed/stats"
)

const (
	multipartExt     = ".part"
	multiPartMinSize = 5 * 1024 * 1024
)

type InitiateMultipartUploadResult struct {
	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
	s3.CreateMultipartUploadOutput
}

// getRequestScheme determines the URL scheme (http or https) from the request
// Checks X-Forwarded-Proto header first (for proxies), then TLS state
func getRequestScheme(r *http.Request) string {
	// Check X-Forwarded-Proto header for proxied requests
	if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" {
		return proto
	}
	// Check if connection is TLS
	if r.TLS != nil {
		return "https"
	}
	return "http"
}

func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
	glog.V(2).Infof("createMultipartUpload input %v", input)

	uploadIdString := s3a.generateUploadID(*input.Key)
	uploadIdString = uploadIdString + "_" + strings.ReplaceAll(uuid.New().String(), "-", "")

	// Validate checksum algorithm before creating the upload directory
	_, checksumHeaderName, checksumErrCode := detectRequestedChecksumAlgorithm(r)
	if checksumErrCode != s3err.ErrNone {
		return nil, checksumErrCode
	}

	// Prepare error handling outside callback scope
	var encryptionError error

	if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
		if entry.Extended == nil {
			entry.Extended = make(map[string][]byte)
		}
		entry.Extended[s3_constants.ExtMultipartObjectKey] = []byte(*input.Key)
		// Set object owner for multipart upload
		amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
		if amzAccountId != "" {
			entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
		}
		for k, v := range input.Metadata {
			entry.Extended[k] = []byte(*v)
		}
		if input.ContentType != nil {
			entry.Attributes.Mime = *input.ContentType
		}

		// Prepare and apply encryption configuration within directory creation
		// This ensures encryption resources are only allocated if directory creation succeeds
		encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, *input.Bucket, uploadIdString)
		if prepErr != nil {
			encryptionError = prepErr
			return // Exit the callback; encryptionError is checked after mkdir returns
		}
		s3a.applyMultipartEncryptionConfig(entry, encryptionConfig)

		// Store the requested checksum algorithm so CompleteMultipartUpload can compute
		// a composite checksum from per-part checksums
		if checksumHeaderName != "" {
			entry.Extended[s3_constants.ExtChecksumAlgorithm] = []byte(checksumHeaderName)
		}

		// Extract and store object lock metadata from request headers
		// This ensures object lock settings from create_multipart_upload are preserved
		if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
			glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err)
			// Don't fail the upload - this matches AWS behavior for invalid metadata
		}
	}); err != nil {
		_, errorCode := handleMultipartInternalError("create multipart upload directory", err)
		return nil, errorCode
	}

	// Check for encryption configuration errors that occurred within the callback
	if encryptionError != nil {
		_, errorCode := handleMultipartInternalError("prepare encryption configuration", encryptionError)
		return nil, errorCode
	}

	output = &InitiateMultipartUploadResult{
		CreateMultipartUploadOutput: s3.CreateMultipartUploadOutput{
			Bucket:   input.Bucket,
			Key:      objectKey(input.Key),
			UploadId: aws.String(uploadIdString),
		},
	}

	return
}

type CompleteMultipartUploadResult struct {
	XMLName  xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
	Location *string  `xml:"Location,omitempty"`
	Bucket   *string  `xml:"Bucket,omitempty"`
	Key      *string  `xml:"Key,omitempty"`
	ETag     *string  `xml:"ETag,omitempty"`

	// Checksum fields: returned as HTTP response headers, not in the XML body
	ChecksumHeaderName string `xml:"-"`
	ChecksumValue      string `xml:"-"`

	// VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header
	// Store the VersionId internally for setting HTTP header, but don't marshal to XML
	VersionId *string `xml:"-"`
}

// copySSEHeadersFromFirstPart copies all SSE-related headers from the first part to the destination entry
// This is critical for detectPrimarySSEType to work correctly and ensures encryption metadata is preserved
func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry, context string) {
	if firstPart == nil || firstPart.Extended == nil {
		return
	}

	// Copy ALL SSE-related headers (not just SeaweedFSSSEKMSKey)
	sseKeys := []string{
		// SSE-C headers
		s3_constants.SeaweedFSSSEIV,
		s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
		s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
		// SSE-KMS headers
		s3_constants.SeaweedFSSSEKMSKey,
		s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
		// SSE-S3 headers
		s3_constants.SeaweedFSSSES3Key,
		// Common SSE header (for SSE-KMS and SSE-S3)
		s3_constants.AmzServerSideEncryption,
	}

	for _, key := range sseKeys {
		if value, exists := firstPart.Extended[key]; exists {
			dst.Extended[key] = value
			glog.V(4).Infof("completeMultipartUpload: copied SSE header %s from first part (%s)", key, context)
		}
	}
}

type multipartPartBoundary struct {
	PartNumber int    `json:"part"`
	StartChunk int    `json:"start"`
	EndChunk   int    `json:"end"`
	ETag       string `json:"etag"`
}

type multipartSSES3Info struct {
	keyData []byte
	key     *SSES3Key
	baseIV  []byte
}

type multipartCompletionState struct {
	deleteEntries      []*filer_pb.Entry
	partEntries        map[int][]*filer_pb.Entry
	pentry             *filer_pb.Entry
	sses3Info          *multipartSSES3Info
	mime               string
	finalParts         []*filer_pb.FileChunk
	offset             int64
	partBoundaries     []multipartPartBoundary
	multipartETag      string
	entityWithTtl      bool
	checksumHeaderName string // e.g. "X-Amz-Checksum-Crc32", empty if no checksum
	checksumValue      string // composite base64 checksum with "-N" suffix
}

func completeMultipartResult(r *http.Request, input *s3.CompleteMultipartUploadInput, etag string, entry *filer_pb.Entry) *CompleteMultipartUploadResult {
	result := &CompleteMultipartUploadResult{
		Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
		Bucket:   input.Bucket,
		ETag:     aws.String(etag),
		Key:      objectKey(input.Key),
	}
	if entry != nil && entry.Extended != nil {
		if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
			versionId := string(versionIdBytes)
			if versionId != "" && versionId != "null" {
				result.VersionId = aws.String(versionId)
			}
		}
	}
	return result
}

func extractMultipartSSES3Info(entry *filer_pb.Entry) (*multipartSSES3Info, error) {
	if entry == nil || entry.Extended == nil {
		return nil, nil
	}
	if encryptionType := string(entry.Extended[s3_constants.SeaweedFSSSES3Encryption]); encryptionType != s3_constants.SSEAlgorithmAES256 {
		return nil, nil
	}

	baseIVEncoded := entry.Extended[s3_constants.SeaweedFSSSES3BaseIV]
	if len(baseIVEncoded) == 0 {
		return nil, fmt.Errorf("missing SSE-S3 multipart base IV")
	}
	baseIV, err := base64.StdEncoding.DecodeString(string(baseIVEncoded))
	if err != nil {
		return nil, fmt.Errorf("decode SSE-S3 multipart base IV: %w", err)
	}
	if len(baseIV) != s3_constants.AESBlockSize {
		return nil, fmt.Errorf("invalid SSE-S3 multipart base IV length %d", len(baseIV))
	}

	keyDataEncoded := entry.Extended[s3_constants.SeaweedFSSSES3KeyData]
	if len(keyDataEncoded) == 0 {
		return nil, fmt.Errorf("missing SSE-S3 multipart key data")
	}
	keyData, err := base64.StdEncoding.DecodeString(string(keyDataEncoded))
	if err != nil {
		return nil, fmt.Errorf("decode SSE-S3 multipart key data: %w", err)
	}
	key, err := DeserializeSSES3Metadata(keyData, GetSSES3KeyManager())
	if err != nil {
		return nil, fmt.Errorf("deserialize SSE-S3 multipart key data: %w", err)
	}

	return &multipartSSES3Info{
		keyData: keyData,
		key:     key,
		baseIV:  baseIV,
	}, nil
}

func completedMultipartChunk(chunk *filer_pb.FileChunk, offset int64, sses3Info *multipartSSES3Info) (*filer_pb.FileChunk, error) {
	finalChunk := &filer_pb.FileChunk{
		FileId:       chunk.GetFileIdString(),
		Offset:       offset,
		Size:         chunk.Size,
		ModifiedTsNs: chunk.ModifiedTsNs,
		CipherKey:    chunk.CipherKey,
		ETag:         chunk.ETag,
		IsCompressed: chunk.IsCompressed,
		SseType:      chunk.SseType,
		SseMetadata:  chunk.SseMetadata,
	}
	if sses3Info == nil {
		return finalChunk, nil
	}
	if finalChunk.GetSseType() != filer_pb.SSEType_NONE && finalChunk.GetSseType() != filer_pb.SSEType_SSE_S3 {
		return finalChunk, nil
	}

	// Trust existing per-chunk SSE-S3 metadata: if the part went through
	// putToFiler's chunk loop with a non-nil sseS3Key, it carries an
	// offset-derived IV that we cannot recover from the upload-entry alone. We
	// do not validate that the embedded IV equals calculateIVWithOffset(baseIV,
	// chunk.Offset); only completely missing metadata is backfilled. This
	// keeps healthy parts untouched and limits backfill to the cases that
	// caused #8908.
	if finalChunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(finalChunk.GetSseMetadata()) > 0 {
		return finalChunk, nil
	}

	// IV uses the chunk's offset within its part, with no partNumber term.
	// This mirrors the encryption side: putToFiler hardcodes partOffset=0 when
	// it calls handleSSES3MultipartEncryption for every part, so each part's
	// encrypted stream begins at IV = calculateIVWithOffset(baseIV, 0) = baseIV.
	// Each chunk within that stream is then at IV =
	// calculateIVWithOffset(baseIV, chunk.Offset_part_local). Any deviation
	// (e.g. adding (partNumber-1)*PartOffsetMultiplier) would produce IVs that
	// do not match the actual encryption and would corrupt decryption.
	chunkIV, _ := calculateIVWithOffset(sses3Info.baseIV, chunk.GetOffset())
	chunkKey := &SSES3Key{
		Key:       sses3Info.key.Key,
		KeyID:     sses3Info.key.KeyID,
		Algorithm: sses3Info.key.Algorithm,
		IV:        chunkIV,
	}
	chunkMetadata, err := SerializeSSES3Metadata(chunkKey)
	if err != nil {
		return nil, fmt.Errorf("serialize SSE-S3 chunk metadata for %s: %w", chunk.GetFileIdString(), err)
	}
	finalChunk.SseType = filer_pb.SSEType_SSE_S3
	finalChunk.SseMetadata = chunkMetadata
	return finalChunk, nil
}

// applyMultipartSSES3HeadersFromUploadEntry writes the canonical object-level
// SSE-S3 attributes (SeaweedFSSSES3Key / X-Amz-Server-Side-Encryption) onto a
// completed multipart entry when they are missing. detectPrimarySSEType uses
// the object-level X-Amz-Server-Side-Encryption header to recognize SSE-S3 on
// inline / small-object reads, and IsSSES3EncryptedInternal requires both keys
// to consider an object encrypted at all. Without this, an object whose first
// part lacked the canonical attributes (e.g. a part written through a path
// that did not set them) would be served as unencrypted on GET.
func applyMultipartSSES3HeadersFromUploadEntry(dst *filer_pb.Entry, sses3Info *multipartSSES3Info) {
	if dst == nil || sses3Info == nil {
		return
	}
	if dst.Extended == nil {
		dst.Extended = make(map[string][]byte)
	}
	if _, exists := dst.Extended[s3_constants.SeaweedFSSSES3Key]; !exists {
		dst.Extended[s3_constants.SeaweedFSSSES3Key] = sses3Info.keyData
	}
	if _, exists := dst.Extended[s3_constants.AmzServerSideEncryption]; !exists {
		dst.Extended[s3_constants.AmzServerSideEncryption] = []byte(s3_constants.SSEAlgorithmAES256)
	}
}

func (s3a *S3ApiServer) prepareMultipartCompletionState(r *http.Request, input *s3.CompleteMultipartUploadInput, uploadDirectory, entryName, dirName string, completedPartNumbers []int, completedPartMap map[int][]string, maxPartNo int) (*multipartCompletionState, *CompleteMultipartUploadResult, s3err.ErrorCode) {
	if entry, err := s3a.resolveObjectEntry(*input.Bucket, *input.Key); err == nil && entry != nil && entry.Extended != nil {
		if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
			cleanupEntries, _, cleanupErr := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1)
			if cleanupErr != nil && !errors.Is(cleanupErr, filer_pb.ErrNotFound) {
				glog.Warningf("completeMultipartUpload: failed to list stale upload directory %s for cleanup: %v", uploadDirectory, cleanupErr)
			}
			return &multipartCompletionState{deleteEntries: cleanupEntries}, completeMultipartResult(r, input, getEtagFromEntry(entry), entry), s3err.ErrNone
		}
	}

	entries, _, err := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1)
	if err != nil {
		glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
		stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
		return nil, nil, s3err.ErrNoSuchUpload
	}
	if len(entries) == 0 {
		stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
		return nil, nil, s3err.ErrNoSuchUpload
	}

	pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
	if err != nil {
		glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
		stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
		return nil, nil, s3err.ErrNoSuchUpload
	}

	deleteEntries := make([]*filer_pb.Entry, 0)
	partEntries := make(map[int][]*filer_pb.Entry, len(entries))
	entityTooSmall := false
	entityWithTtl := false
	for _, entry := range entries {
		foundEntry := false
		glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
			continue
		}
		partNumber, parseErr := parsePartNumber(entry.Name)
		if parseErr != nil {
			stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
			glog.Errorf("completeMultipartUpload failed to parse partNumber %s:%s", entry.Name, parseErr)
			continue
		}
		completedPartsByNumber, ok := completedPartMap[partNumber]
		if !ok {
			continue
		}
		for _, partETag := range completedPartsByNumber {
			match, invalid, normalizedPartETag, normalizedEntryETag := validateCompletePartETag(partETag, entry)
			if invalid {
				glog.Warningf("invalid complete etag %s, storedEtag %s", normalizedPartETag, normalizedEntryETag)
				stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
				continue
			}
			if !match {
				glog.Errorf("completeMultipartUpload %s ETag mismatch stored: %s part: %s", entry.Name, normalizedEntryETag, normalizedPartETag)
				stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
				continue
			}
			if len(entry.Chunks) == 0 && partNumber != maxPartNo {
				glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
				stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
				continue
			}
			partEntries[partNumber] = append(partEntries[partNumber], entry)
			foundEntry = true
		}
		if foundEntry {
			if !entityWithTtl && entry.Attributes != nil && entry.Attributes.TtlSec > 0 {
				entityWithTtl = true
			}
			if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] &&
				entry.Attributes.FileSize < multiPartMinSize {
				glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name)
				entityTooSmall = true
			}
		} else {
			deleteEntries = append(deleteEntries, entry)
		}
	}
	if entityTooSmall {
		stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc()
		return nil, nil, s3err.ErrEntityTooSmall
	}

	mime := ""
	if pentry.Attributes != nil {
		mime = pentry.Attributes.Mime
	}
	sses3Info, sses3Err := extractMultipartSSES3Info(pentry)
	if sses3Err != nil {
		glog.Errorf("completeMultipartUpload %s %s SSE-S3 metadata error: %v", *input.Bucket, *input.UploadId, sses3Err)
		return nil, nil, s3err.ErrInternalError
	}

	finalParts := make([]*filer_pb.FileChunk, 0)
	partBoundaries := make([]multipartPartBoundary, 0, len(completedPartNumbers))
	var offset int64
	for _, partNumber := range completedPartNumbers {
		partEntriesByNumber, ok := partEntries[partNumber]
		if !ok {
			glog.Errorf("part %d has no entry", partNumber)
			stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
			return nil, nil, s3err.ErrInvalidPart
		}
		found := false
		if len(partEntriesByNumber) > 1 {
			sortEntriesByLatestChunk(partEntriesByNumber)
		}
		for _, entry := range partEntriesByNumber {
			if found {
				deleteEntries = append(deleteEntries, entry)
				stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
				continue
			}
			partStartChunk := len(finalParts)
			partETag := filer.ETag(entry)
			for _, chunk := range entry.GetChunks() {
				finalChunk, chunkErr := completedMultipartChunk(chunk, offset, sses3Info)
				if chunkErr != nil {
					glog.Errorf("completeMultipartUpload %s %s SSE-S3 chunk metadata error: %v", *input.Bucket, *input.UploadId, chunkErr)
					return nil, nil, s3err.ErrInternalError
				}
				finalParts = append(finalParts, finalChunk)
				offset += int64(chunk.Size)
			}
			partEndChunk := len(finalParts)
			partBoundaries = append(partBoundaries, multipartPartBoundary{
				PartNumber: partNumber,
				StartChunk: partStartChunk,
				EndChunk:   partEndChunk,
				ETag:       partETag,
			})
			found = true
		}
	}

	// Compute composite checksum from per-part checksums if the upload
	// was initiated with a checksum algorithm (stored in upload dir entry)
	checksumHeaderName := ""
	checksumValue := ""
	if pentry.Extended != nil {
		if algoName, ok := pentry.Extended[s3_constants.ExtChecksumAlgorithm]; ok {
			checksumHeaderName = string(algoName)
		}
	}
	if checksumHeaderName != "" {
		var checksumErr error
		checksumValue, checksumErr = computeCompositeChecksum(checksumHeaderName, partEntries, completedPartNumbers)
		if checksumErr != nil {
			glog.Errorf("completeMultipartUpload: composite checksum computation failed: %v", checksumErr)
			return nil, nil, s3err.ErrInvalidPart
		}
	}

	return &multipartCompletionState{
		deleteEntries:      deleteEntries,
		partEntries:        partEntries,
		pentry:             pentry,
		sses3Info:          sses3Info,
		mime:               mime,
		finalParts:         finalParts,
		offset:             offset,
		partBoundaries:     partBoundaries,
		multipartETag:      calculateMultipartETag(partEntries, completedPartNumbers),
		entityWithTtl:      entityWithTtl,
		checksumHeaderName: checksumHeaderName,
		checksumValue:      checksumValue,
	}, nil, s3err.ErrNone
}
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
maxPartNo := 1
lastSeenPartNo := 0
for _, part := range parts.Parts {
if part.PartNumber < lastSeenPartNo {
return nil, s3err.ErrInvalidPartOrder
}
lastSeenPartNo = part.PartNumber
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
maxPartNo = maxInt(maxPartNo, part.PartNumber)
}
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
entryName, dirName := s3a.getEntryNameAndDir(input)
var completionState *multipartCompletionState
finalizeCode := s3a.withObjectWriteLock(*input.Bucket, *input.Key, func() s3err.ErrorCode {
return s3a.checkConditionalHeaders(r, *input.Bucket, *input.Key)
}, func() s3err.ErrorCode {
var prepCode s3err.ErrorCode
completionState, output, prepCode = s3a.prepareMultipartCompletionState(r, input, uploadDirectory, entryName, dirName, completedPartNumbers, completedPartMap, maxPartNo)
if prepCode != s3err.ErrNone || output != nil {
return prepCode
}
etagQuote := "\"" + completionState.multipartETag + "\""
// Check if versioning is configured for this bucket BEFORE creating any files.
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr != nil {
glog.Errorf("completeMultipartUpload: failed to get versioning state for bucket %s: %v", *input.Bucket, vErr)
return s3err.ErrInternalError
}
if versioningState == s3_constants.VersioningEnabled {
// Use full object key (not just entryName) to ensure correct .versions directory is checked
normalizedKey := strings.TrimPrefix(*input.Key, "/")
useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey)
versionId := generateVersionId(useInvertedFormat)
versionFileName := s3a.getVersionFileName(versionId)
versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder
// Capture timestamp and owner once for consistency between version entry and cache entry
versionMtime := time.Now().Unix()
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
// Create the version file in the .versions directory
if err := s3a.mkFile(versionDir, versionFileName, completionState.finalParts, func(versionEntry *filer_pb.Entry) {
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(completionState.partBoundaries); err == nil {
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for versioned multipart objects
if amzAccountId != "" {
versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range completionState.pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
versionEntry.Extended[k] = v
}
}
// Persist ETag to ensure subsequent HEAD/GET uses the same value
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
// Store composite checksum if computed from per-part checksums
if completionState.checksumHeaderName != "" && completionState.checksumValue != "" {
versionEntry.Extended[s3_constants.ExtChecksumAlgorithm] = []byte(completionState.checksumHeaderName)
versionEntry.Extended[s3_constants.ExtChecksumValue] = []byte(completionState.checksumValue)
}
// Preserve ALL SSE metadata from the first part (if any)
// SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned")
}
applyMultipartSSES3HeadersFromUploadEntry(versionEntry, completionState.sses3Info)
if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
versionEntry.Attributes.Mime = completionState.pentry.Attributes.Mime
} else if completionState.mime != "" {
versionEntry.Attributes.Mime = completionState.mime
}
versionEntry.Attributes.FileSize = uint64(completionState.offset)
versionEntry.Attributes.Mtime = versionMtime
}); err != nil {
glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
return s3err.ErrInternalError
}
// Construct entry with metadata for caching in .versions directory
// Reuse versionMtime to keep list vs. HEAD timestamps aligned
// multipartETag is precomputed
versionEntryForCache := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
FileSize: uint64(completionState.offset),
Mtime: versionMtime,
},
Extended: map[string][]byte{
s3_constants.ExtETagKey: []byte(completionState.multipartETag),
},
}
if amzAccountId != "" {
versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
// Update the .versions directory metadata to indicate this is the latest version
// Pass entry to cache its metadata for single-scan list efficiency
if err := s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache); err != nil {
if rollbackErr := s3a.rollbackMultipartVersion(versionDir, versionFileName); rollbackErr != nil {
glog.Errorf("completeMultipartUpload: failed to rollback version %s for %s/%s after latest pointer update error: %v", versionId, *input.Bucket, *input.Key, rollbackErr)
}
glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err)
return s3err.ErrInternalError
}
// For versioned buckets, all content is stored in .versions directory
// The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
ChecksumHeaderName: completionState.checksumHeaderName,
ChecksumValue: completionState.checksumValue,
}
return s3err.ErrNone
}
if versioningState == s3_constants.VersioningSuspended {
// For suspended versioning, tag the object with version ID "null"; the response itself omits VersionId
if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, jsonErr := json.Marshal(completionState.partBoundaries); jsonErr == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range completionState.pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
// Copy SSE headers from the first completed part (if any). Parts carry the SSE headers
// written by PutObjectPart; SSE-S3 settings recorded on the upload directory are applied separately below.
if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning")
}
applyMultipartSSES3HeadersFromUploadEntry(entry, completionState.sses3Info)
// Persist ETag to ensure subsequent HEAD/GET uses the same value
entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
// Store composite checksum if computed from per-part checksums
if completionState.checksumHeaderName != "" && completionState.checksumValue != "" {
entry.Extended[s3_constants.ExtChecksumAlgorithm] = []byte(completionState.checksumHeaderName)
entry.Extended[s3_constants.ExtChecksumValue] = []byte(completionState.checksumValue)
}
if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
entry.Attributes.Mime = completionState.pentry.Attributes.Mime
} else if completionState.mime != "" {
entry.Attributes.Mime = completionState.mime
}
entry.Attributes.FileSize = uint64(completionState.offset)
}); err != nil {
glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
return s3err.ErrInternalError
}
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
ChecksumHeaderName: completionState.checksumHeaderName,
ChecksumValue: completionState.checksumValue,
// VersionId field intentionally omitted for suspended versioning
}
return s3err.ErrNone
}
// For non-versioned buckets, create main object file
if err := s3a.mkFile(dirName, entryName, completionState.finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(completionState.partBoundaries); err == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range completionState.pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
// Copy SSE headers from the first completed part (if any). Parts carry the SSE headers
// written by PutObjectPart; SSE-S3 settings recorded on the upload directory are applied separately below.
if len(completedPartNumbers) > 0 && len(completionState.partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := completionState.partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned")
}
applyMultipartSSES3HeadersFromUploadEntry(entry, completionState.sses3Info)
// Persist ETag to ensure subsequent HEAD/GET uses the same value
entry.Extended[s3_constants.ExtETagKey] = []byte(completionState.multipartETag)
// Store composite checksum if computed from per-part checksums
if completionState.checksumHeaderName != "" && completionState.checksumValue != "" {
entry.Extended[s3_constants.ExtChecksumAlgorithm] = []byte(completionState.checksumHeaderName)
entry.Extended[s3_constants.ExtChecksumValue] = []byte(completionState.checksumValue)
}
if completionState.pentry.Attributes != nil && completionState.pentry.Attributes.Mime != "" {
entry.Attributes.Mime = completionState.pentry.Attributes.Mime
} else if completionState.mime != "" {
entry.Attributes.Mime = completionState.mime
}
entry.Attributes.FileSize = uint64(completionState.offset)
// Set TTL-based S3 expiry (modification time)
if completionState.entityWithTtl {
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
}
}); err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
return s3err.ErrInternalError
}
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
ChecksumHeaderName: completionState.checksumHeaderName,
ChecksumValue: completionState.checksumValue,
}
return s3err.ErrNone
})
if finalizeCode != s3err.ErrNone {
return nil, finalizeCode
}
if completionState != nil {
for _, deleteEntry := range completionState.deleteEntries {
if err := s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
}
}
if err := s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
}
}
return
}
func (s3a *S3ApiServer) rollbackMultipartVersion(versionDir, versionFileName string) error {
return s3a.rmObject(versionDir, versionFileName, true, false)
}
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
entryName := path.Base(*input.Key)
dirName := path.Dir(*input.Key)
if dirName == "." {
dirName = ""
}
dirName = strings.TrimPrefix(dirName, "/")
dirName = fmt.Sprintf("%s/%s", s3a.bucketDir(*input.Bucket), dirName)
// remove suffix '/'
dirName = strings.TrimSuffix(dirName, "/")
return entryName, dirName
}
func parsePartNumber(fileName string) (int, error) {
var partNumberString string
index := strings.Index(fileName, "_")
if index != -1 {
partNumberString = fileName[:index]
} else {
partNumberString = fileName[:len(fileName)-len(multipartExt)]
}
return strconv.Atoi(partNumberString)
}
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
glog.V(2).Infof("abortMultipartUpload input %v", input)
exists, err := s3a.exists(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true)
if err != nil {
glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload
}
if exists {
err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true)
}
if err != nil {
glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrInternalError
}
return &s3.AbortMultipartUploadOutput{}, s3err.ErrNone
}
type ListMultipartUploadsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult"`
// copied from s3.ListMultipartUploadsOutput; redefined because that type's Uploads field does not marshal to <Upload></Upload>
Bucket *string `type:"string"`
Delimiter *string `type:"string"`
EncodingType *string `type:"string" enum:"EncodingType"`
IsTruncated *bool `type:"boolean"`
KeyMarker *string `type:"string"`
MaxUploads *int64 `type:"integer"`
NextKeyMarker *string `type:"string"`
NextUploadIdMarker *string `type:"string"`
Prefix *string `type:"string"`
UploadIdMarker *string `type:"string"`
Upload []*s3.MultipartUpload `locationName:"Upload" type:"list" flattened:"true"`
}
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html
glog.V(2).Infof("listMultipartUploads input %v", input)
output = &ListMultipartUploadsResult{
Bucket: input.Bucket,
Delimiter: input.Delimiter,
EncodingType: input.EncodingType,
KeyMarker: input.KeyMarker,
MaxUploads: input.MaxUploads,
Prefix: input.Prefix,
IsTruncated: aws.Bool(false),
}
entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32)
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
}
uploadsCount := int64(0)
for _, entry := range entries {
if entry.Extended != nil {
key := string(entry.Extended[s3_constants.ExtMultipartObjectKey])
if *input.KeyMarker != "" && *input.KeyMarker != key {
continue
}
if *input.Prefix != "" && !strings.HasPrefix(key, *input.Prefix) {
continue
}
output.Upload = append(output.Upload, &s3.MultipartUpload{
Key: objectKey(aws.String(key)),
UploadId: aws.String(entry.Name),
})
uploadsCount += 1
}
if uploadsCount >= *input.MaxUploads {
output.IsTruncated = aws.Bool(true)
output.NextUploadIdMarker = aws.String(entry.Name)
break
}
}
return
}
type ListPartsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"`
// copied from s3.ListPartsOutput; redefined because that type's Parts field does not marshal to <Part></Part>
Bucket *string `type:"string"`
IsTruncated *bool `type:"boolean"`
Key *string `min:"1" type:"string"`
MaxParts *int64 `type:"integer"`
NextPartNumberMarker *int64 `type:"integer"`
PartNumberMarker *int64 `type:"integer"`
Part []*s3.Part `locationName:"Part" type:"list" flattened:"true"`
StorageClass *string `type:"string" enum:"StorageClass"`
UploadId *string `type:"string"`
}
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code s3err.ErrorCode) {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
glog.V(2).Infof("listObjectParts input %v", input)
output = &ListPartsResult{
Bucket: input.Bucket,
Key: objectKey(input.Key),
UploadId: input.UploadId,
MaxParts: input.MaxParts, // the maximum number of parts to return.
PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
StorageClass: aws.String("STANDARD"),
}
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload
}
// Note: the upload directory itself marks the existence of a multipart upload,
// so empty upload folders must not be deleted.
output.IsTruncated = aws.Bool(!isLast)
for _, entry := range entries {
if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue
}
partETag := filer.ETag(entry)
part := &s3.Part{
PartNumber: aws.Int64(int64(partNumber)),
LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
Size: aws.Int64(int64(filer.FileSize(entry))),
ETag: aws.String("\"" + partETag + "\""),
}
output.Part = append(output.Part, part)
glog.V(3).Infof("listObjectParts: Added part %d, size=%d, etag=%s",
partNumber, filer.FileSize(entry), partETag)
if !isLast {
output.NextPartNumberMarker = aws.Int64(int64(partNumber))
}
}
}
glog.V(2).Infof("listObjectParts: Returning %d parts for uploadId=%s", len(output.Part), *input.UploadId)
return
}
// maxInt returns the maximum of two int values
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
// MultipartEncryptionConfig holds pre-prepared encryption configuration to avoid error handling in callbacks
type MultipartEncryptionConfig struct {
// SSE-KMS configuration
IsSSEKMS bool
KMSKeyID string
BucketKeyEnabled bool
EncryptionContext string
KMSBaseIVEncoded string
// SSE-S3 configuration
IsSSES3 bool
S3BaseIVEncoded string
S3KeyDataEncoded string
}
// prepareMultipartEncryptionConfig builds the encryption configuration for a new multipart upload.
// Explicit SSE request headers take priority; without them, the bucket-default encryption is
// applied, matching putToFiler. All fallible work (IV generation, key creation, serialization)
// happens here, so callbacks that apply the result need no criticalError variable.
func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, bucket string, uploadIdString string) (*MultipartEncryptionConfig, error) {
config := &MultipartEncryptionConfig{}
// Check for explicit encryption headers first (priority over bucket defaults)
hasExplicitSSEKMS := IsSSEKMSRequest(r)
hasExplicitSSES3 := IsSSES3RequestInternal(r)
// Prepare SSE-KMS configuration (explicit request headers)
if hasExplicitSSEKMS {
config.IsSSEKMS = true
config.KMSKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
config.BucketKeyEnabled = strings.ToLower(r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled)) == "true"
config.EncryptionContext = r.Header.Get(s3_constants.AmzServerSideEncryptionContext)
// Generate and encode base IV with proper error handling
baseIV := make([]byte, s3_constants.AESBlockSize)
n, err := rand.Read(baseIV)
if err != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for SSE-KMS multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for explicit SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
}
// Prepare SSE-S3 configuration (explicit request headers)
if hasExplicitSSES3 {
config.IsSSES3 = true
// Generate and encode base IV with proper error handling
baseIV := make([]byte, s3_constants.AESBlockSize)
n, err := rand.Read(baseIV)
if err != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for SSE-S3 multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for explicit SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
// Generate and serialize SSE-S3 key with proper error handling
keyManager := GetSSES3KeyManager()
sseS3Key, err := keyManager.GetOrCreateKey("")
if err != nil {
return nil, fmt.Errorf("failed to generate SSE-S3 key for multipart upload: %v", err)
}
keyData, serErr := SerializeSSES3Metadata(sseS3Key)
if serErr != nil {
return nil, fmt.Errorf("failed to serialize SSE-S3 metadata for multipart upload: %v", serErr)
}
config.S3KeyDataEncoded = base64.StdEncoding.EncodeToString(keyData)
// Store key in manager for later retrieval
keyManager.StoreKey(sseS3Key)
glog.V(4).Infof("Stored SSE-S3 key %s for explicit multipart upload %s", sseS3Key.KeyID, uploadIdString)
}
// If no explicit encryption headers, check bucket-default encryption
// This matches AWS S3 behavior and putToFiler() implementation
if !hasExplicitSSEKMS && !hasExplicitSSES3 {
encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
if err != nil {
// Check if this is just "no encryption configured" vs a real error
if !errors.Is(err, ErrNoEncryptionConfig) {
// Real error - propagate to prevent silent encryption bypass
return nil, fmt.Errorf("failed to read bucket encryption config for multipart upload: %v", err)
}
// No default encryption configured, continue without encryption
} else if encryptionConfig != nil && encryptionConfig.SseAlgorithm != "" {
glog.V(3).Infof("prepareMultipartEncryptionConfig: applying bucket-default encryption %s for bucket %s, upload %s",
encryptionConfig.SseAlgorithm, bucket, uploadIdString)
switch encryptionConfig.SseAlgorithm {
case EncryptionTypeKMS:
// Apply SSE-KMS as bucket default
config.IsSSEKMS = true
config.KMSKeyID = encryptionConfig.KmsKeyId
config.BucketKeyEnabled = encryptionConfig.BucketKeyEnabled
// No encryption context for bucket defaults
// Generate and encode base IV
baseIV := make([]byte, s3_constants.AESBlockSize)
n, readErr := rand.Read(baseIV)
if readErr != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-KMS multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV))
}
config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for bucket-default SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
case EncryptionTypeAES256:
// Apply SSE-S3 (AES256) as bucket default
config.IsSSES3 = true
// Generate and encode base IV
baseIV := make([]byte, s3_constants.AESBlockSize)
n, readErr := rand.Read(baseIV)
if readErr != nil || n != len(baseIV) {
return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-S3 multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV))
}
config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
glog.V(4).Infof("Generated base IV %x for bucket-default SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
// Generate and serialize SSE-S3 key
keyManager := GetSSES3KeyManager()
sseS3Key, keyErr := keyManager.GetOrCreateKey("")
if keyErr != nil {
return nil, fmt.Errorf("failed to generate SSE-S3 key for bucket-default multipart upload: %v", keyErr)
}
keyData, serErr := SerializeSSES3Metadata(sseS3Key)
if serErr != nil {
return nil, fmt.Errorf("failed to serialize SSE-S3 metadata for bucket-default multipart upload: %v", serErr)
}
config.S3KeyDataEncoded = base64.StdEncoding.EncodeToString(keyData)
// Store key in manager for later retrieval
keyManager.StoreKey(sseS3Key)
glog.V(4).Infof("Stored SSE-S3 key %s for bucket-default multipart upload %s", sseS3Key.KeyID, uploadIdString)
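default:
// Unsupported algorithm: the upload proceeds without encryption, so surface it at warning level
glog.Warningf("prepareMultipartEncryptionConfig: unsupported bucket-default encryption algorithm %s for bucket %s; proceeding without encryption",
encryptionConfig.SseAlgorithm, bucket)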
}
}
}
return config, nil
}
// applyMultipartEncryptionConfig applies a pre-prepared encryption configuration to a filer entry.
// It cannot fail: all error-prone work was done in prepareMultipartEncryptionConfig.
func (s3a *S3ApiServer) applyMultipartEncryptionConfig(entry *filer_pb.Entry, config *MultipartEncryptionConfig) {
if config == nil {
return
}
// Guard against a nil map so the writes below cannot panic
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
// Apply SSE-KMS configuration
if config.IsSSEKMS {
entry.Extended[s3_constants.SeaweedFSSSEKMSKeyID] = []byte(config.KMSKeyID)
if config.BucketKeyEnabled {
entry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled] = []byte("true")
}
if config.EncryptionContext != "" {
entry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext] = []byte(config.EncryptionContext)
}
entry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV] = []byte(config.KMSBaseIVEncoded)
glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-KMS settings with keyID %s", config.KMSKeyID)
}
// Apply SSE-S3 configuration
if config.IsSSES3 {
entry.Extended[s3_constants.SeaweedFSSSES3Encryption] = []byte(s3_constants.SSEAlgorithmAES256)
entry.Extended[s3_constants.SeaweedFSSSES3BaseIV] = []byte(config.S3BaseIVEncoded)
entry.Extended[s3_constants.SeaweedFSSSES3KeyData] = []byte(config.S3KeyDataEncoded)
glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-S3 settings")
}
}
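// sortEntriesByLatestChunk orders entries newest first, comparing the ModifiedTsNs of each
// entry's first chunk. Entries without chunks are treated as timestamp 0 and sort last.
func sortEntriesByLatestChunk(entries []*filer_pb.Entry) {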
slices.SortFunc(entries, func(a, b *filer_pb.Entry) int {
var aTs, bTs int64
if len(a.Chunks) > 0 {
aTs = a.Chunks[0].ModifiedTsNs
}
if len(b.Chunks) > 0 {
bTs = b.Chunks[0].ModifiedTsNs
}
return cmp.Compare(bTs, aTs)
})
}
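// calculateMultipartETag computes the S3-style multipart ETag: the hex MD5 of the concatenated
// binary MD5 digests of the completed parts, suffixed with "-N" where N is len(completedPartNumbers).
// When several entries exist for one part number, they are sorted in place and the newest wins.
// Parts that are missing, or whose ETag is not valid hex (logged as a warning), are skipped.
func calculateMultipartETag(partEntries map[int][]*filer_pb.Entry, completedPartNumbers []int) string {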
var etags []byte
for _, partNumber := range completedPartNumbers {
entries, ok := partEntries[partNumber]
if !ok || len(entries) == 0 {
continue
}
if len(entries) > 1 {
sortEntriesByLatestChunk(entries)
}
entry := entries[0]
etag := getEtagFromEntry(entry)
glog.V(4).Infof("calculateMultipartETag: part %d, entry %s, getEtagFromEntry result: %s", partNumber, entry.Name, etag)
etag = strings.Trim(etag, "\"")
if before, _, found := strings.Cut(etag, "-"); found {
etag = before
}
if etagBytes, err := hex.DecodeString(etag); err == nil {
etags = append(etags, etagBytes...)
} else {
glog.Warningf("calculateMultipartETag: failed to decode etag '%s' for part %d: %v", etag, partNumber, err)
}
}
return fmt.Sprintf("%x-%d", md5.Sum(etags), len(completedPartNumbers))
}
// computeCompositeChecksum computes a composite checksum from per-part checksums.
// It concatenates the raw (decoded) per-part checksums, hashes the result with the
// same algorithm, and returns the value as "base64-N" where N is the part count.
// This follows the AWS S3 multipart checksum specification.
// Returns an error if a part is missing its checksum (the upload was initiated with
// a checksum algorithm, so all parts must have been uploaded with checksums).
func computeCompositeChecksum(checksumHeaderName string, partEntries map[int][]*filer_pb.Entry, completedPartNumbers []int) (string, error) {
// Determine the algorithm from the header name
algo := checksumAlgorithmFromHeaderName(checksumHeaderName)
if algo == ChecksumAlgorithmNone {
return "", fmt.Errorf("unknown checksum algorithm for header %q", checksumHeaderName)
}
// Collect raw per-part checksums
var combined []byte
for _, partNumber := range completedPartNumbers {
entries, ok := partEntries[partNumber]
if !ok || len(entries) == 0 {
return "", fmt.Errorf("part %d not found", partNumber)
}
if len(entries) > 1 {
sortEntriesByLatestChunk(entries)
}
entry := entries[0]
if entry.Extended == nil {
return "", fmt.Errorf("part %d missing checksum: upload initiated with %s but part was uploaded without a checksum", partNumber, checksumHeaderName)
}
// Validate the part's checksum algorithm matches the upload's expected algorithm
partAlgo, ok := entry.Extended[s3_constants.ExtChecksumAlgorithm]
if !ok || len(partAlgo) == 0 {
return "", fmt.Errorf("part %d missing checksum: upload initiated with %s but part was uploaded without a checksum", partNumber, checksumHeaderName)
}
if string(partAlgo) != checksumHeaderName {
return "", fmt.Errorf("part %d checksum algorithm mismatch: upload expects %s but part has %s", partNumber, checksumHeaderName, string(partAlgo))
}
partChecksumB64, ok := entry.Extended[s3_constants.ExtChecksumValue]
if !ok || len(partChecksumB64) == 0 {
return "", fmt.Errorf("part %d missing checksum value: upload initiated with %s but part has no checksum value", partNumber, checksumHeaderName)
}
raw, err := base64.StdEncoding.DecodeString(string(partChecksumB64))
if err != nil {
return "", fmt.Errorf("part %d has invalid checksum encoding: %w", partNumber, err)
}
combined = append(combined, raw...)
}
// Hash the concatenated raw checksums
h := getCheckSumWriter(algo)
if h == nil {
return "", fmt.Errorf("failed to create hash writer for %s", checksumHeaderName)
}
h.Write(combined)
compositeRaw := h.Sum(nil)
return fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(compositeRaw), len(completedPartNumbers)), nil
}
// checksumAlgorithmFromHeaderName maps a canonical header name back to its algorithm.
func checksumAlgorithmFromHeaderName(headerName string) ChecksumAlgorithm {
for _, entry := range checksumHeaders {
if entry.name == headerName {
return entry.alg
}
}
return ChecksumAlgorithmNone
}
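// getEtagFromEntry returns the entry's ETag wrapped in double quotes. It prefers the stored
// ExtETagKey value and falls back to filer.ETag when that value is absent or empty.
func getEtagFromEntry(entry *filer_pb.Entry) string {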
if entry.Extended != nil {
if etagBytes, ok := entry.Extended[s3_constants.ExtETagKey]; ok {
etag := string(etagBytes)
if len(etag) > 0 {
if !strings.HasPrefix(etag, "\"") {
return "\"" + etag + "\""
}
return etag
}
// Empty stored ETag — fall through to filer.ETag calculation
}
}
// Fallback to filer.ETag which handles Attributes.Md5 consistently
etag := filer.ETag(entry)
entryName := entry.Name
if entryName == "" {
entryName = "entry"
}
glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", entryName, etag, len(entry.Chunks))
return "\"" + etag + "\""
}
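// validateCompletePartETag compares a client-supplied part ETag with the stored part entry's ETag,
// ignoring double quotes (and surrounding whitespace on the client value). invalid is true when
// either normalized ETag is empty; otherwise match reports whether they are equal. Both normalized
// values are returned for error reporting.
func validateCompletePartETag(partETag string, entry *filer_pb.Entry) (match bool, invalid bool, normalizedPartETag string, normalizedEntryETag string) {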
normalizedPartETag = strings.Trim(strings.TrimSpace(partETag), `"`)
if normalizedPartETag == "" {
return false, true, normalizedPartETag, ""
}
normalizedEntryETag = strings.Trim(getEtagFromEntry(entry), `"`)
if normalizedEntryETag == "" {
return false, true, normalizedPartETag, normalizedEntryETag
}
return normalizedPartETag == normalizedEntryETag, false, normalizedPartETag, normalizedEntryETag
}