mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
2458f6c81c
* feat(s3api): apply lifecycle TTL at write time

The S3 server already has the bucket's lifecycle XML at PUT time (via the cached BucketConfig), so volume-TTL routing is just a per-write decision instead of something that needs a separate filer.conf projection kept in sync via operator commands.

- BucketConfig caches the canonical Rules parsed from the lifecycle XML once on load (BucketConfigCache invalidates on Put/Delete Lifecycle, so the rules stay current automatically).
- resolveLifecycleTTLForWrite walks the cached rules: longest-prefix match, applies tag and size filters against the request, returns Days * 86400. Versioned buckets, non-Expiration.Days rules, and unevaluable size filters (no Content-Length) yield 0; the lifecycle worker handles those at scan time.
- putToFiler resolves TTL once and passes it through both the AssignVolumeRequest (so chunks land on a TTL volume) and the new entry's Attributes.TtlSec (so the filer's RocksDB compaction also expires the metadata).

Lifecycle XML PUT/DELETE now influences write routing immediately: no operator command, no filer.conf bookkeeping. The lifecycle worker remains authoritative for the cases the fast path can't cover (existing objects via bootstrap, versioned buckets, noncurrent retention, abort-MPU, tag/size filters that didn't hold at PUT time).

CompleteMultipartUpload and CopyObject still need wiring; left for follow-ups so this PR stays scoped.

* perf(s3api): pre-filter and sort lifecycle rules for the per-PUT TTL walk

resolveLifecycleTTLForWrite walked every lifecycle rule on every PutObject, including disabled / non-Expiration.Days rules that could never fire on the fast path, and computed "longest prefix wins" via a running max instead of an early exit.

Cache a pre-filtered + pre-sorted slice in BucketConfig:

- buildTTLFastPathRules drops everything except Status=Enabled + ExpirationDays>0;
- sorts by descending prefix length (stable, so equal-length rules keep their XML order).

The resolver returns on first prefix+filter match. A bucket whose lifecycle XML has no Expiration.Days rules is now O(1); a typical bucket with one Expiration.Days rule walks one HasPrefix per PUT.

The cache is built once per bucket-config load. PutBucketLifecycle / DeleteBucketLifecycle already invalidate the cache, so the fast-path slice stays current automatically.

* refactor(s3api): LifecycleTTLResolver object + four review fixes

Pulls the per-PUT TTL resolution into a dedicated type so the bucket config holds one object instead of a slice + magic-walk function:

- LifecycleTTLResolver wraps the pre-filtered, pre-sorted rules. nil-safe Resolve so the call site doesn't have to special-case buckets with no eligible rules.

Four review findings:

1. (high) drop tag-filtered rules from the fast path. Tags are mutable post-PUT via PutObjectTagging but volume TTL is irreversible: an object that matched at write time would still expire after the tag was removed. Worker re-evaluates current tags at scan time. Fast path now keeps only stable predicates: prefix and size.
2. (high) move TTL resolution out of putToFiler. MPU parts, copy-part destinations, and other transient writes called putToFiler with object="", so bucket-wide rules (empty Prefix) matched and bound a TTL clock starting at part-upload time, before CompleteMultipartUpload existed. putToFiler now takes an explicit ttlSec parameter; only the user-visible PutObject paths (PutObjectHandler, postpolicy) feed it from the resolver. MPU and copy-part pass 0.
3. (medium) AWS overlapping-rule precedence is "shorter expiration wins", not "longest prefix wins". Sort by ExpirationDays ascending so the first prefix match is also the shortest applicable rule.
4. (medium) overflow no longer caps at math.MaxInt32 seconds (~68y). A longer policy would have expired early. Return 0 instead so the worker enforces the actual policy on its own schedule.

Versioning gate moves into the resolver constructor: versioned buckets get a nil resolver. The five putToFiler callers all updated: PutObjectHandler + postpolicy resolve via lifecycleTTLForObjectWrite, suspended/versioned wrappers pass 0 by construction, MPU part and copy-part SSE pass 0 with a one-line comment about why.

* refactor(s3api): drop unused BucketConfig.LifecycleRules field

The full canonical rule set was set on every bucket-config load but never read: resolveLifecycleTTLForWrite worked off the resolver's filtered slice, and the lifecycle worker reads bucket entries straight off the meta-log instead of this cache. Remove the field and its s3lifecycle import.

* perf(s3api): pre-compute LifecycleTTLResolver hot-path fields

Resolve was doing per-call work that's actually constant per bucket-config load: int64 multiplication, max-int32 overflow check, field indirections through *s3lifecycle.Rule. Move it to the constructor and pack the rule into a compact ttlRule (prefix + ttlSec int32 + sizeGT/sizeLT) so the inner loop is HasPrefix → optional size check → return.

Drop overflowing rules at construction rather than handling per-resolve: capping would expire long policies early, and returning 0 in the inner loop would prevent any shorter overlapping rule from firing. Drop-at-construction composes correctly with the ascending sort.

Benchmarks (Apple M4):

  NilReceiver       0.99 ns/op   0 B/op
  OneRuleMatching   2.75 ns/op   0 B/op
  FiveRulesNoMatch  13.5 ns/op   0 B/op

* fix(s3api): refresh LifecycleTTL resolver on bucket-config update

storeBucketLifecycleConfiguration writes to Entry.Extended via updateBucketConfig, which clones the cached BucketConfig and calls the user fn, then caches the result. The clone inherits the prior LifecycleTTL pointer and nothing rebuilt it from the new XML, so add/replace/delete of a lifecycle policy left the wrong resolver in cache until eviction.

Same gap on the meta-log side: peer-driven updates flowed through updateBucketConfigCacheFromEntry without re-deriving the resolver.

Centralize the Entry -> derived-field mapping in one helper that resets every Extended-backed field then repopulates from the entry, and call it from getBucketConfig (initial load), updateBucketConfig (after updateEntry succeeds, before caching), and updateBucketConfigCacheFromEntry (meta-log path). Reset is the load-bearing part: deleting the lifecycle XML must yield a nil resolver, since stamping a stale TTL onto subsequent writes is irreversible.

* fix(s3api): PostPolicy passes object size, not multipart wire size

lifecycleTTLForObjectWrite was reading r.ContentLength, which on the PostPolicy path is the multipart envelope (form fields + boundaries), not the uploaded object body. A size-filtered rule would evaluate against that inflated total and stamp (or skip) a TTL the policy didn't intend.

Take the object size as an explicit parameter. PutObject still passes r.ContentLength (correct there); PostPolicy passes the fileSize already extracted from the form part. Negative size means unknown and continues to skip any size-filtered rule.

* fix(s3api): treat Object Lock as versioned for lifecycle TTL fast path

Object Lock requires versioning at the API level, but it can be enabled at create time without S3 ever writing the explicit Versioning header. The lifecycle resolver construction site only checked Versioning, so an Object-Lock bucket with no Versioning byte would still get a fast-path resolver and stamp volume TTL onto writes, destroying noncurrent versions when the volume expires.

Mirror the OR already used in BucketIsVersioned: ObjectLockConfig non-nil counts as versioned for resolver construction. Existing explicit-Versioning paths are unchanged.
621 lines
21 KiB
Go
package s3api

import (
	"crypto/sha1"
	"encoding/base64"
	"encoding/json"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/google/uuid"
	"github.com/pquerna/cachecontrol/cacheobject"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
)

const (
	maxObjectListSizeLimit = 1000  // Limit number of objects in a listObjectsResponse.
	maxUploadsList         = 10000 // Limit number of uploads in a listUploadsResponse.
	maxPartsList           = 10000 // Limit number of parts in a listPartsResponse.
)

// NewMultipartUploadHandler - New multipart upload.
func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)

	if len(object) > s3_constants.MaxS3ObjectKeyLength {
		s3err.WriteErrorResponse(w, r, s3err.ErrKeyTooLongError)
		return
	}

	// Check if bucket exists, and create it if it doesn't (auto-create bucket)
	if err := s3a.checkBucket(r, bucket); err == s3err.ErrNoSuchBucket {
		// Auto-create bucket if it doesn't exist (requires Admin permission)
		if !s3a.handleAutoCreateBucket(w, r, bucket, "NewMultipartUploadHandler") {
			return
		}
	} else if err != s3err.ErrNone {
		// Other errors (like access denied) should still fail
		s3err.WriteErrorResponse(w, r, err)
		return
	}
	if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
		return
	}

	// Check if versioning is enabled for the bucket (needed for object lock)
	versioningEnabled, err := s3a.isVersioningEnabled(bucket)
	if err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
			return
		}
		glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	// Validate object lock headers before processing
	if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
		glog.V(2).Infof("NewMultipartUploadHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
		s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
		return
	}

	// Validate Cache-Control header format if present
	if cacheControl := r.Header.Get("Cache-Control"); cacheControl != "" {
		if _, err := cacheobject.ParseRequestCacheControl(cacheControl); err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
			return
		}
	}

	// Validate Expires header format if present
	if expires := r.Header.Get("Expires"); expires != "" {
		if _, err := time.Parse(http.TimeFormat, expires); err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrMalformedDate)
			return
		}
	}

	createMultipartUploadInput := &s3.CreateMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      objectKey(aws.String(object)),
		Metadata: make(map[string]*string),
	}

	// Parse S3 metadata from request headers
	metadata, errCode := ParseS3Metadata(r, nil, false)
	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}
	for k, v := range metadata {
		createMultipartUploadInput.Metadata[k] = aws.String(string(v))
	}

	contentType := r.Header.Get("Content-Type")
	if contentType != "" {
		createMultipartUploadInput.ContentType = &contentType
	}
	response, errCode := s3a.createMultipartUpload(r, createMultipartUploadInput)

	glog.V(3).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)

	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	writeSuccessResponseXML(w, r, response)

}

// CompleteMultipartUploadHandler - Completes multipart upload.
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html

	bucket, object := s3_constants.GetBucketAndObject(r)

	// Check if bucket exists before completing multipart upload
	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}
	if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
		return
	}

	parts := &CompleteMultipartUpload{}
	if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
		return
	}

	// Get upload id.
	uploadID, _, _, _ := getObjectResources(r.URL.Query())
	err := s3a.checkUploadId(object, uploadID)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}

	// Check conditional headers before completing multipart upload
	// This implements AWS S3 behavior where conditional headers apply to CompleteMultipartUpload
	if errCode := s3a.checkConditionalHeaders(r, bucket, object); errCode != s3err.ErrNone {
		glog.V(3).Infof("CompleteMultipartUploadHandler: Conditional header check failed for %s/%s", bucket, object)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	response, errCode := s3a.completeMultipartUpload(r, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      objectKey(aws.String(object)),
		UploadId: aws.String(uploadID),
	}, parts)

	glog.V(3).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)

	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// Set version ID in HTTP header if present
	if response.VersionId != nil {
		w.Header().Set("x-amz-version-id", *response.VersionId)
	}

	// Set composite checksum header if present
	if response.ChecksumHeaderName != "" && response.ChecksumValue != "" {
		w.Header().Set(response.ChecksumHeaderName, response.ChecksumValue)
	}

	stats_collect.RecordBucketActiveTime(bucket)
	stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()

	writeSuccessResponseXML(w, r, response)

}

// AbortMultipartUploadHandler - Aborts multipart upload.
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)

	// Check if bucket exists before aborting multipart upload
	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}
	if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
		return
	}

	// Get upload id.
	uploadID, _, _, _ := getObjectResources(r.URL.Query())
	err := s3a.checkUploadId(object, uploadID)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}

	response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      objectKey(aws.String(object)),
		UploadId: aws.String(uploadID),
	})

	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	glog.V(3).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))

	//https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
	s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
	s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)

}

// ListMultipartUploadsHandler - Lists multipart uploads.
func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
	bucket, _ := s3_constants.GetBucketAndObject(r)

	// Check if bucket exists before listing multipart uploads
	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
	if maxUploads < 0 {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
		return
	}
	if keyMarker != "" {
		// Marker not common with prefix is not implemented.
		if !strings.HasPrefix(keyMarker, prefix) {
			s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
			return
		}
	}

	response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
		Bucket:         aws.String(bucket),
		Delimiter:      aws.String(delimiter),
		EncodingType:   aws.String(encodingType),
		KeyMarker:      aws.String(keyMarker),
		MaxUploads:     aws.Int64(int64(maxUploads)),
		Prefix:         aws.String(prefix),
		UploadIdMarker: aws.String(uploadIDMarker),
	})

	glog.V(3).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)

	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// TODO handle encodingType

	writeSuccessResponseXML(w, r, response)
}

// ListObjectPartsHandler - Lists object parts in a multipart upload.
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)

	// Check if bucket exists before listing object parts
	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
	if partNumberMarker < 0 {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
		return
	}
	if maxParts < 0 {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
		return
	}

	err := s3a.checkUploadId(object, uploadID)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}

	response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
		Bucket:           aws.String(bucket),
		Key:              objectKey(aws.String(object)),
		MaxParts:         aws.Int64(int64(maxParts)),
		PartNumberMarker: aws.Int64(int64(partNumberMarker)),
		UploadId:         aws.String(uploadID),
	})

	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	glog.V(3).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))

	writeSuccessResponseXML(w, r, response)

}

// PutObjectPartHandler - Put an object part in a multipart upload.
func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
	bucket, object := s3_constants.GetBucketAndObject(r)
	_, err := validateContentMd5(r.Header)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
		return
	}
	// Check if bucket exists before putting object part
	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	uploadID := r.URL.Query().Get("uploadId")
	// validateTableBucketObjectPath is enforced at multipart initiation. checkUploadId
	// cryptographically binds uploadID to object path, so parts cannot switch paths.
	err = s3a.checkUploadId(object, uploadID)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}

	partIDString := r.URL.Query().Get("partNumber")
	partID, err := strconv.Atoi(partIDString)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		return
	}
	if partID > s3_constants.MaxS3MultipartParts {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		return
	}
	if partID < 1 {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		return
	}

	dataReader, s3ErrCode := getRequestDataReader(s3a, r)
	if s3ErrCode != s3err.ErrNone {
		glog.Errorf("PutObjectPartHandler: getRequestDataReader failed with code %v", s3ErrCode)
		s3err.WriteErrorResponse(w, r, s3ErrCode)
		return
	}
	defer dataReader.Close()

	glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)

	// Verify the multipart upload exists (rejects parts after abort)
	uploadEntry, err := s3a.getEntry(s3a.genUploadsFolder(bucket), uploadID)
	if errors.Is(err, filer_pb.ErrNotFound) {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	} else if err != nil {
		glog.Errorf("Could not retrieve upload entry for %s/%s: %v", bucket, uploadID, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	// Apply SSE settings from the upload entry (unless SSE-C headers are already present)
	sseCustomerAlgorithm := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
	if sseCustomerAlgorithm == "" && uploadEntry.Extended != nil {
		if keyIDBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; exists {
			keyID := string(keyIDBytes)

			bucketKeyEnabled := false
			if bucketKeyBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled]; exists && string(bucketKeyBytes) == "true" {
				bucketKeyEnabled = true
			}

			var encryptionContext map[string]string
			if contextBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext]; exists {
				if err := json.Unmarshal(contextBytes, &encryptionContext); err != nil {
					glog.Errorf("Failed to parse encryption context for upload %s: %v", uploadID, err)
					encryptionContext = BuildEncryptionContext(bucket, object, bucketKeyEnabled)
				}
			} else {
				encryptionContext = BuildEncryptionContext(bucket, object, bucketKeyEnabled)
			}

			var baseIV []byte
			if baseIVBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV]; exists {
				decodedIV, decodeErr := base64.StdEncoding.DecodeString(string(baseIVBytes))
				if decodeErr != nil {
					glog.Errorf("Failed to decode base IV for multipart upload %s: %v", uploadID, decodeErr)
				} else if len(decodedIV) != s3_constants.AESBlockSize {
					glog.Errorf("Invalid base IV length for multipart upload %s: expected %d bytes, got %d", uploadID, s3_constants.AESBlockSize, len(decodedIV))
				} else {
					baseIV = decodedIV
					glog.V(4).Infof("Using stored base IV %x for multipart upload %s", baseIV[:8], uploadID)
				}
			}

			if len(baseIV) == 0 {
				glog.Errorf("No valid base IV found for SSE-KMS multipart upload %s - cannot proceed with encryption", uploadID)
				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
				return
			}

			r.Header.Set(s3_constants.AmzServerSideEncryption, "aws:kms")
			r.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
			if bucketKeyEnabled {
				r.Header.Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
			}
			if len(encryptionContext) > 0 {
				if contextJSON, err := json.Marshal(encryptionContext); err == nil {
					r.Header.Set(s3_constants.AmzServerSideEncryptionContext, base64.StdEncoding.EncodeToString(contextJSON))
				}
			}

			r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV))
		} else {
			if err := s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID); err != nil {
				glog.Errorf("Failed to setup SSE-S3 multipart headers: %v", err)
				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
				return
			}
		}
	}

	filePath := s3a.genPartUploadPath(bucket, uploadID, partID)

	if partID == 1 && r.Header.Get("Content-Type") == "" {
		dataReader = mimeDetect(r, dataReader)
	}

	glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
		bucket, object, uploadID, partID, r.ContentLength)

	// MPU parts must NOT inherit the bucket's lifecycle Expiration.Days
	// volume TTL: the rule targets the user-visible object, not the
	// transient .uploads/<id>/<n> path, and a part write would otherwise
	// start the TTL clock before CompleteMultipartUpload ever assembled
	// the object.
	etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, "", partID, 0, nil)
	if errCode != s3err.ErrNone {
		glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
			errCode, bucket, object, partID)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
		bucket, object, partID, etag, sseMetadata.SSEType)

	setEtag(w, etag)

	// Set SSE response headers for multipart uploads
	s3a.setSSEResponseHeaders(w, r, sseMetadata)

	writeSuccessResponseEmpty(w, r)

}

func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
	return fmt.Sprintf("%s/%s", s3a.bucketDir(bucket), s3_constants.MultipartUploadsFolder)
}

// getMultipartSSEAlgorithm returns the canonical SSE algorithm ("AES256" or
// "aws:kms") that was stored when the multipart upload was initiated, or ""
// if the upload entry is not found or had no SSE. It is used by the bucket
// policy engine to evaluate s3:x-amz-server-side-encryption conditions for
// UploadPart and UploadPartCopy, which do not re-send the SSE header.
func (s3a *S3ApiServer) getMultipartSSEAlgorithm(bucket, uploadID string) string {
	entry, err := s3a.getEntry(s3a.genUploadsFolder(bucket), uploadID)
	if err != nil || entry == nil || entry.Extended == nil {
		return ""
	}
	if _, ok := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; ok {
		return "aws:kms"
	}
	if _, ok := entry.Extended[s3_constants.SeaweedFSSSES3Encryption]; ok {
		return "AES256"
	}
	return ""
}

func (s3a *S3ApiServer) genPartUploadPath(bucket, uploadID string, partID int) string {
	// Returns just the file path - no filer address needed
	// Upload traffic goes directly to volume servers, not through filer
	return fmt.Sprintf("%s/%s/%04d_%s.part",
		s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
}

// Generate uploadID hash string from object
func (s3a *S3ApiServer) generateUploadID(object string) string {

	object = strings.TrimPrefix(object, "/")
	h := sha1.New()
	h.Write([]byte(object))
	return fmt.Sprintf("%x", h.Sum(nil))
}

// Check object name and uploadID when processing multipart uploading
func (s3a *S3ApiServer) checkUploadId(object string, id string) error {

	hash := s3a.generateUploadID(object)

	if !strings.HasPrefix(id, hash) {
		glog.Errorf("object %s and uploadID %s are not matched", object, id)
		return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
	}
	return nil
}

// Parse bucket url queries for ?uploads
func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
	prefix = values.Get("prefix")
	keyMarker = values.Get("key-marker")
	uploadIDMarker = values.Get("upload-id-marker")
	delimiter = values.Get("delimiter")
	if values.Get("max-uploads") != "" {
		maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
	} else {
		maxUploads = maxUploadsList
	}
	encodingType = values.Get("encoding-type")
	return
}

// Parse object url queries
func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
	uploadID = values.Get("uploadId")
	partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
	if values.Get("max-parts") != "" {
		maxParts, _ = strconv.Atoi(values.Get("max-parts"))
	} else {
		maxParts = maxPartsList
	}
	encodingType = values.Get("encoding-type")
	return
}

func xmlDecoder(body io.Reader, v interface{}, size int64) error {
	var lbody io.Reader
	if size > 0 {
		lbody = io.LimitReader(body, size)
	} else {
		lbody = body
	}
	d := xml.NewDecoder(lbody)
	d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
		return input, nil
	}
	return d.Decode(v)
}

type CompleteMultipartUpload struct {
	Parts []CompletedPart `xml:"Part"`
}
type CompletedPart struct {
	ETag       string
	PartNumber int
}

// handleSSES3MultipartHeaders handles SSE-S3 multipart upload header setup to reduce nesting complexity
func (s3a *S3ApiServer) handleSSES3MultipartHeaders(r *http.Request, uploadEntry *filer_pb.Entry, uploadID string) error {
	if encryptionTypeBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSES3Encryption]; exists && string(encryptionTypeBytes) == s3_constants.SSEAlgorithmAES256 {

		// Set SSE-S3 headers to indicate server-side encryption
		r.Header.Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)

		// Retrieve and set base IV for consistent multipart encryption - REQUIRED for security
		var baseIV []byte
		if baseIVBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSES3BaseIV]; exists {
			// Decode the base64 encoded base IV
			decodedIV, decodeErr := base64.StdEncoding.DecodeString(string(baseIVBytes))
			if decodeErr != nil {
				return fmt.Errorf("failed to decode base IV for SSE-S3 multipart upload %s: %v", uploadID, decodeErr)
			}
			if len(decodedIV) != s3_constants.AESBlockSize {
				return fmt.Errorf("invalid base IV length for SSE-S3 multipart upload %s: expected %d bytes, got %d", uploadID, s3_constants.AESBlockSize, len(decodedIV))
			}
			baseIV = decodedIV
			glog.V(4).Infof("Using stored base IV %x for SSE-S3 multipart upload %s", baseIV[:8], uploadID)
		} else {
			return fmt.Errorf("no base IV found for SSE-S3 multipart upload %s - required for encryption consistency", uploadID)
		}

		// Retrieve and set key data for consistent multipart encryption - REQUIRED for decryption
		if keyDataBytes, exists := uploadEntry.Extended[s3_constants.SeaweedFSSSES3KeyData]; exists {
			// Key data is already base64 encoded, pass it directly
			keyDataStr := string(keyDataBytes)
			r.Header.Set(s3_constants.SeaweedFSSSES3KeyDataHeader, keyDataStr)
			glog.V(4).Infof("Using stored key data for SSE-S3 multipart upload %s", uploadID)
		} else {
			return fmt.Errorf("no SSE-S3 key data found for multipart upload %s - required for encryption", uploadID)
		}

		// Pass the base IV to putToFiler via header for offset calculation
		r.Header.Set(s3_constants.SeaweedFSSSES3BaseIVHeader, base64.StdEncoding.EncodeToString(baseIV))

	}
	return nil
}