mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
8d59069a0a
* s3: return BucketAlreadyOwnedByYou when recreating your own bucket PutBucket returned BucketAlreadyExists for every existing bucket, even when the caller already owns it, so idempotent re-creation (e.g. a container that creates its bucket on startup) couldn't tell "someone else took the name" from "it's already mine". Recreating a bucket you own now returns BucketAlreadyOwnedByYou, unless the request conflicts with the existing bucket: a different Object Lock setting, or an ACL on the request or the existing bucket. To detect the latter, a requested non-default canned/grant ACL is now persisted on creation instead of being dropped. * s3: fail PutBucket when the existing bucket's config can't be read When a bucket already exists, an unreadable config left the recreate defaulting to BucketAlreadyOwnedByYou, masking the backend error and possibly accepting a conflicting recreate (Object Lock / ACL unknown). Surface the read error instead. * s3: return the stored bucket ACL from GetBucketAcl GetBucketAcl always returned the owner's default full-control grant and ignored any stored ACL, so a bucket created with a canned ACL or one set via PutBucketAcl never read back correctly. Decode the stored grants instead, sharing one grants-to-XML helper with the object ACL handler. The shared helper also emits each grantee's real xsi:type (e.g. Group for public-read) instead of a hardcoded CanonicalUser, so group grants read back correctly for both bucket and object ACLs. * s3: resolve the right already-exists error on the concurrent-create race When two requests create the same bucket at once, the loser's mkdir fails and the handler fell back to a flat BucketAlreadyExists, bypassing the same-owner idempotency check. Route both the pre-check and the race fallback through one existingBucketError helper so a same-owner recreate still gets BucketAlreadyOwnedByYou. * s3: record the bucket owner's account id at creation setBucketOwner only stored the creating identity name, so the canonical account id wasn't available later. Persist it under ExtAmzOwnerKey too, the same field PutBucketAcl writes, so the bucket owner can be reported independently of whoever reads it. * s3: report the bucket owner from GetBucketAcl, not the caller GetBucketAcl built the ACL Owner from the caller's account header, so an admin or cross-account read returned the wrong owner. Use the owner persisted on the bucket, falling back to the caller only when none is recorded.
1340 lines
49 KiB
Go
1340 lines
49 KiB
Go
package s3api
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"encoding/xml"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/lifecycle_xml"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
|
)
|
|
|
|
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
glog.V(3).Infof("ListBucketsHandler")
|
|
|
|
// Get authenticated identity from context (set by Auth middleware)
|
|
// For unauthenticated requests, this returns empty string
|
|
identityId := s3_constants.GetIdentityNameFromContext(r)
|
|
|
|
// Get the full identity object for permission and ownership checks
|
|
// This is especially important for JWT users whose identity is not in the identities list
|
|
// Note: We store the full Identity object in context for simplicity. Future optimization
|
|
// could use a lightweight, credential-free view (name, account, actions, principal ARN)
|
|
// for better data minimization.
|
|
var identity *Identity
|
|
if s3a.iam.isEnabled() {
|
|
// Try to get the full identity from context first (works for all auth types including JWT)
|
|
if identityObj := s3_constants.GetIdentityFromContext(r); identityObj != nil {
|
|
if id, ok := identityObj.(*Identity); ok {
|
|
identity = id
|
|
} else {
|
|
glog.Warningf("ListBucketsHandler: identity object in context has unexpected type: %T", identityObj)
|
|
}
|
|
}
|
|
// Fallback to looking up by name if not in context (backward compatibility)
|
|
if identity == nil && identityId != "" {
|
|
identity = s3a.iam.lookupByIdentityName(identityId)
|
|
}
|
|
}
|
|
|
|
var response ListAllMyBucketsResult
|
|
|
|
entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
|
|
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
var listBuckets ListAllMyBucketsList
|
|
for _, entry := range entries {
|
|
if entry.IsDirectory {
|
|
if strings.HasPrefix(entry.Name, ".") {
|
|
continue
|
|
}
|
|
// Unauthenticated users should not see any buckets
|
|
if identity == nil {
|
|
continue
|
|
}
|
|
|
|
// Check if bucket should be visible to this identity
|
|
// A bucket is visible if the user owns it OR has explicit permission to list it
|
|
isOwner := isBucketOwnedByIdentity(entry, identity)
|
|
|
|
// Skip permission check if user is already the owner (optimization)
|
|
if !isOwner {
|
|
if errCode := s3a.iam.VerifyActionPermission(r, identity, s3_constants.ACTION_LIST, entry.Name, ""); errCode != s3err.ErrNone {
|
|
continue
|
|
}
|
|
}
|
|
|
|
listBuckets.Bucket = append(listBuckets.Bucket, ListAllMyBucketsEntry{
|
|
Name: entry.Name,
|
|
CreationDate: time.Unix(entry.Attributes.Crtime, 0).UTC(),
|
|
})
|
|
}
|
|
}
|
|
|
|
response = ListAllMyBucketsResult{
|
|
Owner: CanonicalUser{
|
|
ID: identityId,
|
|
DisplayName: identityId,
|
|
},
|
|
Buckets: listBuckets,
|
|
}
|
|
|
|
glog.V(3).Infof("ListBucketsHandler response: %+v", response)
|
|
writeSuccessResponseXML(w, r, response)
|
|
}
|
|
|
|
// isBucketOwnedByIdentity checks if a bucket entry is owned by the given identity.
|
|
// Returns true if the identity owns the bucket, false otherwise.
|
|
//
|
|
// Ownership rules:
|
|
// - Admin users: considered owners of all buckets
|
|
// - Non-admin users: own buckets where AmzIdentityId matches identity.Name
|
|
// - Buckets without owner metadata are not owned by anyone (except admins)
|
|
func isBucketOwnedByIdentity(entry *filer_pb.Entry, identity *Identity) bool {
|
|
if !entry.IsDirectory {
|
|
return false
|
|
}
|
|
|
|
if identity == nil {
|
|
return false
|
|
}
|
|
|
|
// Admin users are considered owners of all buckets
|
|
if identity.isAdmin() {
|
|
return true
|
|
}
|
|
|
|
// Non-admin users with no name cannot own buckets.
|
|
// This prevents misconfigured identities from matching buckets with empty owner IDs.
|
|
if identity.Name == "" {
|
|
return false
|
|
}
|
|
|
|
// Check ownership via AmzIdentityId metadata
|
|
id, ok := entry.Extended[s3_constants.AmzIdentityId]
|
|
if !ok || string(id) != identity.Name {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
|
|
// validate the bucket name
|
|
err := s3bucket.VerifyS3BucketName(bucket)
|
|
if err != nil {
|
|
glog.Errorf("put invalid bucket name: %v %v", bucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName)
|
|
return
|
|
}
|
|
|
|
// Check if bucket already exists and handle ownership/settings
|
|
// Get authenticated identity from context (secure, cannot be spoofed)
|
|
currentIdentityId := s3_constants.GetIdentityNameFromContext(r)
|
|
|
|
// Parse any requested bucket ACL (canned ACL or grant headers) up front so it
|
|
// can be validated, persisted on creation, and factored into the already-exists
|
|
// response. A "private" canned ACL is the default and counts as no explicit ACL.
|
|
requestHasACL := hasExplicitBucketACL(r)
|
|
var aclGrantsBytes []byte
|
|
if requestHasACL {
|
|
accountId := getAccountId(r)
|
|
_, grants, errCode := ParseAndValidateAclHeaders(r, s3a.iam, "", accountId, accountId, false)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
if len(grants) > 0 {
|
|
grantsBytes, err := json.Marshal(grants)
|
|
if err != nil {
|
|
glog.Errorf("PutBucketHandler: marshal ACL grants for %s: %v", bucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
aclGrantsBytes = grantsBytes
|
|
}
|
|
}
|
|
|
|
// Check collection existence first
|
|
collectionExists := false
|
|
if s3a.isTableBucket(bucket) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
|
|
return
|
|
}
|
|
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
|
|
IncludeEcVolumes: true,
|
|
IncludeNormalVolumes: true,
|
|
}); err != nil {
|
|
glog.Errorf("list collection: %v", err)
|
|
return fmt.Errorf("list collections: %w", err)
|
|
} else {
|
|
for _, c := range resp.Collections {
|
|
if s3a.getCollectionName(bucket) == c.Name {
|
|
collectionExists = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Bucket already exists: report whether the caller already owns it or the
|
|
// name is taken / the request conflicts.
|
|
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
|
|
s3err.WriteErrorResponse(w, r, s3a.existingBucketError(r, bucket, currentIdentityId, requestHasACL))
|
|
return
|
|
}
|
|
|
|
// If collection exists but bucket directory doesn't, this is an inconsistent state
|
|
// that can occur when a previous bucket deletion partially completed (collection
|
|
// deletion failed but directory deletion succeeded, or volumes were recreated).
|
|
// Recover by proceeding to create the missing bucket directory.
|
|
if collectionExists {
|
|
glog.Warningf("PutBucketHandler: collection exists but bucket directory missing for %s, recovering by creating bucket directory", bucket)
|
|
}
|
|
|
|
// Check for x-amz-bucket-object-lock-enabled header BEFORE creating bucket
|
|
// This allows us to create the bucket with Object Lock configuration atomically
|
|
objectLockEnabled := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
|
|
|
|
// Capture any Object Lock configuration error from within the callback
|
|
// The mkdir callback doesn't support returning errors, so we capture it here
|
|
var objectLockSetupError error
|
|
|
|
// Create the folder for bucket with all settings atomically
|
|
// This ensures Object Lock configuration is set in the same CreateEntry call,
|
|
// preventing race conditions where the bucket exists without Object Lock enabled
|
|
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, func(entry *filer_pb.Entry) {
|
|
// Set bucket owner
|
|
setBucketOwner(r)(entry)
|
|
|
|
// Persist a requested non-default ACL so GetBucketAcl and idempotent
|
|
// recreation observe it (private is the default and is not stored).
|
|
if len(aclGrantsBytes) > 0 {
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
entry.Extended[s3_constants.ExtAmzAclKey] = aclGrantsBytes
|
|
}
|
|
|
|
// Set Object Lock configuration atomically during bucket creation
|
|
if objectLockEnabled {
|
|
glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s atomically", bucket)
|
|
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
// Enable versioning (required for Object Lock)
|
|
entry.Extended[s3_constants.ExtVersioningKey] = []byte(s3_constants.VersioningEnabled)
|
|
|
|
// Create and store Object Lock configuration
|
|
objectLockConfig := &ObjectLockConfiguration{
|
|
ObjectLockEnabled: s3_constants.ObjectLockEnabled,
|
|
}
|
|
if err := StoreObjectLockConfigurationInExtended(entry, objectLockConfig); err != nil {
|
|
glog.Errorf("PutBucketHandler: failed to store Object Lock config for bucket %s: %v", bucket, err)
|
|
objectLockSetupError = err
|
|
// Note: The entry will still be created, but we'll roll it back below
|
|
} else {
|
|
glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
|
|
}
|
|
}
|
|
}); err != nil {
|
|
// If mkdir failed because another request created the bucket concurrently,
|
|
// return the appropriate already-exists error instead of InternalError.
|
|
if exist, checkErr := s3a.exists(s3a.option.BucketsPath, bucket, true); checkErr == nil && exist {
|
|
glog.V(3).Infof("PutBucketHandler: bucket %s was created concurrently", bucket)
|
|
s3err.WriteErrorResponse(w, r, s3a.existingBucketError(r, bucket, currentIdentityId, requestHasACL))
|
|
return
|
|
}
|
|
glog.Errorf("PutBucketHandler mkdir: %v", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// If Object Lock setup failed, roll back the bucket creation
|
|
// This ensures we don't leave a bucket without the requested Object Lock configuration
|
|
if objectLockSetupError != nil {
|
|
glog.Errorf("PutBucketHandler: rolling back bucket %s creation due to Object Lock setup failure: %v", bucket, objectLockSetupError)
|
|
if deleteErr := s3a.rm(s3a.option.BucketsPath, bucket, true, true); deleteErr != nil {
|
|
glog.Errorf("PutBucketHandler: failed to rollback bucket %s after Object Lock setup failure: %v", bucket, deleteErr)
|
|
}
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Remove bucket from negative cache after successful creation
|
|
if s3a.bucketConfigCache != nil {
|
|
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
|
|
}
|
|
|
|
w.Header().Set("Location", "/"+bucket)
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|
|
|
|
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("DeleteBucketHandler %s", bucket)
|
|
|
|
if s3a.isTableBucket(bucket) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
|
|
return
|
|
}
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Check if bucket has object lock enabled
|
|
bucketConfig, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
// If object lock is enabled, check for objects with active locks
|
|
if bucketConfig.ObjectLockConfig != nil {
|
|
hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket)
|
|
if checkErr != nil {
|
|
glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
if hasLockedObjects {
|
|
glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
|
|
return
|
|
}
|
|
}
|
|
|
|
if !s3a.option.AllowDeleteBucketNotEmpty {
|
|
if hasUserObjects, err := s3a.bucketHasUserObjects(bucket); err != nil {
|
|
glog.Errorf("failed to list bucket %s: %v", bucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
} else if hasUserObjects {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Delete bucket directory first, then collection. This order ensures that if
|
|
// collection deletion fails, the bucket directory is already gone, preventing
|
|
// the "collection exists but bucket directory missing" inconsistency that blocks
|
|
// bucket recreation. An orphaned collection is harmless and will be cleaned up
|
|
// or reused when the bucket is recreated.
|
|
err := s3a.rm(s3a.option.BucketsPath, bucket, false, true)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
|
|
Collection: s3a.getCollectionName(bucket),
|
|
}
|
|
|
|
glog.V(1).Infof("delete collection: %v", deleteCollectionRequest)
|
|
if _, err := client.DeleteCollection(context.Background(), deleteCollectionRequest); err != nil {
|
|
return fmt.Errorf("delete collection %s: %v", bucket, err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
// Log but don't fail — the bucket directory is already removed, so the bucket
|
|
// is effectively deleted. The orphaned collection will be cleaned up or reused.
|
|
glog.Errorf("DeleteBucketHandler: failed to delete collection for bucket %s: %v", bucket, err)
|
|
}
|
|
|
|
// Clean up bucket-related caches, locks, and metrics after successful deletion
|
|
s3a.invalidateBucketConfigCache(bucket)
|
|
stats_collect.DeleteBucketMetrics(bucket)
|
|
|
|
// Prune identity actions that were scoped to this bucket via s3.configure.
|
|
// Use a bounded background context so the cleanup survives client disconnect;
|
|
// the bucket is already gone and this is best-effort bookkeeping.
|
|
if s3a.iam != nil {
|
|
pruneCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
if _, err := s3a.iam.PruneBucketFromConfiguration(pruneCtx, bucket); err != nil {
|
|
glog.Errorf("DeleteBucketHandler: failed to prune IAM actions for bucket %s: %v", bucket, err)
|
|
}
|
|
cancel()
|
|
}
|
|
|
|
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
|
}
|
|
|
|
// bucketHasUserObjects checks whether a bucket contains any non-special entries.
|
|
// Special entries (.uploads, *.versions) are internal to S3 and don't count as user objects.
|
|
func (s3a *S3ApiServer) bucketHasUserObjects(bucket string) (bool, error) {
|
|
bucketPath := s3a.option.BucketsPath + "/" + bucket
|
|
startFrom := ""
|
|
// Start with a small batch — most non-empty buckets have a real object early.
|
|
// If we only find special entries, switch to larger batches to page through quickly.
|
|
limit := uint32(10)
|
|
for {
|
|
entries, isLast, err := s3a.list(bucketPath, "", startFrom, false, limit)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, entry := range entries {
|
|
if entry.Name != s3_constants.MultipartUploadsFolder &&
|
|
!strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
|
return true, nil
|
|
}
|
|
startFrom = entry.Name
|
|
}
|
|
if isLast {
|
|
return false, nil
|
|
}
|
|
limit = 1000
|
|
}
|
|
}
|
|
|
|
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
|
|
// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
|
|
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
|
|
bucketPath := s3a.option.BucketsPath + "/" + bucket
|
|
var hasLocks bool
|
|
var checkErr error
|
|
|
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath)
|
|
return checkErr
|
|
})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return hasLocks, nil
|
|
}
|
|
|
|
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("HeadBucketHandler %s", bucket)
|
|
|
|
if entry, err := s3a.getBucketEntry(bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
|
return
|
|
}
|
|
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|
|
|
|
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
|
|
// Use cached bucket config instead of direct getEntry call (optimization)
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return errCode
|
|
}
|
|
|
|
//if iam is enabled, the access was already checked before
|
|
if s3a.iam.isEnabled() {
|
|
return s3err.ErrNone
|
|
}
|
|
if !s3a.hasAccess(r, config.Entry) {
|
|
return s3err.ErrAccessDenied
|
|
}
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// ErrAutoCreatePermissionDenied is returned when a user lacks permission to auto-create buckets
|
|
var ErrAutoCreatePermissionDenied = errors.New("permission denied - requires Admin permission")
|
|
|
|
// ErrInvalidBucketName is returned when a bucket name doesn't meet S3 naming requirements
|
|
var ErrInvalidBucketName = errors.New("invalid bucket name")
|
|
|
|
// existingBucketError returns the error for a PutBucket whose target bucket
|
|
// already exists: BucketAlreadyOwnedByYou for an idempotent recreate by the
|
|
// owner, or BucketAlreadyExists when the name is owned by someone else or the
|
|
// request conflicts with the existing bucket (a different Object Lock setting,
|
|
// or an ACL on the request or the existing bucket).
|
|
func (s3a *S3ApiServer) existingBucketError(r *http.Request, bucket, currentIdentityId string, requestHasACL bool) s3err.ErrorCode {
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
// We just observed the bucket exists but can't read it; report it as taken.
|
|
glog.Errorf("PutBucketHandler: failed to read existing bucket %s: %v", bucket, err)
|
|
return s3err.ErrBucketAlreadyExists
|
|
}
|
|
|
|
var existingOwnerId string
|
|
if entry.Extended != nil {
|
|
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
|
|
existingOwnerId = string(id)
|
|
}
|
|
}
|
|
|
|
// Different owner: the name is taken in the shared namespace.
|
|
if existingOwnerId != "" && existingOwnerId != currentIdentityId {
|
|
glog.V(3).Infof("PutBucketHandler: bucket %s owned by %s, requested by %s", bucket, existingOwnerId, currentIdentityId)
|
|
return s3err.ErrBucketAlreadyExists
|
|
}
|
|
|
|
// Same owner (or an unowned bucket the caller can claim). Recreating your own
|
|
// bucket is idempotent and returns BucketAlreadyOwnedByYou, unless the request
|
|
// conflicts with the existing bucket: a different Object Lock setting, or an ACL
|
|
// on the request or the existing bucket.
|
|
// (s3-tests: test_bucket_create_exists vs test_bucket_recreate_*_acl.)
|
|
if requestHasACL {
|
|
return s3err.ErrBucketAlreadyExists
|
|
}
|
|
|
|
objectLockRequested := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
|
|
bucketConfig, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
// Can't read the existing bucket's settings, so we can't tell whether this
|
|
// recreate conflicts; surface the failure instead of assuming idempotency.
|
|
glog.Errorf("PutBucketHandler: failed to get bucket config for %s: %v", bucket, errCode)
|
|
return errCode
|
|
}
|
|
currentObjectLockEnabled := bucketConfig.ObjectLockConfig != nil &&
|
|
bucketConfig.ObjectLockConfig.ObjectLockEnabled == s3_constants.ObjectLockEnabled
|
|
if objectLockRequested != currentObjectLockEnabled || len(bucketConfig.ACL) > 0 {
|
|
glog.V(3).Infof("PutBucketHandler: bucket %s already exists", bucket)
|
|
return s3err.ErrBucketAlreadyExists
|
|
}
|
|
|
|
glog.V(3).Infof("PutBucketHandler: bucket %s already owned by requester", bucket)
|
|
return s3err.ErrBucketAlreadyOwnedByYou
|
|
}
|
|
|
|
// hasExplicitBucketACL reports whether the request carries an explicit, non-default
|
|
// bucket ACL via a canned ACL header (other than "private") or grant headers.
|
|
func hasExplicitBucketACL(r *http.Request) bool {
|
|
if canned := r.Header.Get(s3_constants.AmzCannedAcl); canned != "" && !strings.EqualFold(canned, s3_constants.CannedAclPrivate) {
|
|
return true
|
|
}
|
|
for _, h := range []string{s3_constants.AmzAclFullControl, s3_constants.AmzAclRead, s3_constants.AmzAclReadAcp, s3_constants.AmzAclWrite, s3_constants.AmzAclWriteAcp} {
|
|
if r.Header.Get(h) != "" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// setBucketOwner creates a function that sets the bucket owner from the request context
|
|
func setBucketOwner(r *http.Request) func(entry *filer_pb.Entry) {
|
|
currentIdentityId := s3_constants.GetIdentityNameFromContext(r)
|
|
// Record the canonical account id too so GetBucketAcl can report the bucket
|
|
// owner instead of whoever is reading (e.g. an admin or another account).
|
|
accountId := r.Header.Get(s3_constants.AmzAccountId)
|
|
return func(entry *filer_pb.Entry) {
|
|
if currentIdentityId == "" && accountId == "" {
|
|
return
|
|
}
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
if currentIdentityId != "" {
|
|
entry.Extended[s3_constants.AmzIdentityId] = []byte(currentIdentityId)
|
|
}
|
|
if accountId != "" {
|
|
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(accountId)
|
|
}
|
|
}
|
|
}
|
|
|
|
// autoCreateBucket creates a bucket if it doesn't exist, setting the owner from the request context
|
|
// Only users with admin permissions are allowed to auto-create buckets
|
|
func (s3a *S3ApiServer) autoCreateBucket(r *http.Request, bucket string) error {
|
|
// Validate the bucket name before auto-creating
|
|
if err := s3bucket.VerifyS3BucketName(bucket); err != nil {
|
|
return fmt.Errorf("auto-create bucket %s: %w", bucket, errors.Join(ErrInvalidBucketName, err))
|
|
}
|
|
|
|
// Check if user has admin permissions
|
|
if !s3a.isUserAdmin(r) {
|
|
return fmt.Errorf("auto-create bucket %s: %w", bucket, ErrAutoCreatePermissionDenied)
|
|
}
|
|
|
|
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, setBucketOwner(r)); err != nil {
|
|
// In case of a race condition where another request created the bucket
|
|
// in the meantime, check for existence before returning an error.
|
|
if exist, err2 := s3a.exists(s3a.option.BucketsPath, bucket, true); err2 != nil {
|
|
glog.Warningf("autoCreateBucket: failed to check existence for bucket %s: %v", bucket, err2)
|
|
return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, errors.Join(err, err2))
|
|
} else if exist {
|
|
// The bucket exists, which is fine. However, we should ensure it has an owner.
|
|
// If it was created by a concurrent request that didn't set an owner,
|
|
// we'll set it here to ensure consistency.
|
|
if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil {
|
|
if entry.Extended == nil || len(entry.Extended[s3_constants.AmzIdentityId]) == 0 {
|
|
// No owner set, assign current admin as owner
|
|
setBucketOwner(r)(entry)
|
|
if updateErr := s3a.updateEntry(s3a.option.BucketsPath, entry); updateErr != nil {
|
|
glog.Warningf("autoCreateBucket: failed to set owner for existing bucket %s: %v", bucket, updateErr)
|
|
} else {
|
|
glog.V(1).Infof("Set owner for existing bucket %s (created by concurrent request)", bucket)
|
|
}
|
|
}
|
|
} else {
|
|
glog.Warningf("autoCreateBucket: failed to get entry for existing bucket %s: %v", bucket, getErr)
|
|
}
|
|
// Remove bucket from negative cache — it exists now
|
|
if s3a.bucketConfigCache != nil {
|
|
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, err)
|
|
}
|
|
|
|
// Remove bucket from negative cache after successful creation
|
|
if s3a.bucketConfigCache != nil {
|
|
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
|
|
}
|
|
|
|
glog.V(1).Infof("Auto-created bucket %s", bucket)
|
|
return nil
|
|
}
|
|
|
|
// handleAutoCreateBucket attempts to auto-create a bucket and writes appropriate error responses
|
|
// Returns true if the bucket was created successfully or already exists, false if an error was written
|
|
func (s3a *S3ApiServer) handleAutoCreateBucket(w http.ResponseWriter, r *http.Request, bucket, handlerName string) bool {
|
|
if err := s3a.autoCreateBucket(r, bucket); err != nil {
|
|
glog.Warningf("%s: %v", handlerName, err)
|
|
// Check for specific errors to return appropriate S3 error codes
|
|
if errors.Is(err, ErrInvalidBucketName) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName)
|
|
} else if errors.Is(err, ErrAutoCreatePermissionDenied) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
|
|
} else {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
|
|
// Check if user is properly authenticated as admin through IAM system
|
|
if s3a.isUserAdmin(r) {
|
|
return true
|
|
}
|
|
|
|
if entry.Extended == nil {
|
|
return true
|
|
}
|
|
|
|
// Get authenticated identity from context (secure, cannot be spoofed)
|
|
identityId := s3_constants.GetIdentityNameFromContext(r)
|
|
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
|
|
if identityId != string(id) {
|
|
glog.V(3).Infof("hasAccess: %s != %s (entry.Extended = %v)", identityId, id, entry.Extended)
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// isUserAdmin securely checks if the authenticated user is an admin
|
|
// This validates admin status through proper IAM authentication, not spoofable headers
|
|
func (s3a *S3ApiServer) isUserAdmin(r *http.Request) bool {
|
|
// Use a minimal admin action to authenticate and check admin status
|
|
adminAction := Action("Admin")
|
|
identity, errCode := s3a.iam.authRequest(r, adminAction)
|
|
if errCode != s3err.ErrNone {
|
|
return false
|
|
}
|
|
|
|
// Check if the authenticated identity has admin privileges
|
|
return identity != nil && identity.isAdmin()
|
|
}
|
|
|
|
// isBucketPublicRead checks if a bucket allows anonymous read access based on its cached ACL status
|
|
func (s3a *S3ApiServer) isBucketPublicRead(bucket string) bool {
|
|
// Get bucket configuration which contains cached public-read status
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
glog.V(4).Infof("isBucketPublicRead: failed to get bucket config for %s: %v", bucket, errCode)
|
|
return false
|
|
}
|
|
|
|
glog.V(4).Infof("isBucketPublicRead: bucket=%s, IsPublicRead=%v", bucket, config.IsPublicRead)
|
|
// Return the cached public-read status (no JSON parsing needed)
|
|
return config.IsPublicRead
|
|
}
|
|
|
|
// isPublicReadGrants checks if the grants allow public read access
|
|
func isPublicReadGrants(grants []*s3.Grant) bool {
|
|
for _, grant := range grants {
|
|
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
|
|
// Check for AllUsers group with Read permission
|
|
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
|
|
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// buildResourceARN builds a resource ARN from bucket and object
|
|
// Used by the policy engine wrapper
|
|
func buildResourceARN(bucket, object string) string {
|
|
if object == "" || object == "/" {
|
|
return fmt.Sprintf("arn:aws:s3:::%s", bucket)
|
|
}
|
|
// Remove leading slash if present
|
|
object = strings.TrimPrefix(object, "/")
|
|
return fmt.Sprintf("arn:aws:s3:::%s/%s", bucket, object)
|
|
}
|
|
|
|
// AuthWithPublicRead creates an auth wrapper that allows anonymous access for public-read buckets
|
|
func (s3a *S3ApiServer) AuthWithPublicRead(handler http.HandlerFunc, action Action) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
bucket, object := s3_constants.GetBucketAndObject(r)
|
|
authType := getRequestAuthType(r)
|
|
isAnonymous := authType == authTypeAnonymous
|
|
|
|
glog.V(4).Infof("AuthWithPublicRead: bucket=%s, object=%s, authType=%v, isAnonymous=%v", bucket, object, authType, isAnonymous)
|
|
|
|
// For anonymous requests, check if bucket allows public read via ACLs or bucket policies
|
|
if isAnonymous {
|
|
// First check ACL-based public access
|
|
isPublic := s3a.isBucketPublicRead(bucket)
|
|
glog.V(4).Infof("AuthWithPublicRead: bucket=%s, isPublicACL=%v", bucket, isPublic)
|
|
if isPublic {
|
|
glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to public-read bucket %s (ACL)", bucket)
|
|
handler(w, r)
|
|
return
|
|
}
|
|
|
|
// Check bucket policy for anonymous access using the policy engine
|
|
principal := "*" // Anonymous principal
|
|
// Evaluate bucket policy (objectEntry nil - not yet fetched)
|
|
allowed, evaluated, err := s3a.policyEngine.EvaluatePolicy(bucket, object, string(action), principal, r, nil, nil)
|
|
if err != nil {
|
|
// SECURITY: Fail-close on policy evaluation errors
|
|
// If we can't evaluate the policy, deny access rather than falling through to IAM
|
|
glog.Errorf("AuthWithPublicRead: error evaluating bucket policy for %s/%s: %v - denying access", bucket, object, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
|
|
return
|
|
} else if evaluated {
|
|
// A bucket policy exists and was evaluated with a matching statement
|
|
if allowed {
|
|
// Policy explicitly allows anonymous access
|
|
glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to bucket %s (bucket policy)", bucket)
|
|
handler(w, r)
|
|
return
|
|
} else {
|
|
// Policy explicitly denies anonymous access
|
|
glog.V(3).Infof("AuthWithPublicRead: bucket policy explicitly denies anonymous access to %s/%s", bucket, object)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
|
|
return
|
|
}
|
|
}
|
|
// No matching policy statement - fall through to check ACLs and then IAM auth
|
|
glog.V(3).Infof("AuthWithPublicRead: no bucket policy match for %s, checking ACLs", bucket)
|
|
}
|
|
|
|
// For all authenticated requests and anonymous requests to non-public buckets,
|
|
// use normal IAM auth to enforce policies
|
|
s3a.iam.Auth(handler, action)(w, r)
|
|
}
|
|
}
|
|
|
|
// GetBucketAclHandler Get Bucket ACL
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html
|
|
func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) {
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("GetBucketAclHandler %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Report the bucket's owner (recorded at creation or by PutBucketAcl), not the
|
|
// caller; fall back to the caller only when no owner was persisted. Likewise
|
|
// return any stored ACL, defaulting to the owner's full-control grant.
|
|
ownerId := r.Header.Get(s3_constants.AmzAccountId)
|
|
var storedGrants []*s3.Grant
|
|
if bucketConfig, errCode := s3a.getBucketConfig(bucket); errCode == s3err.ErrNone && bucketConfig.Entry != nil {
|
|
storedGrants = GetAcpGrants(bucketConfig.Entry.Extended)
|
|
if bucketConfig.Owner != "" {
|
|
ownerId = bucketConfig.Owner
|
|
}
|
|
}
|
|
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
|
|
response := AccessControlPolicy{
|
|
Owner: CanonicalUser{
|
|
ID: ownerId,
|
|
DisplayName: ownerDisplayName,
|
|
},
|
|
AccessControlList: buildAccessControlList(s3a.iam, storedGrants, ownerId, ownerDisplayName),
|
|
}
|
|
writeSuccessResponseXML(w, r, response)
|
|
}
|
|
|
|
// PutBucketAclHandler Put bucket ACL
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html //
|
|
func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Request) {
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("PutBucketAclHandler %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Get account information for ACL processing
|
|
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
|
|
|
|
// Get bucket ownership settings (these would be used for ownership validation in a full implementation)
|
|
bucketOwnership := "" // Default/simplified for now - in a full implementation this would be retrieved from bucket config
|
|
bucketOwnerId := amzAccountId // Simplified - bucket owner is current account
|
|
|
|
// Use the existing ACL parsing logic to handle both canned ACLs and XML body
|
|
grants, errCode := ExtractAcl(r, s3a.iam, bucketOwnership, bucketOwnerId, amzAccountId, amzAccountId)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
glog.V(3).Infof("PutBucketAclHandler: bucket=%s, extracted %d grants", bucket, len(grants))
|
|
isPublic := isPublicReadGrants(grants)
|
|
glog.V(3).Infof("PutBucketAclHandler: bucket=%s, isPublicReadGrants=%v", bucket, isPublic)
|
|
|
|
// Store the bucket ACL in bucket metadata
|
|
errCode = s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
|
if len(grants) > 0 {
|
|
grantsBytes, err := json.Marshal(grants)
|
|
if err != nil {
|
|
glog.Errorf("PutBucketAclHandler: failed to marshal grants: %v", err)
|
|
return err
|
|
}
|
|
config.ACL = grantsBytes
|
|
// Cache the public-read status to avoid JSON parsing on every request
|
|
config.IsPublicRead = isPublicReadGrants(grants)
|
|
glog.V(4).Infof("PutBucketAclHandler: bucket=%s, setting IsPublicRead=%v", bucket, config.IsPublicRead)
|
|
} else {
|
|
config.ACL = nil
|
|
config.IsPublicRead = false
|
|
}
|
|
config.Owner = amzAccountId
|
|
return nil
|
|
})
|
|
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
glog.V(3).Infof("PutBucketAclHandler: Successfully stored ACL for bucket %s with %d grants", bucket, len(grants))
|
|
|
|
// Small delay to ensure ACL propagation across distributed caches
|
|
// This prevents race conditions in tests where anonymous access is attempted immediately after ACL change
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|
|
|
|
// GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html
|
|
func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("GetBucketLifecycleConfigurationHandler %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
if lifecycleXML, transitionMinimumObjectSize, found, errCode := s3a.getStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
} else if found {
|
|
w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, transitionMinimumObjectSize)
|
|
writeSuccessResponseXMLBytes(w, r, lifecycleXML)
|
|
return
|
|
}
|
|
// ReadFilerConfFromFilers provides multi-filer failover
|
|
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
|
|
if err != nil {
|
|
glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
ttls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
|
|
if len(ttls) == 0 {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
|
|
return
|
|
}
|
|
|
|
response := lifecycle_xml.Lifecycle{}
|
|
// Sort locationPrefixes to ensure consistent ordering of lifecycle rules
|
|
var locationPrefixes []string
|
|
for locationPrefix := range ttls {
|
|
locationPrefixes = append(locationPrefixes, locationPrefix)
|
|
}
|
|
sort.Strings(locationPrefixes)
|
|
|
|
for _, locationPrefix := range locationPrefixes {
|
|
internalTtl := ttls[locationPrefix]
|
|
ttl, _ := needle.ReadTTL(internalTtl)
|
|
days := int(ttl.Minutes() / 60 / 24)
|
|
if days == 0 {
|
|
continue
|
|
}
|
|
prefix, found := strings.CutPrefix(locationPrefix, fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket))
|
|
if !found {
|
|
continue
|
|
}
|
|
response.Rules = append(response.Rules, lifecycle_xml.Rule{
|
|
ID: prefix,
|
|
Status: lifecycle_xml.Enabled,
|
|
Prefix: lifecycle_xml.NewPrefix(prefix),
|
|
Expiration: lifecycle_xml.NewExpirationDays(days),
|
|
})
|
|
}
|
|
|
|
if len(response.Rules) > 0 {
|
|
w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, defaultLifecycleTransitionMinimumObjectSize)
|
|
}
|
|
writeSuccessResponseXML(w, r, response)
|
|
}
|
|
|
|
// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
|
|
func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("PutBucketLifecycleConfigurationHandler %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxBucketLifecycleConfigurationSize)
|
|
lifecycleXML, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
glog.Warningf("PutBucketLifecycleConfigurationHandler read body: %s", err)
|
|
var maxBytesErr *http.MaxBytesError
|
|
if errors.As(err, &maxBytesErr) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooLarge)
|
|
return
|
|
}
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
lifeCycleConfig := lifecycle_xml.Lifecycle{}
|
|
if err := xmlDecoder(bytes.NewReader(lifecycleXML), &lifeCycleConfig, int64(len(lifecycleXML))); err != nil {
|
|
glog.Warningf("PutBucketLifecycleConfigurationHandler xml decode: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
|
|
return
|
|
}
|
|
|
|
// Reject Transition rules — they require storage class migration
|
|
// infrastructure that does not exist yet. Validate before touching
|
|
// any backing state so a malformed PUT can't half-apply.
|
|
for _, rule := range lifeCycleConfig.Rules {
|
|
if rule.Status != lifecycle_xml.Enabled {
|
|
continue
|
|
}
|
|
if rule.Transition.Set() || rule.NoncurrentVersionTransition.Set() {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Migration: clear any day-TTL filer.conf entries this handler
|
|
// installed in older builds. Per-write TTL is now driven by the
|
|
// LifecycleTTLResolver constructed off the stored XML, so leaving a
|
|
// stale day-TTL entry under /buckets/<bucket>/ would double-stamp
|
|
// (volume server expires under the old rule) or contradict the new
|
|
// XML after a rule change. The add path is gone — this loop only
|
|
// shrinks the conf, never grows it.
|
|
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
|
|
if err != nil {
|
|
glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
|
|
changed := false
|
|
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
|
|
for prefix, ttl := range collectionTtls {
|
|
if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
|
|
continue
|
|
}
|
|
fc.DeleteLocationConf(prefix)
|
|
changed = true
|
|
}
|
|
|
|
if changed {
|
|
var buf bytes.Buffer
|
|
if err := fc.ToText(&buf); err != nil {
|
|
glog.Errorf("PutBucketLifecycleConfigurationHandler save config to text: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
}
|
|
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return filer.SaveInsideFiler(context.Background(), client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
|
|
}); err != nil {
|
|
glog.Errorf("PutBucketLifecycleConfigurationHandler save config inside filer: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
}
|
|
|
|
if errCode := s3a.storeBucketLifecycleConfiguration(bucket, lifecycleXML, r.Header.Get(bucketLifecycleTransitionMinimumObjectSizeHeader)); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|
|
|
|
// DeleteBucketLifecycleHandler Delete Bucket Lifecycle
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
|
|
func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
|
// collect parameters
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("DeleteBucketLifecycleHandler %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
|
|
if err != nil {
|
|
glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
|
|
changed := false
|
|
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
|
|
for prefix, ttl := range collectionTtls {
|
|
if !strings.HasPrefix(prefix, bucketPrefix) || !strings.HasSuffix(ttl, "d") {
|
|
continue
|
|
}
|
|
fc.DeleteLocationConf(prefix)
|
|
changed = true
|
|
}
|
|
|
|
if changed {
|
|
var buf bytes.Buffer
|
|
if err := fc.ToText(&buf); err != nil {
|
|
glog.Errorf("DeleteBucketLifecycleHandler save config to text: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
}
|
|
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return filer.SaveInsideFiler(context.Background(), client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
|
|
}); err != nil {
|
|
glog.Errorf("DeleteBucketLifecycleHandler save config inside filer: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
}
|
|
|
|
if errCode := s3a.clearStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
|
}
|
|
|
|
// GetBucketLocationHandler Get bucket location
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html
|
|
func (s3a *S3ApiServer) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
writeSuccessResponseXML(w, r, CreateBucketConfiguration{})
|
|
}
|
|
|
|
// GetBucketRequestPaymentHandler Get bucket location
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketRequestPayment.html
|
|
func (s3a *S3ApiServer) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) {
|
|
writeSuccessResponseXML(w, r, RequestPaymentConfiguration{Payer: "BucketOwner"})
|
|
}
|
|
|
|
// PutBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketOwnershipControls.html
|
|
func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("PutBucketOwnershipControls %s", bucket)
|
|
|
|
errCode := s3a.checkAccessByOwnership(r, bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
if r.Body == nil || r.Body == http.NoBody {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
var v s3.OwnershipControls
|
|
defer util_http.CloseRequest(r)
|
|
|
|
err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "")
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
if len(v.Rules) != 1 || v.Rules[0] == nil || v.Rules[0].ObjectOwnership == nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
printOwnership := true
|
|
ownership := *v.Rules[0].ObjectOwnership
|
|
switch ownership {
|
|
case s3_constants.OwnershipObjectWriter:
|
|
case s3_constants.OwnershipBucketOwnerPreferred:
|
|
case s3_constants.OwnershipBucketOwnerEnforced:
|
|
printOwnership = false
|
|
default:
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
// Check if ownership needs to be updated
|
|
currentOwnership, errCode := s3a.getBucketOwnership(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
if currentOwnership != ownership {
|
|
errCode = s3a.setBucketOwnership(bucket, ownership)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
}
|
|
|
|
if printOwnership {
|
|
result := &s3.PutBucketOwnershipControlsInput{
|
|
OwnershipControls: &v,
|
|
}
|
|
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result)
|
|
} else {
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|
|
}
|
|
|
|
// GetBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketOwnershipControls.html
|
|
func (s3a *S3ApiServer) GetBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("GetBucketOwnershipControls %s", bucket)
|
|
|
|
errCode := s3a.checkAccessByOwnership(r, bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
// Get ownership using new bucket config system
|
|
ownership, errCode := s3a.getBucketOwnership(bucket)
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
|
return
|
|
} else if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError)
|
|
return
|
|
}
|
|
|
|
result := &s3.PutBucketOwnershipControlsInput{
|
|
OwnershipControls: &s3.OwnershipControls{
|
|
Rules: []*s3.OwnershipControlsRule{
|
|
{
|
|
ObjectOwnership: &ownership,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result)
|
|
}
|
|
|
|
// DeleteBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketOwnershipControls.html
|
|
func (s3a *S3ApiServer) DeleteBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("PutBucketOwnershipControls %s", bucket)
|
|
|
|
errCode := s3a.checkAccessByOwnership(r, bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
|
|
return
|
|
}
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
_, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey]
|
|
if !ok {
|
|
s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError)
|
|
return
|
|
}
|
|
|
|
delete(bucketEntry.Extended, s3_constants.ExtOwnershipKey)
|
|
err = s3a.updateEntry(s3a.option.BucketsPath, bucketEntry)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
emptyOwnershipControls := &s3.OwnershipControls{
|
|
Rules: []*s3.OwnershipControlsRule{},
|
|
}
|
|
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, emptyOwnershipControls)
|
|
}
|
|
|
|
// GetBucketVersioningHandler Get Bucket Versioning status
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
|
|
func (s3a *S3ApiServer) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("GetBucketVersioning %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Get versioning status using new bucket config system
|
|
versioningStatus, errCode := s3a.getBucketVersioningStatus(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
// AWS S3 behavior: If versioning was never configured, don't return Status field
|
|
var response *s3.PutBucketVersioningInput
|
|
if versioningStatus == "" {
|
|
// No versioning configuration - return empty response (no Status field)
|
|
response = &s3.PutBucketVersioningInput{
|
|
VersioningConfiguration: &s3.VersioningConfiguration{},
|
|
}
|
|
} else {
|
|
// Versioning was explicitly configured - return the status
|
|
response = &s3.PutBucketVersioningInput{
|
|
VersioningConfiguration: &s3.VersioningConfiguration{
|
|
Status: aws.String(versioningStatus),
|
|
},
|
|
}
|
|
}
|
|
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, response)
|
|
}
|
|
|
|
// PutBucketVersioningHandler Put bucket Versioning
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
|
|
func (s3a *S3ApiServer) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("PutBucketVersioning %s", bucket)
|
|
|
|
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
if r.Body == nil || r.Body == http.NoBody {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
var versioningConfig s3.VersioningConfiguration
|
|
defer util_http.CloseRequest(r)
|
|
|
|
err := xmlutil.UnmarshalXML(&versioningConfig, xml.NewDecoder(r.Body), "")
|
|
if err != nil {
|
|
glog.Warningf("PutBucketVersioningHandler xml decode: %s", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
|
|
return
|
|
}
|
|
|
|
if versioningConfig.Status == nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
status := *versioningConfig.Status
|
|
if status != s3_constants.VersioningEnabled && status != s3_constants.VersioningSuspended {
|
|
glog.Errorf("PutBucketVersioningHandler: invalid status '%s' for bucket %s", status, bucket)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
|
|
return
|
|
}
|
|
|
|
// Check if trying to suspend versioning on a bucket with object lock enabled
|
|
if status == s3_constants.VersioningSuspended {
|
|
// Get bucket configuration to check for object lock
|
|
bucketConfig, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode == s3err.ErrNone && bucketConfig.ObjectLockConfig != nil {
|
|
// Object lock is enabled, cannot suspend versioning
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketState)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Update bucket versioning configuration using new bucket config system
|
|
if errCode := s3a.setBucketVersioningStatus(bucket, status); errCode != s3err.ErrNone {
|
|
glog.Errorf("PutBucketVersioningHandler save config: bucket=%s, status='%s', errCode=%d", bucket, status, errCode)
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
writeSuccessResponseEmpty(w, r)
|
|
}
|