mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
79ac279fe1
* feat(ec): add encode_ts_ns to EC shard metadata and the shard read RPC
  EcShardConfig and VolumeEcShardReadRequest gain an int64 encode_ts_ns (encode time in unix nanos). It rides in .vif and the read request so a read can be scoped to the encode run that produced the index.
* fix(ec): stamp each encode and reject cross-run shard reads
  Generate stamps EncodeTsNs into the volume's .vif. Reads carry it to the shard's owning volume (resolved together via FindEcVolumeWithShard, so a multi-disk server validates the disk that actually serves the bytes) and reject a shard from a different encode run, recovering from parity. A zero on either side (pre-upgrade volume) skips the guard.
* fix(ec): stamp the encode identity on the worker-generated .vif
  The worker-local encode path now writes EncodeTsNs (and the resolved EC ratio) into the .vif, so the read guard is not silently off for volumes encoded by the maintenance worker.
* fix(ec): wipe stale EC artifacts before re-encoding
  VolumeEcShardsGenerate evicts any in-memory EcVolume for the volume and removes its on-disk shard/index/sidecar files before writing fresh ones, so a retried encode never builds on a partial prior run and the unlink frees the inodes instead of leaving open fds serving old bytes.
* fix(ec): unmount EC shards across all disks
  UnmountEcShards walked only the first disk holding the shard, leaving a duplicate copy mounted on a sibling disk (split-disk reconciled volumes) still serving and heartbeating. Traverse every disk and emit one deletion delta per disk.
* fix(ec): delete orphan shards without a local .ecx
  deleteEcShardIdsForEachLocation gated shard-file removal on a local .ecx, so it could not clean an orphan .ecNN left by a failed copy on a disk with no index. Delete the requested shard files unconditionally; the index-file (.ecx/.ecj/.vif) routing stays gated as before.
* fix(ec): clear stale EC shards cluster-wide before re-encoding
  ec.encode unmounts and deletes EC shards for the target volumes on every node before regenerating: fatal for the shards the topology reports (mounted leftovers), best-effort for the rest (a sweep that catches unmounted failed-copy orphans). A down node is a no-op.
* fix(ec): don't nil EC fds on close so reads can't race eviction
  A reader resolves an EcVolume/shard under the lock then reads after it is released, so an eviction that nils ecxFile/ecdFile would race that read and panic. Close the fds without nilling the fields: the field is now write-once (no data race) and a concurrent read hits a closed fd, getting a clean error that the caller recovers from parity.
* fix(ec): wipe stale EC artifacts on every disk and surface failures
  The pre-encode wipe only deleted beside the source volume, so a stale shard on a sibling disk survived and could be mounted against the new index at reconcile. Sweep every disk. Removal also ignored os.Remove errors, reporting a failed cleanup as success and letting a stale shard join the next generation; surface the first real failure (treating already-gone as success) from removeStaleEcArtifacts and the shard delete.
* fix(ec): log when a local shard is skipped for a different encode run
  The cross-run guard returned errShardNotLocal, indistinguishable in logs from a genuinely-absent shard. Add a V(1) line naming both EncodeTsNs so operators can tell "wrong encode generation" from "shard not here".
* fix(ec): surface metadata removal failures in the shard delete path
  deleteEcShardIdsForEachLocation still dropped os.Remove errors on the .ecx/.ecj/.vif/sidecar cleanup. A surviving stale .ecx is the orphan-index condition this path prevents, so route those through removeFileIfExists and return the first real failure instead of reporting cleanup as success.
* fix(ec): fail orphan cleanup when a reachable node's delete fails
  The pre-encode orphan sweep swallowed every error for unreported (node, volume) pairs. That is only safe for an unreachable node, which cannot receive this encode's new generation. A reachable node whose delete genuinely failed (permission/IO) keeps an orphan shard that a later copy re-stamps with the new run's volume-level .vif identity, so the read guard would accept stale data. Surface those; stay best-effort only for unreachable nodes (gRPC Unavailable / no status).
* fix(ec): guard ecjFile under its lock in the EC delete path
  EcVolume.Close nils ecjFile under ecjFileAccessLock; a delete that resolved its .ecx lookup before a concurrent eviction (the generate-time UnloadEcVolume) could then reach the journal append with a nil fd. Bail with a clear "volume closed" error under the lock instead.
* fix(ec): reject an unstamped shard when the caller has an encode identity
  The read guard required both identities nonzero, so a current (stamped) caller accepted a holder with identity 0 and could be served a stale pre-upgrade shard. Reject when the caller is stamped and the holder differs (including unstamped); stay lenient only when the caller itself has no identity (pre-upgrade reader). A skipped shard recovers from parity.
* fix(ec): full-teardown delete so cluster cleanup wipes a whole generation
  The pre-encode cluster sweep deleted only the listed canonical shards on remote nodes, leaving index/sidecar (and, on builds with versioned generations, those too) behind. Add a full_teardown flag to VolumeEcShardsDelete that evicts the volume and wipes every EC artifact for it on every disk via removeStaleEcArtifacts; the shell and worker pre-encode cleanup paths set it. Other delete callers (balance/decode/repair) are unchanged.
* fix(ec): take ecjFileAccessLock before the nil-check in Sync and Close
  Sync and Close read ev.ecjFile before acquiring ecjFileAccessLock while Close nils it under the lock, a data race on the field. Take the lock first, then nil-check inside, in both.
* fix(ec): acknowledge full_teardown so a pre-upgrade server can't fake success
  An old volume server silently ignores full_teardown and returns success for an ordinary delete, so the caller wrongly believes the generation was wiped and copies a fresh gen-0 onto an unwiped node. Echo full_teardown_done in the response; the worker destination cleanup fails when it is absent, and the shell cluster sweep fails for a reported (mounted) leftover while staying best-effort for an unreported node. encode_ts_ns stays an accepted transient (an old server just skips the new read guard, no regression).
* fix(ec): fail the pre-encode sweep for any reachable node that can't ack teardown
  A reachable pre-upgrade server ignores full_teardown and returns success without wiping an orphan, which a later copy then folds into the new generation. Treat a missing full_teardown_done ack as fatal for every reachable node (best-effort only for a gRPC-unreachable one), not just for topology-reported pairs.
* fix(ec): return the served shard identity and validate it client-side
  The encode identity was only enforced server-side, so a pre-upgrade server ignored the request field and served bytes unchecked. Echo the served shard's EncodeTsNs on every read response chunk and have the client reject a mismatch (including 0 from an old server), so the guard holds regardless of server version; a rejected read recovers from parity.
* fix(ec): reject a short/empty remote shard read instead of serving zeros
  doReadRemoteEcShardInterval accepted an immediate EOF or a short stream and returned success with a partly zero-filled, unvalidated buffer (the server stamps the identity only on chunks that carry bytes). A non-deleted interval must arrive whole: require n == len(buf), exempting the is_deleted short-circuit (n=0), matching readLocalEcShardInterval's local check. A short read now fails so the caller recovers from parity.
* test(ec): fake volume server echoes the full_teardown acknowledgement
  The worker now fails a teardown delete that isn't acknowledged (so a pre-upgrade server can't silently skip the wipe). The fake server's no-op VolumeEcShardsDelete returned an empty response, which the worker read as a skipped teardown and aborted the encode. Echo full_teardown_done.
* feat(ec): mirror the encode-run identity guard + full_teardown into the Rust volume server
  The Go volume server stamps an encode-run identity (encode_ts_ns) into the .vif and rejects a read served from a shard of a different run; full_teardown wipes a whole generation and acknowledges it. The Rust volume server had none of it. Mirror the shared logic: load encode_ts_ns from the .vif onto the EcVolume, stamp it on every read response, and reject a request/response mismatch on both the server and the distributed-read client (recovering from parity); handle full_teardown by evicting the volume and wiping every EC artifact on each disk, echoing full_teardown_done so the caller can detect a server that ignored it.
* fix(ec): remove a stale .vif on full teardown of a shard-only node
  A shard copy installs shards + .ecx before .vif, so an interrupted copy after a teardown could mount the new files under the previous run's identity / version / shard ratio / dat_file_size carried by the surviving .vif. Remove .vif during full teardown, gated on .idx absence so a source-volume holder keeps its live .vif. In Rust this lives in a teardown-only helper so the reconcile / load-fallback paths (which share the base removal) still preserve .vif.
* fix(ec): treat a missing teardown ack as fatal, not as an unreachable node
  isNodeUnreachable returned true for any non-gRPC-status error, so a reachable pre-upgrade server's missing full_teardown_done ack (a plain error) was classified unreachable and the unreported pair was silently skipped. Classify only a real codes.Unavailable as unreachable, and wrap the missing ack in a sentinel the sweep treats as fatal regardless. A genuinely down node still surfaces as Unavailable from the RPC and stays best-effort.
* fix(ec): reject a short shard read in the local EC needle reader
  read_ec_shard_needle ignored the byte count from shard.read_at and appended the whole pre-sized buffer, so a truncated shard's zero-filled tail passed the later length check and parsed as garbage. Require n == buf.len() per interval, erroring on a short read like the local interval reader already does.
* fix(ec): probe reachability before skipping a node that returns Unavailable
  The pre-encode sweep skipped any node whose teardown delete returned codes.Unavailable, but a reachable volume server in maintenance mode also returns that code for the maintenance-gated delete, so its stale EC files were left behind on a node that can still receive the new generation. Confirm with a non-maintenance-gated empty-target Ping: skip only when the node fails the probe too (genuinely unreachable).
* fix(ec): use try_exists for the teardown .vif .idx guard
  The teardown-only .vif removal gated on Path::exists(), which returns false on a permission/IO stat error, so a stat failure on a present .idx would read as a shard-only node and delete the live source volume's .vif. Gate on try_exists() == Ok(false) instead, preserving the sidecar on any stat error.
* fix(ec): only skip a sweep node when a Ping confirms it is transport-down
  The pre-encode sweep skipped a node whenever its teardown delete and a liveness Ping both failed, but it treated ANY Ping error as down: an application-level Internal/ResourceExhausted, or Unimplemented from a pre-Ping server, left a reachable node's stale generation in place. Classify the Ping tri-state and skip only when it transport-fails with codes.Unavailable; a reachable or inconclusive node stays fatal.
* fix(ec): exclude sweep-skipped nodes from the encode's rebalance
  The pre-encode sweep skips a genuinely-down node best-effort, but the rebalance then recollected the current topology; a node that recovered between the two could become a copy target and receive the new generation while still holding its stale, never-cleared shards. Have the sweep return the skipped set and exclude those nodes from the rebalance for this encode, so a node we could not clean cannot receive the new generation. Standalone ec.balance is unaffected.
* fix(ec): re-sweep recovered nodes before generation so they aren't stranded
  A node skipped as down by the pre-encode sweep is excluded from the rebalance, but it can recover and become the generation host, mounting all shards locally, then being excluded from distribution. Union-only verification accepts all shards on one node and deletes the originals: a single point of failure. Re-sweep the skipped nodes just before generation; one whose teardown now succeeds leaves the skipped set and rebalances normally, while a node still down stays skipped.
* fix(ec): abort the encode if a selected source is still skipped after re-sweep
  The re-sweep un-skips a recovered node, but the source was selected before it and a node can stay down through the re-sweep then recover just in time to be the generation host, mounting all shards locally while still excluded from the rebalance, which union-only verification accepts before deleting the originals. Abort the encode when a selected source remains skipped after the re-sweep.
* fix(ec): batch delete returns retriable 503 when a volume became EC mid-batch
  If a volume is not EC at the batch-delete classification but is encoded to EC and its .dat deleted before the regular-volume mutation, the mutation returns an exact "not found" that the filer chunk-GC treats as completed, dropping the delete. Recheck EC presence under the mutation lock and return a retriable 503 with the "try again" token so the filer requeues it onto the EC path.
* fix(ec): recheck EC state before the regular batch-delete mutation
  ec.encode mounts EC shards (copied from the .dat) before deleting the originals, so a volume can be EC while its .dat still exists. The batch delete only rechecked EC after a NotFound, so a successful regular-volume delete in that window wrote a tombstone to the soon-removed .dat; the delete was lost and the needle resurrected from the pre-tombstone shards. Recheck has_ec_volume under the write lock before delete_volume_needle and return a retriable 503 so the filer requeues onto the EC path.
* fix(volume): make the metrics push test independent of test order
  test_push_metrics_once asserted the pushed body contains the request-counter family without ever touching the counter; a CounterVec with no children emits nothing, so the assertion only held when another test had already created a labelset in the shared registry. Create one in the test itself.
538 lines
15 KiB
Go
package pluginworkers

import (
	"context"
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// VolumeServer provides a minimal volume server for erasure coding tests.
type VolumeServer struct {
	volume_server_pb.UnimplementedVolumeServerServer

	t *testing.T

	server   *grpc.Server
	listener net.Listener
	address  string
	baseDir  string

	mu                   sync.Mutex
	receivedFiles        map[string]uint64
	mountRequests        []*volume_server_pb.VolumeEcShardsMountRequest
	deleteRequests       []*volume_server_pb.VolumeDeleteRequest
	markReadonlyCalls    int
	markWritableCalls    int
	readFileStatusCalls  int
	vacuumGarbageRatio   float64
	vacuumCommitReadOnly bool
	vacuumCheckCalls     int
	vacuumCompactCalls   int
	vacuumCommitCalls    int
	vacuumCleanupCalls   int
	volumeCopyCalls      int
	volumeMountCalls     int
	tailReceiverCalls    int
}

// NewVolumeServer starts a test volume server using the provided base directory.
// An empty baseDir falls back to t.TempDir(). The server is shut down via t.Cleanup.
func NewVolumeServer(t *testing.T, baseDir string) *VolumeServer {
	t.Helper()

	if baseDir == "" {
		baseDir = t.TempDir()
	}
	if err := os.MkdirAll(baseDir, 0755); err != nil {
		t.Fatalf("create volume base dir: %v", err)
	}

	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen volume server: %v", err)
	}

	grpcPort := listener.Addr().(*net.TCPAddr).Port
	server := pb.NewGrpcServer()
	vs := &VolumeServer{
		t:        t,
		server:   server,
		listener: listener,
		// host:httpPort.grpcPort form; the HTTP port is 0 because only gRPC is served.
		address:       fmt.Sprintf("127.0.0.1:0.%d", grpcPort),
		baseDir:       baseDir,
		receivedFiles: make(map[string]uint64),
	}

	volume_server_pb.RegisterVolumeServerServer(server, vs)
	go func() {
		_ = server.Serve(listener)
	}()

	t.Cleanup(func() {
		vs.Shutdown()
	})

	return vs
}

// Address returns the gRPC address of the volume server.
func (v *VolumeServer) Address() string {
	return v.address
}

// BaseDir returns the base directory used by the server.
func (v *VolumeServer) BaseDir() string {
	return v.baseDir
}

// ReceivedFiles returns a snapshot of received files and byte counts.
func (v *VolumeServer) ReceivedFiles() map[string]uint64 {
	v.mu.Lock()
	defer v.mu.Unlock()

	out := make(map[string]uint64, len(v.receivedFiles))
	for key, value := range v.receivedFiles {
		out[key] = value
	}
	return out
}

// SetVacuumGarbageRatio sets the garbage ratio returned by VacuumVolumeCheck.
func (v *VolumeServer) SetVacuumGarbageRatio(ratio float64) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.vacuumGarbageRatio = ratio
}

// SetVacuumCommitReadOnly sets the IsReadOnly value returned by VacuumVolumeCommit.
func (v *VolumeServer) SetVacuumCommitReadOnly(readOnly bool) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.vacuumCommitReadOnly = readOnly
}

// VacuumStats returns the vacuum RPC call counts.
func (v *VolumeServer) VacuumStats() (check, compact, commit, cleanup int) {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.vacuumCheckCalls, v.vacuumCompactCalls, v.vacuumCommitCalls, v.vacuumCleanupCalls
}

// BalanceStats returns the balance RPC call counts.
func (v *VolumeServer) BalanceStats() (copyCalls, mountCalls, tailCalls int) {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.volumeCopyCalls, v.volumeMountCalls, v.tailReceiverCalls
}

// MountRequests returns recorded mount requests.
func (v *VolumeServer) MountRequests() []*volume_server_pb.VolumeEcShardsMountRequest {
	v.mu.Lock()
	defer v.mu.Unlock()

	out := make([]*volume_server_pb.VolumeEcShardsMountRequest, len(v.mountRequests))
	copy(out, v.mountRequests)
	return out
}

// DeleteRequests returns recorded delete requests.
func (v *VolumeServer) DeleteRequests() []*volume_server_pb.VolumeDeleteRequest {
	v.mu.Lock()
	defer v.mu.Unlock()

	out := make([]*volume_server_pb.VolumeDeleteRequest, len(v.deleteRequests))
	copy(out, v.deleteRequests)
	return out
}

// MarkReadonlyCount returns the number of readonly calls.
func (v *VolumeServer) MarkReadonlyCount() int {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.markReadonlyCalls
}

// MarkWritableCount returns the number of writable calls.
func (v *VolumeServer) MarkWritableCount() int {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.markWritableCalls
}

// ReadFileStatusCount returns the number of ReadVolumeFileStatus calls.
func (v *VolumeServer) ReadFileStatusCount() int {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.readFileStatusCalls
}

// Shutdown stops the volume server.
func (v *VolumeServer) Shutdown() {
	if v.server != nil {
		v.server.GracefulStop()
	}
	if v.listener != nil {
		_ = v.listener.Close()
	}
}

func (v *VolumeServer) filePath(volumeID uint32, ext string) string {
	return filepath.Join(v.baseDir, fmt.Sprintf("%d%s", volumeID, ext))
}

// CopyFile streams the requested volume file from baseDir in chunks of up to
// 64 KiB, stopping once StopOffset bytes have been sent or the file ends.
// A StopOffset of 0 sends nothing.
func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
	if req == nil {
		return fmt.Errorf("copy file request is nil")
	}
	path := v.filePath(req.VolumeId, req.Ext)
	file, err := os.Open(path)
	if err != nil {
		if req.IgnoreSourceFileNotFound {
			return nil
		}
		return err
	}
	defer file.Close()

	buf := make([]byte, 64*1024)
	remaining := int64(req.GetStopOffset())
	for {
		if remaining == 0 {
			break
		}

		// Shrink the final read so no bytes past StopOffset are sent.
		readBuf := buf
		if remaining > 0 && remaining < int64(len(buf)) {
			readBuf = buf[:remaining]
		}

		n, readErr := file.Read(readBuf)
		if n > 0 {
			if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: readBuf[:n]}); err != nil {
				return err
			}
			if remaining > 0 {
				remaining -= int64(n)
			}
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return readErr
		}
	}
	return nil
}

// ReceiveFile writes a streamed file under baseDir and records its byte count.
// The stream must send a ReceiveFileInfo message before any content chunks.
func (v *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error {
	var (
		info         *volume_server_pb.ReceiveFileInfo
		file         *os.File
		bytesWritten uint64
		filePath     string
	)
	defer func() {
		if file != nil {
			_ = file.Close()
		}
	}()

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			if info == nil {
				return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{Error: "missing file info"})
			}
			v.mu.Lock()
			v.receivedFiles[filePath] = bytesWritten
			v.mu.Unlock()
			return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{BytesWritten: bytesWritten})
		}
		if err != nil {
			return err
		}

		if reqInfo := req.GetInfo(); reqInfo != nil {
			info = reqInfo
			filePath = v.filePath(info.VolumeId, info.Ext)
			if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
				return err
			}
			file, err = os.Create(filePath)
			if err != nil {
				return err
			}
			continue
		}

		chunk := req.GetFileContent()
		if len(chunk) == 0 {
			continue
		}
		if file == nil {
			return fmt.Errorf("file info not received")
		}
		n, writeErr := file.Write(chunk)
		if writeErr != nil {
			return writeErr
		}
		bytesWritten += uint64(n)
	}
}

func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
	v.mu.Lock()
	v.mountRequests = append(v.mountRequests, req)
	v.mu.Unlock()
	return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
}

// VolumeEcShardsUnmount is a no-op stub: the worker's pre-distribute
// cleanup calls it against every destination, and the fake server has no
// mounted state to clear.
func (v *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
	return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
}

// VolumeEcShardsDelete is a no-op stub paired with VolumeEcShardsUnmount
// above; the fake server doesn't persist shard files beyond what
// ReceiveFile wrote, so there's nothing to remove. It still echoes the
// full_teardown acknowledgement so the worker doesn't treat the fake as a
// pre-upgrade server that silently skipped the teardown.
func (v *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
	return &volume_server_pb.VolumeEcShardsDeleteResponse{FullTeardownDone: req.FullTeardown}, nil
}

func (v *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
	if req == nil {
		return nil, fmt.Errorf("VolumeEcShardsInfo request is nil")
	}
	v.mu.Lock()
	defer v.mu.Unlock()

	// Report whichever shards exist on disk: seeded or mounted. Collection
	// comes from the matching mount request when one exists.
	collectionByShard := make(map[uint32]string)
	for _, mr := range v.mountRequests {
		if mr == nil || mr.VolumeId != req.VolumeId {
			continue
		}
		for _, shardId := range mr.ShardIds {
			if _, ok := collectionByShard[shardId]; !ok {
				collectionByShard[shardId] = mr.Collection
			}
		}
	}

	resp := &volume_server_pb.VolumeEcShardsInfoResponse{}
	prefix := fmt.Sprintf("%d.ec", req.VolumeId)
	entries, _ := os.ReadDir(v.baseDir)
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		suffix := strings.TrimPrefix(name, prefix)
		if len(suffix) < 2 {
			continue
		}
		var shardId uint32
		if _, err := fmt.Sscanf(suffix[:2], "%d", &shardId); err != nil {
			continue
		}
		var size int64
		if info, err := entry.Info(); err == nil {
			size = info.Size()
		}
		resp.EcShardInfos = append(resp.EcShardInfos, &volume_server_pb.EcShardInfo{
			ShardId:    shardId,
			Size:       size,
			Collection: collectionByShard[shardId],
			VolumeId:   req.VolumeId,
		})
	}
	return resp, nil
}

func (v *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
	v.mu.Lock()
	v.deleteRequests = append(v.deleteRequests, req)
	v.mu.Unlock()

	if req != nil {
		_ = os.Remove(v.filePath(req.VolumeId, ".dat"))
		_ = os.Remove(v.filePath(req.VolumeId, ".idx"))
	}

	return &volume_server_pb.VolumeDeleteResponse{}, nil
}

func (v *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
	v.mu.Lock()
	v.markReadonlyCalls++
	v.mu.Unlock()
	return &volume_server_pb.VolumeMarkReadonlyResponse{}, nil
}

func (v *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
	v.mu.Lock()
	v.markWritableCalls++
	v.mu.Unlock()
	return &volume_server_pb.VolumeMarkWritableResponse{}, nil
}

func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
	v.mu.Lock()
	v.readFileStatusCalls++
	v.mu.Unlock()

	datInfo, err := os.Stat(v.filePath(req.VolumeId, ".dat"))
	if err != nil {
		return nil, err
	}

	idxInfo, err := os.Stat(v.filePath(req.VolumeId, ".idx"))
	if err != nil {
		return nil, err
	}

	return &volume_server_pb.ReadVolumeFileStatusResponse{
		VolumeId:    req.VolumeId,
		DatFileSize: uint64(datInfo.Size()),
		IdxFileSize: uint64(idxInfo.Size()),
		FileCount:   1,
	}, nil
}

func (v *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
	v.mu.Lock()
	v.vacuumCheckCalls++
	ratio := v.vacuumGarbageRatio
	v.mu.Unlock()
	return &volume_server_pb.VacuumVolumeCheckResponse{GarbageRatio: ratio}, nil
}

func (v *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
	v.mu.Lock()
	v.vacuumCompactCalls++
	v.mu.Unlock()
	return stream.Send(&volume_server_pb.VacuumVolumeCompactResponse{ProcessedBytes: 1024})
}

func (v *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
	v.mu.Lock()
	v.vacuumCommitCalls++
	readOnly := v.vacuumCommitReadOnly
	v.mu.Unlock()
	return &volume_server_pb.VacuumVolumeCommitResponse{IsReadOnly: readOnly}, nil
}

func (v *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
	v.mu.Lock()
	v.vacuumCleanupCalls++
	v.mu.Unlock()
	return &volume_server_pb.VacuumVolumeCleanupResponse{}, nil
}

func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
	v.mu.Lock()
	v.volumeCopyCalls++
	v.mu.Unlock()

	dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
	var statusResp *volume_server_pb.ReadVolumeFileStatusResponse
	if err := operation.WithVolumeServerClient(false, pb.ServerAddress(req.SourceDataNode), dialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			var readErr error
			statusResp, readErr = client.ReadVolumeFileStatus(stream.Context(), &volume_server_pb.ReadVolumeFileStatusRequest{
				VolumeId: req.VolumeId,
			})
			return readErr
		}); err != nil {
		return err
	}

	if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".dat", statusResp.DatFileSize, dialOption); err != nil {
		return err
	}
	if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".idx", statusResp.IdxFileSize, dialOption); err != nil {
		return err
	}

	if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: int64(statusResp.DatFileSize + statusResp.IdxFileSize)}); err != nil {
		return err
	}
	return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())})
}

func (v *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
	v.mu.Lock()
	v.volumeMountCalls++
	v.mu.Unlock()
	return &volume_server_pb.VolumeMountResponse{}, nil
}

func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
	v.mu.Lock()
	v.tailReceiverCalls++
	v.mu.Unlock()
	return &volume_server_pb.VolumeTailReceiverResponse{}, nil
}

func (v *VolumeServer) copyRemoteFile(ctx context.Context, sourceDataNode string, volumeID uint32, ext string, fileSize uint64, dialOption grpc.DialOption) error {
	path := v.filePath(volumeID, ext)
	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
		return err
	}

	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer file.Close()

	return operation.WithVolumeServerClient(true, pb.ServerAddress(sourceDataNode), dialOption,
		func(client volume_server_pb.VolumeServerClient) error {
			stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
				VolumeId:   volumeID,
				Ext:        ext,
				StopOffset: fileSize,
			})
			if err != nil {
				return err
			}

			for {
				resp, recvErr := stream.Recv()
				if recvErr == io.EOF {
					return nil
				}
				if recvErr != nil {
					return recvErr
				}
				if len(resp.FileContent) == 0 {
					continue
				}
				if _, err := file.Write(resp.FileContent); err != nil {
					return err
				}
			}
		})
}