mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
ab7be7867d
* security: reload JWT signing keys on SIGHUP Signing keys were read once in the server constructors and never refreshed. After a key rotation (Secret update, divergent reads) the in-memory key stayed stale and every request kept failing "wrong jwt" until the affected process was restarted. Add Guard.UpdateSigningKeys and call it from the master, volume and filer reload paths and the s3 reload hook, next to the existing whitelist refresh. Make the global chunk-read JWT cache reloadable via an atomic swap, and register the master's Reload with grace.OnReload -- it was never wired, so the master ignored SIGHUP entirely. Mirror the same refresh in the Rust volume server's SIGHUP handler. * security: swap signing keys behind an atomic pointer Addresses review feedback on the in-place key swap: SigningKey is a []byte, so reassigning the Guard fields while a request handler reads them is a data race that can tear the multi-word slice header and read out of bounds. Hold the four signing-key fields in an immutable signingConfig snapshot behind atomic.Pointer; UpdateSigningKeys swaps the whole pointer, so a reader sees either the old keys or the new ones. Reads go through new SigningKey/ExpiresAfterSec/ReadSigningKey/ReadExpiresAfterSec accessors. The Rust guard is already safe: every read and the SIGHUP write go through the shared RwLock<Guard>. * security: fold whitelist + auth state into the atomic snapshot Review follow-up. UpdateSigningKeys still wrote isWriteActive while the request path read it (and the whitelist maps) unsynchronized, so a SIGHUP under load could expose an inconsistent mix of activation bits and whitelist contents. Move all hot-reloadable Guard state -- keys, expirations, whitelist, and the activation flags -- into a single immutable guardState swapped behind one atomic.Pointer. The Update* methods take a small mutex to serialize the read-modify-write; readers stay lock-free. The concurrency test now also rotates the whitelist and probes IsWhiteListed under -race. Also read each signing key once per branch in the volume/filer JWT auth checks, so a reload landing mid-check can't take the allow-fast-path after auth was enabled or verify against a different key than the branch saw.
216 lines
7.8 KiB
Go
216 lines
7.8 KiB
Go
package weed_server
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
|
)
|
|
|
|
type VolumeServer struct {
|
|
volume_server_pb.UnimplementedVolumeServerServer
|
|
inFlightUploadDataSize int64
|
|
inFlightDownloadDataSize int64
|
|
concurrentUploadLimit int64
|
|
concurrentDownloadLimit int64
|
|
inFlightUploadDataLimitCond *sync.Cond
|
|
inFlightDownloadDataLimitCond *sync.Cond
|
|
inflightUploadDataTimeout time.Duration
|
|
inflightDownloadDataTimeout time.Duration
|
|
hasSlowRead bool
|
|
readBufferSizeMB int
|
|
|
|
SeedMasterNodes []pb.ServerAddress
|
|
// seedMasterSet mirrors SeedMasterNodes keyed by the canonical http
|
|
// form. It is computed once in NewVolumeServer so admission paths can
|
|
// answer is-this-a-seed-master in O(1).
|
|
seedMasterSet map[string]struct{}
|
|
whiteList []string
|
|
currentMaster pb.ServerAddress
|
|
currentMasterLock sync.RWMutex
|
|
pulsePeriod time.Duration
|
|
dataCenter string
|
|
rack string
|
|
store *storage.Store
|
|
guard *security.Guard
|
|
grpcDialOption grpc.DialOption
|
|
|
|
needleMapKind storage.NeedleMapKind
|
|
ldbTimout int64
|
|
FixJpgOrientation bool
|
|
ReadMode string
|
|
AllowUntrustedRemoteEndpoints bool
|
|
compactionBytePerSecond int64
|
|
maintenanceBytePerSecond int64
|
|
metricsAddress string
|
|
metricsIntervalSec int
|
|
fileSizeLimitBytes int64
|
|
isHeartbeating bool
|
|
stopChan chan bool
|
|
}
|
|
|
|
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
|
|
port int, grpcPort int, publicUrl string, id string,
|
|
folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType, diskTags [][]string,
|
|
idxFolder string,
|
|
needleMapKind storage.NeedleMapKind,
|
|
masterNodes []pb.ServerAddress, pulsePeriod time.Duration,
|
|
dataCenter string, rack string,
|
|
whiteList []string,
|
|
fixJpgOrientation bool,
|
|
readMode string,
|
|
compactionMBPerSecond int,
|
|
maintenanceMBPerSecond int,
|
|
fileSizeLimitMB int,
|
|
concurrentUploadLimit int64,
|
|
concurrentDownloadLimit int64,
|
|
inflightUploadDataTimeout time.Duration,
|
|
inflightDownloadDataTimeout time.Duration,
|
|
hasSlowRead bool,
|
|
readBufferSizeMB int,
|
|
ldbTimeout int64,
|
|
allowUntrustedRemoteEndpoints bool,
|
|
diskProbeConfig stats.DiskIOProbeConfig,
|
|
) *VolumeServer {
|
|
|
|
v := util.GetViper()
|
|
signingKey := v.GetString("jwt.signing.key")
|
|
v.SetDefault("jwt.signing.expires_after_seconds", 10)
|
|
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
|
|
enableUiAccess := v.GetBool("access.ui")
|
|
|
|
readSigningKey := v.GetString("jwt.signing.read.key")
|
|
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
|
|
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
|
|
|
|
vs := &VolumeServer{
|
|
pulsePeriod: pulsePeriod,
|
|
dataCenter: dataCenter,
|
|
rack: rack,
|
|
needleMapKind: needleMapKind,
|
|
FixJpgOrientation: fixJpgOrientation,
|
|
ReadMode: readMode,
|
|
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
|
|
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
|
|
maintenanceBytePerSecond: int64(maintenanceMBPerSecond) * 1024 * 1024,
|
|
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
|
|
isHeartbeating: true,
|
|
stopChan: make(chan bool),
|
|
inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
|
|
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
|
|
concurrentUploadLimit: concurrentUploadLimit,
|
|
concurrentDownloadLimit: concurrentDownloadLimit,
|
|
inflightUploadDataTimeout: inflightUploadDataTimeout,
|
|
inflightDownloadDataTimeout: inflightDownloadDataTimeout,
|
|
hasSlowRead: hasSlowRead,
|
|
readBufferSizeMB: readBufferSizeMB,
|
|
ldbTimout: ldbTimeout,
|
|
whiteList: whiteList,
|
|
AllowUntrustedRemoteEndpoints: allowUntrustedRemoteEndpoints,
|
|
}
|
|
|
|
whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
|
|
// Copy the caller's slice so subsequent external mutation cannot desync
|
|
// SeedMasterNodes from the frozen lookup set built below.
|
|
seedMasters := make([]pb.ServerAddress, len(masterNodes))
|
|
copy(seedMasters, masterNodes)
|
|
vs.SeedMasterNodes = seedMasters
|
|
vs.seedMasterSet = make(map[string]struct{}, len(seedMasters))
|
|
for _, m := range seedMasters {
|
|
vs.seedMasterSet[m.ToHttpAddress()] = struct{}{}
|
|
}
|
|
|
|
vs.checkWithMaster()
|
|
|
|
vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, id, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, diskTags, ldbTimeout, diskProbeConfig)
|
|
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
|
|
|
|
handleStaticResources(adminMux)
|
|
adminMux.HandleFunc("/status", requestIDMiddleware(vs.statusHandler))
|
|
adminMux.HandleFunc("/healthz", requestIDMiddleware(vs.healthzHandler))
|
|
adminMux.HandleFunc("/readyz", requestIDMiddleware(vs.healthzHandler))
|
|
if signingKey == "" || enableUiAccess {
|
|
// only expose the volume server details for safe environments
|
|
adminMux.HandleFunc("/ui/index.html", requestIDMiddleware(vs.uiStatusHandler))
|
|
/*
|
|
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
|
|
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
|
|
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
|
|
*/
|
|
}
|
|
adminMux.HandleFunc("/", requestIDMiddleware(vs.privateStoreHandler))
|
|
if publicMux != adminMux {
|
|
// separated admin and public port
|
|
handleStaticResources(publicMux)
|
|
publicMux.HandleFunc("/", requestIDMiddleware(vs.publicReadOnlyHandler))
|
|
}
|
|
|
|
stats.VolumeServerConcurrentDownloadLimit.Set(float64(vs.concurrentDownloadLimit))
|
|
stats.VolumeServerConcurrentUploadLimit.Set(float64(vs.concurrentUploadLimit))
|
|
stats.VolumeServerStartTimeSeconds.Set(float64(time.Now().Unix()))
|
|
|
|
go vs.heartbeat()
|
|
go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
|
|
|
|
return vs
|
|
}
|
|
|
|
func (vs *VolumeServer) SetStopping() {
|
|
glog.V(0).Infoln("Stopping volume server...")
|
|
vs.store.SetStopping()
|
|
}
|
|
|
|
func (vs *VolumeServer) LoadNewVolumes() {
|
|
glog.V(0).Infoln(" Loading new volume ids ...")
|
|
vs.store.LoadNewVolumes()
|
|
}
|
|
|
|
func (vs *VolumeServer) Shutdown() {
|
|
glog.V(0).Infoln("Shutting down volume server...")
|
|
vs.store.Close()
|
|
glog.V(0).Infoln("Shut down successfully!")
|
|
}
|
|
|
|
func (vs *VolumeServer) Reload() {
|
|
glog.V(0).Infoln("Reload volume server...")
|
|
|
|
util.LoadConfiguration("security", false)
|
|
v := util.GetViper()
|
|
vs.guard.UpdateWhiteList(append(vs.whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...))
|
|
vs.guard.UpdateSigningKeys(
|
|
v.GetString("jwt.signing.key"),
|
|
v.GetInt("jwt.signing.expires_after_seconds"),
|
|
v.GetString("jwt.signing.read.key"),
|
|
v.GetInt("jwt.signing.read.expires_after_seconds"),
|
|
)
|
|
}
|
|
|
|
// Returns whether a volume server is in maintenance (i.e. read-only) mode.
|
|
func (vs *VolumeServer) MaintenanceMode() bool {
|
|
if vs.store == nil {
|
|
return false
|
|
}
|
|
return vs.store.State.Proto().GetMaintenance()
|
|
}
|
|
|
|
// Checks if a volume server is in maintenance mode, and returns an error explaining why.
|
|
func (vs *VolumeServer) CheckMaintenanceMode() error {
|
|
if !vs.MaintenanceMode() {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("volume server %s is in maintenance mode", vs.store.Id)
|
|
}
|