This commit is contained in:
Chris Lu
2026-04-10 17:31:14 -07:00
parent 066f7c3a0d
commit e648c76bcf
51 changed files with 245 additions and 257 deletions
+5 -5
View File
@@ -39,11 +39,11 @@ type dlmTestCluster struct {
filerGrpcPorts [2]int
mountPoints [2]string
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
filerCmds [2]*exec.Cmd
mountCmds [2]*exec.Cmd
logFiles []*os.File
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
filerCmds [2]*exec.Cmd
mountCmds [2]*exec.Cmd
logFiles []*os.File
cleanupOnce sync.Once
}
+2 -3
View File
@@ -52,8 +52,8 @@ type MasterCluster struct {
// clusterStatus is the JSON returned by /cluster/status.
type clusterStatus struct {
IsLeader bool `json:"IsLeader"`
Leader string `json:"Leader"`
IsLeader bool `json:"IsLeader"`
Leader string `json:"Leader"`
Peers []string `json:"Peers"`
}
@@ -358,7 +358,6 @@ func (mc *MasterCluster) tailLog(i int) string {
return strings.Join(lines, "\n")
}
func findOrBuildWeedBinary() (string, error) {
if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" {
if isExecutableFile(fromEnv) {
@@ -592,7 +592,6 @@ func (c *distributedLockCluster) tailLog(name string) string {
return strings.Join(lines, "\n")
}
func stopProcess(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil {
return
+13 -13
View File
@@ -39,19 +39,19 @@ const (
// TestCluster manages the weed mini instance for integration testing
type TestCluster struct {
dataDir string
ctx context.Context
cancel context.CancelFunc
s3Client *s3.S3
isRunning bool
startOnce sync.Once
wg sync.WaitGroup
masterPort int
volumePort int
filerPort int
s3Port int
s3Endpoint string
rustVolumeCmd *exec.Cmd
dataDir string
ctx context.Context
cancel context.CancelFunc
s3Client *s3.S3
isRunning bool
startOnce sync.Once
wg sync.WaitGroup
masterPort int
volumePort int
filerPort int
s3Port int
s3Endpoint string
rustVolumeCmd *exec.Cmd
}
// TestS3Integration demonstrates basic S3 operations against a running weed mini instance
@@ -317,4 +317,3 @@ func hasKey(policy map[string]interface{}, key string) bool {
return false
}
-2
View File
@@ -703,7 +703,6 @@ func uniqueName(prefix string) string {
// --- Test setup helpers ---
func startMiniCluster(t *testing.T) (*TestCluster, error) {
ports := testutil.MustAllocatePorts(t, 8)
masterPort, masterGrpcPort := ports[0], ports[1]
@@ -806,7 +805,6 @@ enabled = true
return cluster, nil
}
// startRustVolumeServer starts a Rust volume server that registers with the same master.
func (c *TestCluster) startRustVolumeServer(t *testing.T) error {
t.Helper()
-1
View File
@@ -272,7 +272,6 @@ func stopProcess(cmd *exec.Cmd) {
}
}
func newWorkDir() (dir string, keepLogs bool, err error) {
keepLogs = os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1"
dir, err = os.MkdirTemp("", "seaweedfs_volume_server_it_")
@@ -778,8 +778,8 @@ func TestEcIndexConsistencyAfterEncode(t *testing.T) {
needles := []testNeedle{
{framework.NewFileID(volumeID, 1001, 0xAABB0001), []byte("small-needle-1")},
{framework.NewFileID(volumeID, 1002, 0xAABB0002), make([]byte, 1024)}, // 1KB
{framework.NewFileID(volumeID, 1003, 0xAABB0003), make([]byte, 64*1024)}, // 64KB
{framework.NewFileID(volumeID, 1004, 0xAABB0004), make([]byte, 256*1024)}, // 256KB
{framework.NewFileID(volumeID, 1003, 0xAABB0003), make([]byte, 64*1024)}, // 64KB
{framework.NewFileID(volumeID, 1004, 0xAABB0004), make([]byte, 256*1024)}, // 256KB
{framework.NewFileID(volumeID, 1005, 0xAABB0005), []byte("small-needle-2")},
}
+4 -3
View File
@@ -459,9 +459,10 @@ func sortDurations(d []time.Duration) {
// This reveals tail latency differences that short tests miss (GC pauses, lock contention, etc).
//
// Run:
// go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
// VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
// LOADTEST_DURATION=120s VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
//
// go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
// VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
// LOADTEST_DURATION=120s VOLUME_SERVER_IMPL=rust go test -v -count=1 -timeout 600s -run TestSustainedP99 ./test/volume_server/loadtest/...
func TestSustainedP99(t *testing.T) {
if testing.Short() {
t.Skip("skipping sustained load test in short mode")
+8 -8
View File
@@ -289,8 +289,8 @@ func (cp *ConfigPersistence) LoadVacuumTaskConfig() (*VacuumTaskConfig, error) {
// Return default config if no valid config found
return &VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
}, nil
}
@@ -305,8 +305,8 @@ func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, erro
CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
},
},
}, nil
@@ -325,8 +325,8 @@ func (cp *ConfigPersistence) LoadVacuumTaskPolicy() (*worker_pb.TaskPolicy, erro
CheckIntervalSeconds: 6 * 3600, // 6 hours in seconds
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
},
},
}, nil
@@ -705,8 +705,8 @@ func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
},
},
}
+4 -4
View File
@@ -619,10 +619,10 @@ func TestRunLaneSchedulerIterationLockBehavior(t *testing.T) {
t.Parallel()
tests := []struct {
name string
lane SchedulerLane
jobType string
wantLock bool
name string
lane SchedulerLane
jobType string
wantLock bool
}{
{"Default", LaneDefault, "vacuum", true},
{"Iceberg", LaneIceberg, "iceberg_maintenance", false},
+1 -1
View File
@@ -114,7 +114,7 @@ type schedulerLaneState struct {
// Per-lane execution reservation pool. Each lane tracks how many
// execution slots it has reserved on each worker independently,
// so lanes cannot starve each other.
execMu sync.Mutex
execMu sync.Mutex
execRes map[string]int
}
@@ -20,10 +20,10 @@ var NoLockServerError = fmt.Errorf("no lock server found")
type ReplicateFunc func(server pb.ServerAddress, key string, expiredAtNs int64, token string, owner string, generation int64, seq int64, isUnlock bool)
type DistributedLockManager struct {
lockManager *LockManager
LockRing *LockRing
Host pb.ServerAddress
ReplicateFn ReplicateFunc // set by filer server after creation
lockManager *LockManager
LockRing *LockRing
Host pb.ServerAddress
ReplicateFn ReplicateFunc // set by filer server after creation
}
func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager {
+1 -1
View File
@@ -23,7 +23,7 @@ const DefaultVnodeCount = 50
type HashRing struct {
mu sync.RWMutex
vnodeCount int
sortedHashes []uint32 // sorted ring positions
sortedHashes []uint32 // sorted ring positions
vnodeToServer map[uint32]pb.ServerAddress // ring position → server
servers map[pb.ServerAddress]struct{} // set of all servers
}
+6 -6
View File
@@ -18,18 +18,18 @@ var LockNotFound = fmt.Errorf("lock not found")
// LockManager local lock manager, used by distributed lock manager
type LockManager struct {
locks map[string]*Lock
accessLock sync.RWMutex
nextGeneration atomic.Int64
locks map[string]*Lock
accessLock sync.RWMutex
nextGeneration atomic.Int64
}
type Lock struct {
Token string
ExpiredAtNs int64
Key string // only used for moving locks
Owner string
IsBackup bool // true if this node holds the lock as a backup
Generation int64 // monotonic fencing token, increments on fresh acquisition
Seq int64 // per-lock sequence number, increments on every mutation (acquire/renew/unlock)
IsBackup bool // true if this node holds the lock as a backup
Generation int64 // monotonic fencing token, increments on fresh acquisition
Seq int64 // per-lock sequence number, increments on every mutation (acquire/renew/unlock)
}
func NewLockManager() *LockManager {
+1 -1
View File
@@ -22,7 +22,7 @@ type LockRing struct {
onTakeSnapshot func(snapshot []pb.ServerAddress)
cleanupWg sync.WaitGroup
Ring *HashRing // consistent hash ring
version int64 // monotonic version from master, rejects stale updates
version int64 // monotonic version from master, rejects stale updates
}
func NewLockRing(snapshotInterval time.Duration) *LockRing {
+7 -7
View File
@@ -17,13 +17,13 @@ const LockRingStabilizationInterval = 1 * time.Second
// so filers receive a single consistent ring update instead of multiple
// intermediate states.
type LockRingManager struct {
mu sync.Mutex
members map[FilerGroupName]map[pb.ServerAddress]struct{}
version map[FilerGroupName]int64
lastBroadcast map[FilerGroupName]*master_pb.LockRingUpdate
pendingTimer map[FilerGroupName]*time.Timer
broadcastFn func(resp *master_pb.KeepConnectedResponse)
stabilizeDelay time.Duration
mu sync.Mutex
members map[FilerGroupName]map[pb.ServerAddress]struct{}
version map[FilerGroupName]int64
lastBroadcast map[FilerGroupName]*master_pb.LockRingUpdate
pendingTimer map[FilerGroupName]*time.Timer
broadcastFn func(resp *master_pb.KeepConnectedResponse)
stabilizeDelay time.Duration
}
func NewLockRingManager(broadcastFn func(resp *master_pb.KeepConnectedResponse)) *LockRingManager {
+1 -1
View File
@@ -11,11 +11,11 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
+9 -9
View File
@@ -15,10 +15,10 @@ import (
// tsMinHeap implements heap.Interface for int64 timestamps.
type tsMinHeap []int64
func (h tsMinHeap) Len() int { return len(h) }
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) }
func (h tsMinHeap) Len() int { return len(h) }
func (h tsMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h tsMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *tsMinHeap) Push(x any) { *h = append(*h, x.(int64)) }
func (h *tsMinHeap) Pop() any {
old := *h
n := len(old)
@@ -34,11 +34,11 @@ type syncJobPaths struct {
}
type MetadataProcessor struct {
activeJobs map[int64]*syncJobPaths
activeJobsLock sync.Mutex
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
activeJobs map[int64]*syncJobPaths
activeJobsLock sync.Mutex
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
processedTsWatermark atomic.Int64
// Indexes for O(depth) conflict detection, replacing O(n) linear scan.
+2 -2
View File
@@ -11,8 +11,8 @@ import (
func makeResp(dir, name string, isDir bool, tsNs int64, isNew bool) *filer_pb.SubscribeMetadataResponse {
resp := &filer_pb.SubscribeMetadataResponse{
Directory: dir,
TsNs: tsNs,
Directory: dir,
TsNs: tsNs,
EventNotification: &filer_pb.EventNotification{},
}
entry := &filer_pb.Entry{
+31 -31
View File
@@ -6,38 +6,38 @@ import (
)
type MountOptions struct {
filer *string
filerMountRootPath *string
dir *string
dirAutoCreate *bool
collection *string
collectionQuota *int
replication *string
diskType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
concurrentReaders *int
cacheMetaTtlSec *int
cacheDirForRead *string
cacheDirForWrite *string
cacheSizeMBForRead *int64
dataCenter *string
allowOthers *bool
defaultPermissions *bool
umaskString *string
nonempty *bool
volumeServerAccess *string
uidMap *string
gidMap *string
readOnly *bool
filer *string
filerMountRootPath *string
dir *string
dirAutoCreate *bool
collection *string
collectionQuota *int
replication *string
diskType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
concurrentReaders *int
cacheMetaTtlSec *int
cacheDirForRead *string
cacheDirForWrite *string
cacheSizeMBForRead *int64
dataCenter *string
allowOthers *bool
defaultPermissions *bool
umaskString *string
nonempty *bool
volumeServerAccess *string
uidMap *string
gidMap *string
readOnly *bool
includeSystemEntries *bool
debug *bool
debugPort *int
localSocket *string
disableXAttr *bool
extraOptions []string
fuseCommandPid int
debug *bool
debugPort *int
localSocket *string
disableXAttr *bool
extraOptions []string
fuseCommandPid int
// Periodic metadata flush to protect against orphan chunk cleanup
metadataFlushSeconds *int
+9 -9
View File
@@ -345,16 +345,16 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
IsMacOs: runtime.GOOS == "darwin",
MetadataFlushSeconds: *option.metadataFlushSeconds,
// RDMA acceleration options
RdmaEnabled: *option.rdmaEnabled,
RdmaSidecarAddr: *option.rdmaSidecarAddr,
RdmaFallback: *option.rdmaFallback,
RdmaReadOnly: *option.rdmaReadOnly,
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
RdmaTimeoutMs: *option.rdmaTimeoutMs,
DirIdleEvictSec: *option.dirIdleEvictSec,
RdmaEnabled: *option.rdmaEnabled,
RdmaSidecarAddr: *option.rdmaSidecarAddr,
RdmaFallback: *option.rdmaFallback,
RdmaReadOnly: *option.rdmaReadOnly,
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
RdmaTimeoutMs: *option.rdmaTimeoutMs,
DirIdleEvictSec: *option.dirIdleEvictSec,
EnableDistributedLock: option.distributedLock != nil && *option.distributedLock,
WritebackCache: option.writebackCache != nil && *option.writebackCache,
PosixDirNlink: option.posixDirNlink != nil && *option.posixDirNlink,
WritebackCache: option.writebackCache != nil && *option.writebackCache,
PosixDirNlink: option.posixDirNlink != nil && *option.posixDirNlink,
})
// create mount root
+1 -1
View File
@@ -15,7 +15,7 @@ var (
shellOptions shell.ShellOptions
shellInitialFiler *string
shellCluster *string
shellDebug *bool
shellDebug *bool
)
func init() {
@@ -12,10 +12,10 @@ import (
)
type mockFilerOps struct {
countFn func(path util.FullPath) (int, error)
deleteFn func(path util.FullPath) error
attrsFn func(path util.FullPath) (map[string][]byte, error)
isDirKeyObjFn func(path util.FullPath) (bool, error)
countFn func(path util.FullPath) (int, error)
deleteFn func(path util.FullPath) error
attrsFn func(path util.FullPath) (map[string][]byte, error)
isDirKeyObjFn func(path util.FullPath) (bool, error)
}
func (m *mockFilerOps) CountDirectoryEntries(_ context.Context, dirPath util.FullPath, _ int) (int, error) {
+22 -22
View File
@@ -40,28 +40,28 @@ var (
)
type Filer struct {
UniqueFilerId int32
UniqueFilerEpoch int32
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
lazyFetchGroup singleflight.Group
lazyListGroup singleflight.Group
Dlm *lock_manager.DistributedLockManager
MaxFilenameLength uint32
deletionQuit chan struct{}
DeletionRetryQueue *DeletionRetryQueue
EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
UniqueFilerId int32
UniqueFilerEpoch int32
Store VirtualFilerStore
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
metaLogReplication string
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
lazyFetchGroup singleflight.Group
lazyListGroup singleflight.Group
Dlm *lock_manager.DistributedLockManager
MaxFilenameLength uint32
deletionQuit chan struct{}
DeletionRetryQueue *DeletionRetryQueue
EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner
EmptyFolderCleanupDelay time.Duration
}
+7 -7
View File
@@ -228,13 +228,13 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
}
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "filer:" + string(self),
PathPrefix: "/",
SinceNs: lastTsNs,
ClientId: ma.filer.UniqueFilerId,
ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
ClientSupportsBatching: true,
ClientSupportsMetadataChunks: true,
ClientName: "filer:" + string(self),
PathPrefix: "/",
SinceNs: lastTsNs,
ClientId: ma.filer.UniqueFilerId,
ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
ClientSupportsBatching: true,
ClientSupportsMetadataChunks: true,
})
if err != nil {
glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err)
+5 -5
View File
@@ -14,8 +14,8 @@ import (
"testing"
"time"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
@@ -92,10 +92,10 @@ func createMockVolumeServer(chunkData map[string][]byte, latency time.Duration)
// benchmarkConfig holds parameters for a single benchmark scenario
type benchmarkConfig struct {
numChunks int
chunkSize int
latency time.Duration
prefetch int // 0 = sequential
numChunks int
chunkSize int
latency time.Duration
prefetch int // 0 = sequential
}
func (c benchmarkConfig) name() string {
+1 -1
View File
@@ -863,7 +863,7 @@ func (l *loggingT) exit(err error) {
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
logger *loggingT
logger *loggingT
*bufio.Writer
file *os.File
sev severity
+20 -20
View File
@@ -272,10 +272,10 @@ func BuildErrorResponse(correlationID uint32, errorCode int16) []byte {
func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
ec := make([]byte, 2)
binary.BigEndian.PutUint16(ec, uint16(errorCode))
throttle := []byte{0, 0, 0, 0} // throttle_time_ms = 0
emptyArr := []byte{0, 0, 0, 0} // regular array length = 0
nullStr := []byte{0xFF, 0xFF} // nullable string = null
emptyStr := []byte{0, 0} // string length = 0
throttle := []byte{0, 0, 0, 0} // throttle_time_ms = 0
emptyArr := []byte{0, 0, 0, 0} // regular array length = 0
nullStr := []byte{0xFF, 0xFF} // nullable string = null
emptyStr := []byte{0, 0} // string length = 0
switch APIKey(apiKey) {
@@ -306,10 +306,10 @@ func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
buf = append(buf, throttle...)
}
buf = append(buf, ec...)
buf = append(buf, nullStr...) // error_message
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF) // node_id = -1
buf = append(buf, emptyStr...) // host
buf = append(buf, 0, 0, 0, 0) // port = 0
buf = append(buf, nullStr...) // error_message
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF) // node_id = -1
buf = append(buf, emptyStr...) // host
buf = append(buf, 0, 0, 0, 0) // port = 0
return buf
case APIKeyJoinGroup:
@@ -320,10 +320,10 @@ func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
}
buf = append(buf, ec...)
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF) // generation_id = -1
buf = append(buf, nullStr...) // protocol_name
buf = append(buf, emptyStr...) // leader
buf = append(buf, emptyStr...) // member_id
buf = append(buf, emptyArr...) // members = []
buf = append(buf, nullStr...) // protocol_name
buf = append(buf, emptyStr...) // leader
buf = append(buf, emptyStr...) // member_id
buf = append(buf, emptyArr...) // members = []
return buf
case APIKeySyncGroup:
@@ -363,7 +363,7 @@ func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
buf = append(buf, throttle...)
buf = append(buf, ec...)
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer_id = -1
buf = append(buf, 0xFF, 0xFF) // producer_epoch = -1
buf = append(buf, 0xFF, 0xFF) // producer_epoch = -1
return buf
case APIKeyListGroups:
@@ -381,11 +381,11 @@ func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
buf := make([]byte, 0, 24)
buf = append(buf, throttle...)
buf = append(buf, ec...)
buf = append(buf, nullStr...) // error_message
buf = append(buf, emptyStr...) // cluster_id
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF) // controller_id = -1
buf = append(buf, emptyArr...) // brokers = []
buf = append(buf, 0, 0, 0, 0) // cluster_authorized_operations
buf = append(buf, nullStr...) // error_message
buf = append(buf, emptyStr...) // cluster_id
buf = append(buf, 0xFF, 0xFF, 0xFF, 0xFF) // controller_id = -1
buf = append(buf, emptyArr...) // brokers = []
buf = append(buf, 0, 0, 0, 0) // cluster_authorized_operations
return buf
// --- array-based responses (no top-level error_code) ----------------------
@@ -405,8 +405,8 @@ func BuildAPIErrorResponse(apiKey, apiVersion uint16, errorCode int16) []byte {
buf = append(buf, throttle...)
}
if apiVersion >= 7 {
buf = append(buf, ec...) // error_code
buf = append(buf, 0, 0, 0, 0) // session_id = 0
buf = append(buf, ec...) // error_code
buf = append(buf, 0, 0, 0, 0) // session_id = 0
}
buf = append(buf, emptyArr...) // topics = []
return buf
+12 -12
View File
@@ -37,7 +37,7 @@ type MetadataFollowOption struct {
// LogFileReaderFn, when non-nil, enables metadata chunks mode:
// the server sends log file chunk fids instead of streaming events,
// and the client reads directly from volume servers.
LogFileReaderFn LogFileReaderFn
LogFileReaderFn LogFileReaderFn
}
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
@@ -66,17 +66,17 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: option.ClientName,
PathPrefix: option.PathPrefix,
PathPrefixes: option.AdditionalPathPrefixes,
Directories: option.DirectoriesToWatch,
SinceNs: option.StartTsNs,
Signature: option.SelfSignature,
ClientId: option.ClientId,
ClientEpoch: option.ClientEpoch,
UntilNs: option.StopTsNs,
ClientSupportsBatching: true,
ClientSupportsMetadataChunks: option.LogFileReaderFn != nil,
ClientName: option.ClientName,
PathPrefix: option.PathPrefix,
PathPrefixes: option.AdditionalPathPrefixes,
Directories: option.DirectoriesToWatch,
SinceNs: option.StartTsNs,
Signature: option.SelfSignature,
ClientId: option.ClientId,
ClientEpoch: option.ClientEpoch,
UntilNs: option.StopTsNs,
ClientSupportsBatching: true,
ClientSupportsMetadataChunks: option.LogFileReaderFn != nil,
})
if err != nil {
return fmt.Errorf("subscribe: %w", err)
+7 -8
View File
@@ -152,9 +152,9 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@@ -169,9 +169,9 @@ func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
JobTypeMaxRuntimeSeconds: 1800,
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
"preferred_tags": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
}
}
@@ -404,11 +404,10 @@ func deriveECBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *ecBa
}
taskConfig.MinServerCount = int(minServerCountRaw)
taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags"))
return &ecBalanceWorkerConfig{
TaskConfig: taskConfig,
TaskConfig: taskConfig,
}
}
+1 -1
View File
@@ -592,7 +592,7 @@ func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *
taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags"))
return &erasureCodingWorkerConfig{
TaskConfig: taskConfig,
TaskConfig: taskConfig,
}
}
@@ -204,7 +204,6 @@ func TestErasureCodingHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestEmitErasureCodingDetectionDecisionTraceNoTasks(t *testing.T) {
sender := &recordingDetectionSender{}
config := erasurecodingtask.NewDefaultConfig()
+1 -1
View File
@@ -1213,7 +1213,7 @@ func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testin
handler := NewHandler(nil)
config := Config{
SnapshotRetentionHours: 24 * 365, // very long retention so age doesn't trigger
MaxSnapshotsToKeep: 1, // 2 snapshots > 1 triggers expiry
MaxSnapshotsToKeep: 1, // 2 snapshots > 1 triggers expiry
Operations: "compact,expire_snapshots",
}
@@ -151,7 +151,6 @@ func TestVacuumHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestBuildExecutorActivity(t *testing.T) {
activity := BuildExecutorActivity("running", "vacuum in progress")
if activity == nil {
@@ -1069,7 +1069,6 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
}
taskConfig.MinServerCount = minServerCount
maxConcurrentMoves64 := readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))
if maxConcurrentMoves64 < 1 {
maxConcurrentMoves64 = 1
@@ -340,7 +340,6 @@ func TestVolumeBalanceHandlerRejectsUnsupportedJobType(t *testing.T) {
}
}
func TestEmitVolumeBalanceDetectionDecisionTraceNoTasks(t *testing.T) {
sender := &recordingDetectionSender{}
config := balancetask.NewDefaultConfig()
+2 -2
View File
@@ -11,8 +11,8 @@ import (
)
var (
sseInitMu sync.Mutex
sseInitialized bool
sseInitMu sync.Mutex
sseInitialized bool
)
// InitializeSSEForReplication sets up SSE-S3 and SSE-KMS decryption so that
+2 -2
View File
@@ -46,8 +46,8 @@ func TestBuildTaggingString_ShouldURLEncodeValues(t *testing.T) {
func TestBuildTaggingString_EmptyWhenNoTags(t *testing.T) {
extended := map[string][]byte{
"Content-Encoding": []byte("gzip"),
s3_constants.AmzUserMetaMtime: []byte("12345"),
"Content-Encoding": []byte("gzip"),
s3_constants.AmzUserMetaMtime: []byte("12345"),
s3_constants.SeaweedFSSSES3Key: []byte(`{"algorithm":"AES256","encryptedDEK":"abc"}`),
}
-1
View File
@@ -193,4 +193,3 @@ func writeOAuthError(w http.ResponseWriter, status int, errCode, description str
}
writeJSON(w, status, resp)
}
+7 -7
View File
@@ -77,13 +77,13 @@ const (
AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold"
// S3 checksum headers
AmzChecksumAlgorithm = "X-Amz-Checksum-Algorithm"
AmzChecksumCRC32 = "X-Amz-Checksum-Crc32"
AmzChecksumCRC32C = "X-Amz-Checksum-Crc32c"
AmzChecksumCRC64NVME = "X-Amz-Checksum-Crc64nvme"
AmzChecksumSHA1 = "X-Amz-Checksum-Sha1"
AmzChecksumSHA256 = "X-Amz-Checksum-Sha256"
AmzTrailer = "X-Amz-Trailer"
AmzChecksumAlgorithm = "X-Amz-Checksum-Algorithm"
AmzChecksumCRC32 = "X-Amz-Checksum-Crc32"
AmzChecksumCRC32C = "X-Amz-Checksum-Crc32c"
AmzChecksumCRC64NVME = "X-Amz-Checksum-Crc64nvme"
AmzChecksumSHA1 = "X-Amz-Checksum-Sha1"
AmzChecksumSHA256 = "X-Amz-Checksum-Sha256"
AmzTrailer = "X-Amz-Trailer"
AmzSdkChecksumAlgorithm = "X-Amz-Sdk-Checksum-Algorithm"
// S3 conditional headers
+8 -8
View File
@@ -858,11 +858,11 @@ var checksumAlgorithmMapping = map[string]struct {
alg ChecksumAlgorithm
name string
}{
"CRC32": {ChecksumAlgorithmCRC32, s3_constants.AmzChecksumCRC32},
"CRC32C": {ChecksumAlgorithmCRC32C, s3_constants.AmzChecksumCRC32C},
"CRC32": {ChecksumAlgorithmCRC32, s3_constants.AmzChecksumCRC32},
"CRC32C": {ChecksumAlgorithmCRC32C, s3_constants.AmzChecksumCRC32C},
"CRC64NVME": {ChecksumAlgorithmCRC64NVMe, s3_constants.AmzChecksumCRC64NVME},
"SHA1": {ChecksumAlgorithmSHA1, s3_constants.AmzChecksumSHA1},
"SHA256": {ChecksumAlgorithmSHA256, s3_constants.AmzChecksumSHA256},
"SHA1": {ChecksumAlgorithmSHA1, s3_constants.AmzChecksumSHA1},
"SHA256": {ChecksumAlgorithmSHA256, s3_constants.AmzChecksumSHA256},
}
// trailerToChecksumAlgorithm maps trailer header names to their algorithm and canonical header name.
@@ -870,11 +870,11 @@ var trailerToChecksumAlgorithm = map[string]struct {
alg ChecksumAlgorithm
name string
}{
"x-amz-checksum-crc32": {ChecksumAlgorithmCRC32, s3_constants.AmzChecksumCRC32},
"x-amz-checksum-crc32c": {ChecksumAlgorithmCRC32C, s3_constants.AmzChecksumCRC32C},
"x-amz-checksum-crc32": {ChecksumAlgorithmCRC32, s3_constants.AmzChecksumCRC32},
"x-amz-checksum-crc32c": {ChecksumAlgorithmCRC32C, s3_constants.AmzChecksumCRC32C},
"x-amz-checksum-crc64nvme": {ChecksumAlgorithmCRC64NVMe, s3_constants.AmzChecksumCRC64NVME},
"x-amz-checksum-sha1": {ChecksumAlgorithmSHA1, s3_constants.AmzChecksumSHA1},
"x-amz-checksum-sha256": {ChecksumAlgorithmSHA256, s3_constants.AmzChecksumSHA256},
"x-amz-checksum-sha1": {ChecksumAlgorithmSHA1, s3_constants.AmzChecksumSHA1},
"x-amz-checksum-sha256": {ChecksumAlgorithmSHA256, s3_constants.AmzChecksumSHA256},
}
// checksumHeaders is the ordered list of individual checksum headers to check.
+4 -4
View File
@@ -54,8 +54,8 @@ var federationNameRegex = regexp.MustCompile(`^[\w+=,.@-]+$`)
// STS duration constants (AWS specification)
const (
minDurationSeconds = int64(900) // 15 minutes
maxDurationSeconds = int64(43200) // 12 hours (AssumeRole)
minDurationSeconds = int64(900) // 15 minutes
maxDurationSeconds = int64(43200) // 12 hours (AssumeRole)
defaultFederationDurationSeconds = int64(43200) // 12 hours (GetFederationToken default)
maxFederationDurationSeconds = int64(129600) // 36 hours (GetFederationToken max)
)
@@ -958,8 +958,8 @@ type GetCallerIdentityResult struct {
// GetFederationTokenResponse is the response for GetFederationToken
type GetFederationTokenResponse struct {
XMLName xml.Name `xml:"https://sts.amazonaws.com/doc/2011-06-15/ GetFederationTokenResponse"`
Result GetFederationTokenResult `xml:"GetFederationTokenResult"`
XMLName xml.Name `xml:"https://sts.amazonaws.com/doc/2011-06-15/ GetFederationTokenResponse"`
Result GetFederationTokenResult `xml:"GetFederationTokenResult"`
ResponseMetadata struct {
RequestId string `xml:"RequestId,omitempty"`
} `xml:"ResponseMetadata,omitempty"`
+9 -9
View File
@@ -207,15 +207,15 @@ func (fs *FilerServer) doCacheRemoteObjectToLocalCluster(ctx context.Context, re
var etag string
err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, fetchErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
Offset: localOffset,
Size: size,
Replicas: replicas,
Auth: string(assignResult.Auth),
DownloadConcurrency: downloadConcurrency,
RemoteConf: storageConf,
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
Cookie: uint32(fileId.Cookie),
Offset: localOffset,
Size: size,
Replicas: replicas,
Auth: string(assignResult.Auth),
DownloadConcurrency: downloadConcurrency,
RemoteConf: storageConf,
RemoteLocation: &remote_pb.RemoteStorageLocation{
Name: remoteStorageMountedLocation.Name,
Bucket: remoteStorageMountedLocation.Bucket,
@@ -139,7 +139,7 @@ func (p *renameStreamProxy) Context() context.Context {
return p.parent.Context()
}
func (p *renameStreamProxy) SendMsg(m any) error { return p.parent.SendMsg(m) }
func (p *renameStreamProxy) SendMsg(m any) error { return p.parent.SendMsg(m) }
func (p *renameStreamProxy) RecvMsg(m any) error { return p.parent.RecvMsg(m) }
func (p *renameStreamProxy) SetHeader(md metadata.MD) error { return p.parent.SetHeader(md) }
func (p *renameStreamProxy) SendHeader(md metadata.MD) error { return p.parent.SendHeader(md) }
-1
View File
@@ -315,4 +315,3 @@ func (s *RaftServer) DoJoinCommand() {
}
}
+7 -7
View File
@@ -49,14 +49,14 @@ type s3AccountInfo struct {
}
type s3UserShowResult struct {
Name string `json:"name"`
Status string `json:"status"`
Source string `json:"source"`
Account *s3AccountInfo `json:"account,omitempty"`
Policies []string `json:"policies"`
Actions []string `json:"actions,omitempty"`
Name string `json:"name"`
Status string `json:"status"`
Source string `json:"source"`
Account *s3AccountInfo `json:"account,omitempty"`
Policies []string `json:"policies"`
Actions []string `json:"actions,omitempty"`
Credentials []s3CredentialInfo `json:"credentials"`
ServiceAccounts []string `json:"service_accounts,omitempty"`
ServiceAccounts []string `json:"service_accounts,omitempty"`
}
func (c *commandS3UserShow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
@@ -18,8 +18,8 @@ import (
// existed in VolumeEcShardsGenerate before the fix in this PR.
//
// Previously, the order was:
// 1. WriteEcFilesWithContext(baseFileName, ecCtx) — EC shards from .dat
// 2. WriteSortedFileFromIdx(v.IndexFileName(), ".ecx") — .ecx from .idx
// 1. WriteEcFilesWithContext(baseFileName, ecCtx) — EC shards from .dat
// 2. WriteSortedFileFromIdx(v.IndexFileName(), ".ecx") — .ecx from .idx
//
// If a write appended data to .dat/.idx between steps 1 and 2, the .ecx would
// have entries pointing to data that doesn't exist in the EC shards.
@@ -287,7 +287,7 @@ func TestEcDecodeDatRoundTrip(t *testing.T) {
// to avoid needing huge test files. We test small block decode only.
// Each shard gets datSize/DataShards bytes in small 1MB blocks.
datSizes := []int64{
1000, // tiny
1000, // tiny
int64(DataShardsCount) * ErasureCodingSmallBlockSize, // exactly 1 small row (10MB)
int64(DataShardsCount)*ErasureCodingSmallBlockSize + 500, // 1 small row + partial
}
+2 -2
View File
@@ -10,6 +10,6 @@ const (
// Suffix appended to a path, e.g. "/buckets/b/foo/bar is a file"
ErrMsgIsAFile = " is a file"
// Prefix + suffix, e.g. "existing /buckets/b/foo is a directory"
ErrMsgExistingPrefix = "existing "
ErrMsgIsADirectory = " is a directory"
ErrMsgExistingPrefix = "existing "
ErrMsgIsADirectory = " is a directory"
)
+9 -9
View File
@@ -142,15 +142,15 @@ func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []
type MasterClient struct {
*vidMapClient // Embedded cache with shared logic
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
grpcTimeout time.Duration // Timeout for gRPC calls to master
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
grpcTimeout time.Duration // Timeout for gRPC calls to master
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
OnLockRingUpdate func(update *master_pb.LockRingUpdate)