Chris Lu 79ac279fe1 fix(ec): don't mix EC shards from different encode runs (#9880)
* feat(ec): add encode_ts_ns to EC shard metadata and the shard read RPC

EcShardConfig and VolumeEcShardReadRequest gain an int64 encode_ts_ns
(encode time in unix nanos). It rides in .vif and the read request so a
read can be scoped to the encode run that produced the index.

* fix(ec): stamp each encode and reject cross-run shard reads

Generate stamps EncodeTsNs into the volume's .vif. Reads carry it to the
shard's owning volume (resolved together via FindEcVolumeWithShard, so a
multi-disk server validates the disk that actually serves the bytes) and
reject a shard from a different encode run, recovering from parity. A
zero on either side (pre-upgrade volume) skips the guard.

* fix(ec): stamp the encode identity on the worker-generated .vif

The worker-local encode path now writes EncodeTsNs (and the resolved EC
ratio) into the .vif, so the read guard is not silently off for volumes
encoded by the maintenance worker.

* fix(ec): wipe stale EC artifacts before re-encoding

VolumeEcShardsGenerate evicts any in-memory EcVolume for the volume and
removes its on-disk shard/index/sidecar files before writing fresh ones,
so a retried encode never builds on a partial prior run and the unlink
frees the inodes instead of leaving open fds serving old bytes.

* fix(ec): unmount EC shards across all disks

UnmountEcShards walked only the first disk holding the shard, leaving a
duplicate copy mounted on a sibling disk (split-disk reconciled volumes)
still serving and heartbeating. Traverse every disk and emit one
deletion delta per disk.

* fix(ec): delete orphan shards without a local .ecx

deleteEcShardIdsForEachLocation gated shard-file removal on a local .ecx,
so it could not clean an orphan .ecNN left by a failed copy on a disk
with no index. Delete the requested shard files unconditionally; the
index-file (.ecx/.ecj/.vif) routing stays gated as before.

* fix(ec): clear stale EC shards cluster-wide before re-encoding

ec.encode unmounts and deletes EC shards for the target volumes on every
node before regenerating: fatal for the shards the topology reports
(mounted leftovers), best-effort for the rest (a sweep that catches
unmounted failed-copy orphans). A down node is a no-op.

* fix(ec): don't nil EC fds on close so reads can't race eviction

A reader resolves an EcVolume/shard under the lock then reads after it is
released, so an eviction that nils ecxFile/ecdFile would race that read
and panic. Close the fds without nilling the fields: the field is now
write-once (no data race) and a concurrent read hits a closed fd, getting
a clean error that the caller recovers from parity.
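
The close-without-nil behavior described above can be illustrated with a minimal sketch. The `shard` type and the helpers are hypothetical stand-ins, not the EcVolume code; the sketch relies only on the standard-library behavior that reading a closed `*os.File` returns an error wrapping `os.ErrClosed`.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// shard is a hypothetical stand-in for an EC shard that owns an open file.
type shard struct {
	f *os.File // set once at open; Close never resets it to nil
}

// Close closes the descriptor but leaves the field in place, so a reader
// that resolved the shard earlier still holds a valid (closed) *os.File.
func (s *shard) Close() error { return s.f.Close() }

// readAfterClose returns the error seen by a read that arrives after Close.
func readAfterClose() error {
	f, err := os.CreateTemp("", "shard-*.ec00")
	if err != nil {
		return err
	}
	defer os.Remove(f.Name())
	s := &shard{f: f}
	if _, err := f.Write([]byte("payload")); err != nil {
		_ = s.Close()
		return err
	}
	if err := s.Close(); err != nil {
		return err
	}
	// No nil dereference here: the read fails with a regular error.
	_, err = s.f.ReadAt(make([]byte, 4), 0)
	return err
}

// closedReadIsClean reports whether the late read failed with os.ErrClosed.
func closedReadIsClean() bool {
	return errors.Is(readAfterClose(), os.ErrClosed)
}

func main() {
	fmt.Println("late read failed with os.ErrClosed:", closedReadIsClean())
}
```

In this sketch the caller would treat the returned error like any other failed shard read and fall back to parity recovery.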

* fix(ec): wipe stale EC artifacts on every disk and surface failures

The pre-encode wipe only deleted beside the source volume, so a stale
shard on a sibling disk survived and could be mounted against the new
index at reconcile. Sweep every disk. Removal also ignored os.Remove
errors, reporting a failed cleanup as success and letting a stale shard
join the next generation; surface the first real failure (treating
already-gone as success) from removeStaleEcArtifacts and the shard delete.

* fix(ec): log when a local shard is skipped for a different encode run

The cross-run guard returned errShardNotLocal, indistinguishable in logs
from a genuinely-absent shard. Add a V(1) line naming both EncodeTsNs so
operators can tell "wrong encode generation" from "shard not here".

* fix(ec): surface metadata removal failures in the shard delete path

deleteEcShardIdsForEachLocation still dropped os.Remove errors on the
.ecx/.ecj/.vif/sidecar cleanup. A surviving stale .ecx is the orphan-index
condition this path prevents, so route those through removeFileIfExists and
return the first real failure instead of reporting cleanup as success.
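
A minimal sketch of the "already gone is success, first real failure is returned" rule follows. The name `removeFileIfExists` comes from the message above, but its signature here is an assumption, and `removeFirstFailure` and `demo` are hypothetical helpers added for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// removeFileIfExists removes path and treats a missing file as success.
// The signature is assumed for this sketch.
func removeFileIfExists(path string) error {
	err := os.Remove(path)
	if err == nil || errors.Is(err, fs.ErrNotExist) {
		return nil
	}
	return err
}

// removeFirstFailure attempts every path and returns the first real error,
// so one stuck file does not stop the rest of the cleanup.
func removeFirstFailure(paths []string) error {
	var first error
	for _, p := range paths {
		if err := removeFileIfExists(p); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// demo returns the result of a clean sweep and of a sweep with a stuck entry.
func demo() (cleanErr, stuckErr error) {
	dir, err := os.MkdirTemp("", "ec-cleanup-*")
	if err != nil {
		return err, nil
	}
	defer os.RemoveAll(dir)

	ecx := filepath.Join(dir, "7.ecx")
	if err := os.WriteFile(ecx, []byte("index"), 0o644); err != nil {
		return err, nil
	}
	// 7.ecj never existed; that must not count as a failure.
	cleanErr = removeFirstFailure([]string{ecx, filepath.Join(dir, "7.ecj")})

	// A non-empty directory cannot be removed by os.Remove; it stands in
	// for a permission or I/O failure on a real artifact.
	stuck := filepath.Join(dir, "7.vif")
	if err := os.MkdirAll(filepath.Join(stuck, "child"), 0o755); err != nil {
		return cleanErr, nil
	}
	stuckErr = removeFirstFailure([]string{filepath.Join(dir, "7.ecj"), stuck})
	return cleanErr, stuckErr
}

func main() {
	cleanErr, stuckErr := demo()
	fmt.Println("clean sweep error:", cleanErr)
	fmt.Println("stuck sweep surfaced a failure:", stuckErr != nil)
}
```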

* fix(ec): fail orphan cleanup when a reachable node's delete fails

The pre-encode orphan sweep swallowed every error for unreported (node,
volume) pairs. That is only safe for an unreachable node, which cannot
receive this encode's new generation. A reachable node whose delete
genuinely failed (permission/IO) keeps an orphan shard that a later copy
re-stamps with the new run's volume-level .vif identity, so the read guard
would accept stale data. Surface those; stay best-effort only for
unreachable nodes (gRPC Unavailable / no status).

* fix(ec): guard ecjFile under its lock in the EC delete path

EcVolume.Close nils ecjFile under ecjFileAccessLock; a delete that resolved
its .ecx lookup before a concurrent eviction (the generate-time
UnloadEcVolume) could then reach the journal append with a nil fd. Bail
with a clear "volume closed" error under the lock instead.

* fix(ec): reject an unstamped shard when the caller has an encode identity

The read guard required both identities nonzero, so a current (stamped)
caller accepted a holder with identity 0 and could be served a stale
pre-upgrade shard. Reject when the caller is stamped and the holder
differs (including unstamped); stay lenient only when the caller itself
has no identity (pre-upgrade reader). A skipped shard recovers from parity.
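
The acceptance rule in this paragraph can be written as a small predicate. This is a sketch of the rule as described, not the actual guard; `acceptShard` and its parameter names are hypothetical, and 0 stands for "unstamped".

```go
package main

import "fmt"

// acceptShard reports whether a reader whose index carries callerTs may use
// a shard whose holder reports holderTs (both are encode times in unix
// nanos, 0 meaning "no identity").
func acceptShard(callerTs, holderTs int64) bool {
	if callerTs == 0 {
		// Pre-upgrade reader: it has nothing to compare against, stay lenient.
		return true
	}
	// Stamped reader: the holder must match exactly, so an unstamped
	// holder (0) is rejected as well.
	return callerTs == holderTs
}

func main() {
	cases := []struct{ caller, holder int64 }{
		{0, 0}, {0, 42}, {42, 42}, {42, 0}, {42, 43},
	}
	for _, c := range cases {
		fmt.Printf("caller=%d holder=%d accept=%v\n", c.caller, c.holder, acceptShard(c.caller, c.holder))
	}
}
```

A rejected shard is simply treated as unavailable, which is what lets the read recover from parity.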

* fix(ec): full-teardown delete so cluster cleanup wipes a whole generation

The pre-encode cluster sweep deleted only the listed canonical shards on
remote nodes, leaving index/sidecar (and, on builds with versioned
generations, those too) behind. Add a full_teardown flag to
VolumeEcShardsDelete that evicts the volume and wipes every EC artifact for
it on every disk via removeStaleEcArtifacts; the shell and worker pre-encode
cleanup paths set it. Other delete callers (balance/decode/repair) are
unchanged.

* fix(ec): take ecjFileAccessLock before the nil-check in Sync and Close

Sync and Close read ev.ecjFile before acquiring ecjFileAccessLock while
Close nils it under the lock, a data race on the field. Take the lock
first, then nil-check inside, in both.
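
The lock-then-check ordering can be sketched as follows. The field names `ecjFile` and `ecjFileAccessLock` are taken from the message; the `ecVolume` type, the `appendJournal` helper, the `errVolumeClosed` sentinel, and the choice to make `Sync` a no-op after `Close` are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"sync"
)

var errVolumeClosed = errors.New("volume closed") // hypothetical sentinel

// ecVolume is a reduced stand-in holding only the journal file and its lock.
type ecVolume struct {
	ecjFileAccessLock sync.Mutex
	ecjFile           *os.File
}

// Sync takes the lock first and only then inspects the field.
func (ev *ecVolume) Sync() error {
	ev.ecjFileAccessLock.Lock()
	defer ev.ecjFileAccessLock.Unlock()
	if ev.ecjFile == nil {
		return nil // assumed: nothing to flush once closed
	}
	return ev.ecjFile.Sync()
}

// Close nils the field under the same lock, so no reader sees a torn state.
func (ev *ecVolume) Close() error {
	ev.ecjFileAccessLock.Lock()
	defer ev.ecjFileAccessLock.Unlock()
	if ev.ecjFile == nil {
		return nil
	}
	err := ev.ecjFile.Close()
	ev.ecjFile = nil
	return err
}

// appendJournal bails with a clear error instead of writing to a nil fd.
func (ev *ecVolume) appendJournal(b []byte) error {
	ev.ecjFileAccessLock.Lock()
	defer ev.ecjFileAccessLock.Unlock()
	if ev.ecjFile == nil {
		return errVolumeClosed
	}
	_, err := ev.ecjFile.Write(b)
	return err
}

// demo races writers against Close and returns any unexpected error.
func demo() error {
	f, err := os.CreateTemp("", "vol-*.ecj")
	if err != nil {
		return err
	}
	defer os.Remove(f.Name())
	ev := &ecVolume{ecjFile: f}

	var wg sync.WaitGroup
	errs := make(chan error, 9)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := ev.appendJournal([]byte("x")); err != nil && !errors.Is(err, errVolumeClosed) {
				errs <- err
			}
		}()
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := ev.Close(); err != nil {
			errs <- err
		}
	}()
	wg.Wait()
	close(errs)
	if err, ok := <-errs; ok {
		return err
	}
	if err := ev.appendJournal([]byte("x")); !errors.Is(err, errVolumeClosed) {
		return fmt.Errorf("expected %v after Close, got %v", errVolumeClosed, err)
	}
	return ev.Sync()
}

func main() {
	fmt.Println("unexpected error:", demo())
}
```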

* fix(ec): acknowledge full_teardown so a pre-upgrade server can't fake success

An old volume server silently ignores full_teardown and returns success
for an ordinary delete, so the caller wrongly believes the generation was
wiped and copies a fresh gen-0 onto an unwiped node. Echo full_teardown_done
in the response; the worker destination cleanup fails when it is absent, and
the shell cluster sweep fails for a reported (mounted) leftover while staying
best-effort for an unreported node. encode_ts_ns stays an accepted transient
(an old server just skips the new read guard, no regression).
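
The acknowledgement check can be sketched with local types. The structs below only mirror the two fields named in the message (`full_teardown`, `full_teardown_done`); they are not the generated protobuf types, and `checkTeardownAck`, `upgradedServer`, and `oldServer` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Reduced stand-ins for the delete request and response.
type deleteRequest struct{ FullTeardown bool }
type deleteResponse struct{ FullTeardownDone bool }

// errTeardownNotAcked is a hypothetical sentinel for a missing acknowledgement.
var errTeardownNotAcked = errors.New("full_teardown was not acknowledged")

// upgradedServer echoes the flag, as a server that performed the wipe would.
func upgradedServer(req deleteRequest) deleteResponse {
	return deleteResponse{FullTeardownDone: req.FullTeardown}
}

// oldServer does not know the field: it returns success with the ack unset.
func oldServer(req deleteRequest) deleteResponse {
	return deleteResponse{}
}

// checkTeardownAck fails when a teardown was requested but not acknowledged.
func checkTeardownAck(req deleteRequest, resp deleteResponse) error {
	if req.FullTeardown && !resp.FullTeardownDone {
		return errTeardownNotAcked
	}
	return nil
}

func main() {
	req := deleteRequest{FullTeardown: true}
	fmt.Println("upgraded server:", checkTeardownAck(req, upgradedServer(req)))
	fmt.Println("old server:     ", checkTeardownAck(req, oldServer(req)))
}
```

An ordinary delete (flag unset) passes the check against either server, so callers that never request a teardown are unaffected in this sketch.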

* fix(ec): fail the pre-encode sweep for any reachable node that can't ack teardown

A reachable pre-upgrade server ignores full_teardown and returns success
without wiping an orphan, which a later copy then folds into the new
generation. Treat a missing full_teardown_done ack as fatal for every
reachable node (best-effort only for a gRPC-unreachable one), not just for
topology-reported pairs.

* fix(ec): return the served shard identity and validate it client-side

The encode identity was only enforced server-side, so a pre-upgrade server
ignored the request field and served bytes unchecked. Echo the served
shard's EncodeTsNs on every read response chunk and have the client reject a
mismatch (including 0 from an old server), so the guard holds regardless of
server version; a rejected read recovers from parity.

* fix(ec): reject a short/empty remote shard read instead of serving zeros

doReadRemoteEcShardInterval accepted an immediate EOF or a short stream and
returned success with a partly zero-filled, unvalidated buffer (the server
stamps the identity only on chunks that carry bytes). A non-deleted interval
must arrive whole: require n == len(buf), exempting the is_deleted
short-circuit (n=0), matching readLocalEcShardInterval's local check. A short
read now fails so the caller recovers from parity.
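
The whole-interval requirement reduces to one comparison. The helper below is a sketch under that description; `checkIntervalRead` and its parameters are hypothetical and do not reproduce doReadRemoteEcShardInterval.

```go
package main

import "fmt"

// checkIntervalRead validates the byte count n received for a buffer of
// len(buf) bytes. A deleted needle short-circuits with n == 0 and is exempt.
func checkIntervalRead(n int, buf []byte, isDeleted bool) error {
	if isDeleted {
		return nil
	}
	if n != len(buf) {
		return fmt.Errorf("short shard read: got %d of %d bytes", n, len(buf))
	}
	return nil
}

func main() {
	buf := make([]byte, 16)
	fmt.Println("full read:   ", checkIntervalRead(16, buf, false))
	fmt.Println("short read:  ", checkIntervalRead(10, buf, false))
	fmt.Println("empty stream:", checkIntervalRead(0, buf, false))
	fmt.Println("deleted:     ", checkIntervalRead(0, buf, true))
}
```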

* test(ec): fake volume server echoes the full_teardown acknowledgement

The worker now fails a teardown delete that isn't acknowledged (so a
pre-upgrade server can't silently skip the wipe). The fake server's no-op
VolumeEcShardsDelete returned an empty response, which the worker read as a
skipped teardown and aborted the encode. Echo full_teardown_done.

* feat(ec): mirror the encode-run identity guard + full_teardown into the Rust volume server

The Go volume server stamps an encode-run identity (encode_ts_ns) into the .vif
and rejects a read served from a shard of a different run; full_teardown wipes a
whole generation and acknowledges it. The Rust volume server had none of it.
Mirror the shared logic: load encode_ts_ns from the .vif onto the EcVolume,
stamp it on every read response, and reject a request/response mismatch on both
the server and the distributed-read client (recovering from parity); handle
full_teardown by evicting the volume and wiping every EC artifact on each disk,
echoing full_teardown_done so the caller can detect a server that ignored it.

* fix(ec): remove a stale .vif on full teardown of a shard-only node

A shard copy installs shards + .ecx before .vif, so an interrupted copy after a
teardown could mount the new files under the previous run's identity / version /
shard ratio / dat_file_size carried by the surviving .vif. Remove .vif during
full teardown, gated on .idx absence so a source-volume holder keeps its live
.vif. In Rust this lives in a teardown-only helper so the reconcile / load-
fallback paths (which share the base removal) still preserve .vif.

* fix(ec): treat a missing teardown ack as fatal, not as an unreachable node

isNodeUnreachable returned true for any non-gRPC-status error, so a reachable
pre-upgrade server's missing full_teardown_done ack (a plain error) was
classified unreachable and the unreported pair was silently skipped. Classify
only a real codes.Unavailable as unreachable, and wrap the missing ack in a
sentinel the sweep treats as fatal regardless. A genuinely down node still
surfaces as Unavailable from the RPC and stays best-effort.

* fix(ec): reject a short shard read in the local EC needle reader

read_ec_shard_needle ignored the byte count from shard.read_at and appended the
whole pre-sized buffer, so a truncated shard's zero-filled tail passed the later
length check and parsed as garbage. Require n == buf.len() per interval, erroring
on a short read like the local interval reader already does.

* fix(ec): probe reachability before skipping a node that returns Unavailable

The pre-encode sweep skipped any node whose teardown delete returned
codes.Unavailable, but a reachable volume server in maintenance mode also
returns that code for the maintenance-gated delete, so its stale EC files were
left behind on a node that can still receive the new generation. Confirm with a
non-maintenance-gated empty-target Ping: skip only when the node fails the probe
too (genuinely unreachable).

* fix(ec): use try_exists for the teardown .vif .idx guard

The teardown-only .vif removal gated on Path::exists(), which returns false on a
permission/IO stat error, so a stat failure on a present .idx would read as a
shard-only node and delete the live source volume's .vif. Gate on
try_exists() == Ok(false) instead, preserving the sidecar on any stat error.

* fix(ec): only skip a sweep node when a Ping confirms it is transport-down

The pre-encode sweep skipped a node whenever its teardown delete and a liveness
Ping both failed, but it treated ANY Ping error as down — an application-level
Internal/ResourceExhausted, or Unimplemented from a pre-Ping server, left a
reachable node's stale generation in place. Classify the Ping tri-state and skip
only when it transport-fails with codes.Unavailable; a reachable or inconclusive
node stays fatal.

* fix(ec): exclude sweep-skipped nodes from the encode's rebalance

The pre-encode sweep skips a genuinely-down node best-effort, but the rebalance
then recollected the current topology — a node that recovered between the two
could become a copy target and receive the new generation while still holding
its stale, never-cleared shards. Have the sweep return the skipped set and
exclude those nodes from the rebalance for this encode, so a node we could not
clean cannot receive the new generation. Standalone ec.balance is unaffected.
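
Excluding the skipped set from the copy targets can be sketched as a filter. Node addresses are plain strings here, and `rebalanceTargets` is a hypothetical helper, not the shell command's code.

```go
package main

import "fmt"

// rebalanceTargets returns the candidates that were not skipped by the
// pre-encode sweep, preserving their order.
func rebalanceTargets(candidates []string, skipped map[string]bool) []string {
	var targets []string
	for _, node := range candidates {
		if skipped[node] {
			continue // could not be cleaned, so it must not receive new shards
		}
		targets = append(targets, node)
	}
	return targets
}

func main() {
	candidates := []string{"vs1:8080", "vs2:8080", "vs3:8080"}
	skipped := map[string]bool{"vs2:8080": true}
	fmt.Println(rebalanceTargets(candidates, skipped))
}
```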

* fix(ec): re-sweep recovered nodes before generation so they aren't stranded

A node skipped as down by the pre-encode sweep is excluded from the rebalance,
but it can recover and become the generation host — mounting all shards locally,
then being excluded from distribution. Union-only verification accepts all
shards on one node and deletes the originals: a single point of failure. Re-sweep
the skipped nodes just before generation; one whose teardown now succeeds leaves
the skipped set and rebalances normally, while a node still down stays skipped.

* fix(ec): abort the encode if a selected source is still skipped after re-sweep

The re-sweep un-skips a recovered node, but the source was selected before it and
a node can stay down through the re-sweep then recover just in time to be the
generation host — mounting all shards locally while still excluded from the
rebalance, which union-only verification accepts before deleting the originals.
Abort the encode when a selected source remains skipped after the re-sweep.

* fix(ec): batch delete returns retriable 503 when a volume became EC mid-batch

If a volume is not EC at the batch-delete classification but is encoded to EC and
its .dat deleted before the regular-volume mutation, the mutation returns an exact
"not found" that the filer chunk-GC treats as completed, dropping the delete.
Recheck EC presence under the mutation lock and return a retriable 503 with the
"try again" token so the filer requeues it onto the EC path.

* fix(ec): recheck EC state before the regular batch-delete mutation

ec.encode mounts EC shards (copied from the .dat) before deleting the originals,
so a volume can be EC while its .dat still exists. The batch delete only rechecked
EC after a NotFound, so a successful regular-volume delete in that window wrote a
tombstone to the soon-removed .dat — the delete was lost and the needle resurrected
from the pre-tombstone shards. Recheck has_ec_volume under the write lock before
delete_volume_needle and return a retriable 503 so the filer requeues onto the EC path.

* fix(volume): make the metrics push test independent of test order

test_push_metrics_once asserted the pushed body contains the request-counter
family without ever touching the counter — a CounterVec with no children emits
nothing, so the assertion only held when another test had already created a
labelset in the shared registry. Create one in the test itself.
2026-06-10 22:31:18 -07:00


package pluginworkers
import (
"context"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// VolumeServer provides a minimal volume server for erasure coding tests.
type VolumeServer struct {
volume_server_pb.UnimplementedVolumeServerServer
t *testing.T
server *grpc.Server
listener net.Listener
address string
baseDir string
mu sync.Mutex
receivedFiles map[string]uint64
mountRequests []*volume_server_pb.VolumeEcShardsMountRequest
deleteRequests []*volume_server_pb.VolumeDeleteRequest
markReadonlyCalls int
markWritableCalls int
readFileStatusCalls int
vacuumGarbageRatio float64
vacuumCommitReadOnly bool
vacuumCheckCalls int
vacuumCompactCalls int
vacuumCommitCalls int
vacuumCleanupCalls int
volumeCopyCalls int
volumeMountCalls int
tailReceiverCalls int
}
// NewVolumeServer starts a test volume server using the provided base directory.
func NewVolumeServer(t *testing.T, baseDir string) *VolumeServer {
t.Helper()
if baseDir == "" {
baseDir = t.TempDir()
}
if err := os.MkdirAll(baseDir, 0755); err != nil {
t.Fatalf("create volume base dir: %v", err)
}
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen volume server: %v", err)
}
grpcPort := listener.Addr().(*net.TCPAddr).Port
server := pb.NewGrpcServer()
vs := &VolumeServer{
t: t,
server: server,
listener: listener,
address: fmt.Sprintf("127.0.0.1:0.%d", grpcPort),
baseDir: baseDir,
receivedFiles: make(map[string]uint64),
}
volume_server_pb.RegisterVolumeServerServer(server, vs)
go func() {
_ = server.Serve(listener)
}()
t.Cleanup(func() {
vs.Shutdown()
})
return vs
}
// Address returns the gRPC address of the volume server.
func (v *VolumeServer) Address() string {
return v.address
}
// BaseDir returns the base directory used by the server.
func (v *VolumeServer) BaseDir() string {
return v.baseDir
}
// ReceivedFiles returns a snapshot of received files and byte counts.
func (v *VolumeServer) ReceivedFiles() map[string]uint64 {
v.mu.Lock()
defer v.mu.Unlock()
out := make(map[string]uint64, len(v.receivedFiles))
for key, value := range v.receivedFiles {
out[key] = value
}
return out
}
// SetVacuumGarbageRatio sets the garbage ratio returned by VacuumVolumeCheck.
func (v *VolumeServer) SetVacuumGarbageRatio(ratio float64) {
v.mu.Lock()
defer v.mu.Unlock()
v.vacuumGarbageRatio = ratio
}
// SetVacuumCommitReadOnly sets the IsReadOnly value returned by VacuumVolumeCommit.
func (v *VolumeServer) SetVacuumCommitReadOnly(readOnly bool) {
v.mu.Lock()
defer v.mu.Unlock()
v.vacuumCommitReadOnly = readOnly
}
// VacuumStats returns the vacuum RPC call counts.
func (v *VolumeServer) VacuumStats() (check, compact, commit, cleanup int) {
v.mu.Lock()
defer v.mu.Unlock()
return v.vacuumCheckCalls, v.vacuumCompactCalls, v.vacuumCommitCalls, v.vacuumCleanupCalls
}
// BalanceStats returns the balance RPC call counts.
func (v *VolumeServer) BalanceStats() (copyCalls, mountCalls, tailCalls int) {
v.mu.Lock()
defer v.mu.Unlock()
return v.volumeCopyCalls, v.volumeMountCalls, v.tailReceiverCalls
}
// MountRequests returns recorded mount requests.
func (v *VolumeServer) MountRequests() []*volume_server_pb.VolumeEcShardsMountRequest {
v.mu.Lock()
defer v.mu.Unlock()
out := make([]*volume_server_pb.VolumeEcShardsMountRequest, len(v.mountRequests))
copy(out, v.mountRequests)
return out
}
// DeleteRequests returns recorded delete requests.
func (v *VolumeServer) DeleteRequests() []*volume_server_pb.VolumeDeleteRequest {
v.mu.Lock()
defer v.mu.Unlock()
out := make([]*volume_server_pb.VolumeDeleteRequest, len(v.deleteRequests))
copy(out, v.deleteRequests)
return out
}
// MarkReadonlyCount returns the number of readonly calls.
func (v *VolumeServer) MarkReadonlyCount() int {
v.mu.Lock()
defer v.mu.Unlock()
return v.markReadonlyCalls
}
// MarkWritableCount returns the number of writable calls.
func (v *VolumeServer) MarkWritableCount() int {
v.mu.Lock()
defer v.mu.Unlock()
return v.markWritableCalls
}
// ReadFileStatusCount returns the number of ReadVolumeFileStatus calls.
func (v *VolumeServer) ReadFileStatusCount() int {
v.mu.Lock()
defer v.mu.Unlock()
return v.readFileStatusCalls
}
// Shutdown stops the volume server.
func (v *VolumeServer) Shutdown() {
if v.server != nil {
v.server.GracefulStop()
}
if v.listener != nil {
_ = v.listener.Close()
}
}
func (v *VolumeServer) filePath(volumeID uint32, ext string) string {
return filepath.Join(v.baseDir, fmt.Sprintf("%d%s", volumeID, ext))
}
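// CopyFile streams the requested volume file to the caller in chunks of at
// most 64 KiB, stopping once StopOffset bytes have been sent or at EOF. A
// zero StopOffset sends nothing.
func (v *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {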
if req == nil {
return fmt.Errorf("copy file request is nil")
}
path := v.filePath(req.VolumeId, req.Ext)
file, err := os.Open(path)
if err != nil {
if req.IgnoreSourceFileNotFound {
return nil
}
return err
}
defer file.Close()
buf := make([]byte, 64*1024)
remaining := int64(req.GetStopOffset())
for {
if remaining == 0 {
break
}
readBuf := buf
if remaining > 0 && remaining < int64(len(buf)) {
readBuf = buf[:remaining]
}
n, readErr := file.Read(readBuf)
if n > 0 {
if err := stream.Send(&volume_server_pb.CopyFileResponse{FileContent: readBuf[:n]}); err != nil {
return err
}
if remaining > 0 {
remaining -= int64(n)
}
}
if readErr == io.EOF {
break
}
if readErr != nil {
return readErr
}
}
return nil
}
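// ReceiveFile writes a streamed file under baseDir. The first message must
// carry the file info; the byte count is recorded in receivedFiles at EOF.
func (v *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error {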
var (
info *volume_server_pb.ReceiveFileInfo
file *os.File
bytesWritten uint64
filePath string
)
defer func() {
if file != nil {
_ = file.Close()
}
}()
for {
req, err := stream.Recv()
if err == io.EOF {
if info == nil {
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{Error: "missing file info"})
}
v.mu.Lock()
v.receivedFiles[filePath] = bytesWritten
v.mu.Unlock()
return stream.SendAndClose(&volume_server_pb.ReceiveFileResponse{BytesWritten: bytesWritten})
}
if err != nil {
return err
}
if reqInfo := req.GetInfo(); reqInfo != nil {
info = reqInfo
filePath = v.filePath(info.VolumeId, info.Ext)
if err := os.MkdirAll(filepath.Dir(filePath), 0755); err != nil {
return err
}
file, err = os.Create(filePath)
if err != nil {
return err
}
continue
}
chunk := req.GetFileContent()
if len(chunk) == 0 {
continue
}
if file == nil {
return fmt.Errorf("file info not received")
}
n, writeErr := file.Write(chunk)
if writeErr != nil {
return writeErr
}
bytesWritten += uint64(n)
}
}
func (v *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
v.mu.Lock()
v.mountRequests = append(v.mountRequests, req)
v.mu.Unlock()
return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
}
// VolumeEcShardsUnmount is a no-op stub: the worker's pre-distribute
// cleanup calls it against every destination, and the fake server has no
// mounted state to clear.
func (v *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
}
// VolumeEcShardsDelete is a no-op stub paired with VolumeEcShardsUnmount
// above; the fake server doesn't persist shard files beyond what
// ReceiveFile wrote, so there's nothing to remove. It still echoes the
// full_teardown acknowledgement so the worker doesn't treat the fake as a
// pre-upgrade server that silently skipped the teardown.
func (v *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
return &volume_server_pb.VolumeEcShardsDeleteResponse{FullTeardownDone: req.GetFullTeardown()}, nil
}
func (v *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
if req == nil {
return nil, fmt.Errorf("VolumeEcShardsInfo request is nil")
}
v.mu.Lock()
defer v.mu.Unlock()
// Report whichever shards exist on disk: seeded or mounted. Collection
// comes from the matching mount request when one exists.
collectionByShard := make(map[uint32]string)
for _, mr := range v.mountRequests {
if mr == nil || mr.VolumeId != req.VolumeId {
continue
}
for _, shardId := range mr.ShardIds {
if _, ok := collectionByShard[shardId]; !ok {
collectionByShard[shardId] = mr.Collection
}
}
}
resp := &volume_server_pb.VolumeEcShardsInfoResponse{}
prefix := fmt.Sprintf("%d.ec", req.VolumeId)
entries, _ := os.ReadDir(v.baseDir)
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasPrefix(name, prefix) {
continue
}
suffix := strings.TrimPrefix(name, prefix)
if len(suffix) < 2 {
continue
}
var shardId uint32
if _, err := fmt.Sscanf(suffix[:2], "%d", &shardId); err != nil {
continue
}
var size int64
if info, err := entry.Info(); err == nil {
size = info.Size()
}
resp.EcShardInfos = append(resp.EcShardInfos, &volume_server_pb.EcShardInfo{
ShardId: shardId,
Size: size,
Collection: collectionByShard[shardId],
VolumeId: req.VolumeId,
})
}
return resp, nil
}
func (v *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
v.mu.Lock()
v.deleteRequests = append(v.deleteRequests, req)
v.mu.Unlock()
if req != nil {
_ = os.Remove(v.filePath(req.VolumeId, ".dat"))
_ = os.Remove(v.filePath(req.VolumeId, ".idx"))
}
return &volume_server_pb.VolumeDeleteResponse{}, nil
}
func (v *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
v.mu.Lock()
v.markReadonlyCalls++
v.mu.Unlock()
return &volume_server_pb.VolumeMarkReadonlyResponse{}, nil
}
func (v *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
v.mu.Lock()
v.markWritableCalls++
v.mu.Unlock()
return &volume_server_pb.VolumeMarkWritableResponse{}, nil
}
func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
v.mu.Lock()
v.readFileStatusCalls++
v.mu.Unlock()
datInfo, err := os.Stat(v.filePath(req.VolumeId, ".dat"))
if err != nil {
return nil, err
}
idxInfo, err := os.Stat(v.filePath(req.VolumeId, ".idx"))
if err != nil {
return nil, err
}
return &volume_server_pb.ReadVolumeFileStatusResponse{
VolumeId: req.VolumeId,
DatFileSize: uint64(datInfo.Size()),
IdxFileSize: uint64(idxInfo.Size()),
FileCount: 1,
}, nil
}
func (v *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
v.mu.Lock()
v.vacuumCheckCalls++
ratio := v.vacuumGarbageRatio
v.mu.Unlock()
return &volume_server_pb.VacuumVolumeCheckResponse{GarbageRatio: ratio}, nil
}
func (v *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
v.mu.Lock()
v.vacuumCompactCalls++
v.mu.Unlock()
return stream.Send(&volume_server_pb.VacuumVolumeCompactResponse{ProcessedBytes: 1024})
}
func (v *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
v.mu.Lock()
v.vacuumCommitCalls++
readOnly := v.vacuumCommitReadOnly
v.mu.Unlock()
return &volume_server_pb.VacuumVolumeCommitResponse{IsReadOnly: readOnly}, nil
}
func (v *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
v.mu.Lock()
v.vacuumCleanupCalls++
v.mu.Unlock()
return &volume_server_pb.VacuumVolumeCleanupResponse{}, nil
}
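// VolumeCopy pulls the .dat and .idx files from the source data node, sized
// by its ReadVolumeFileStatus response, then reports the bytes processed and
// a last-append timestamp.
func (v *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {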
v.mu.Lock()
v.volumeCopyCalls++
v.mu.Unlock()
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
var statusResp *volume_server_pb.ReadVolumeFileStatusResponse
if err := operation.WithVolumeServerClient(false, pb.ServerAddress(req.SourceDataNode), dialOption,
func(client volume_server_pb.VolumeServerClient) error {
var readErr error
statusResp, readErr = client.ReadVolumeFileStatus(stream.Context(), &volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
return readErr
}); err != nil {
return err
}
if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".dat", statusResp.DatFileSize, dialOption); err != nil {
return err
}
if err := v.copyRemoteFile(stream.Context(), req.SourceDataNode, req.VolumeId, ".idx", statusResp.IdxFileSize, dialOption); err != nil {
return err
}
if err := stream.Send(&volume_server_pb.VolumeCopyResponse{ProcessedBytes: int64(statusResp.DatFileSize + statusResp.IdxFileSize)}); err != nil {
return err
}
return stream.Send(&volume_server_pb.VolumeCopyResponse{LastAppendAtNs: uint64(time.Now().UnixNano())})
}
func (v *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
v.mu.Lock()
v.volumeMountCalls++
v.mu.Unlock()
return &volume_server_pb.VolumeMountResponse{}, nil
}
func (v *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
v.mu.Lock()
v.tailReceiverCalls++
v.mu.Unlock()
return &volume_server_pb.VolumeTailReceiverResponse{}, nil
}
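// copyRemoteFile fetches one volume file from sourceDataNode via CopyFile and
// writes it to the matching path under baseDir.
func (v *VolumeServer) copyRemoteFile(ctx context.Context, sourceDataNode string, volumeID uint32, ext string, fileSize uint64, dialOption grpc.DialOption) error {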
path := v.filePath(volumeID, ext)
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return err
}
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
return operation.WithVolumeServerClient(true, pb.ServerAddress(sourceDataNode), dialOption,
func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
VolumeId: volumeID,
Ext: ext,
StopOffset: fileSize,
})
if err != nil {
return err
}
for {
resp, recvErr := stream.Recv()
if recvErr == io.EOF {
return nil
}
if recvErr != nil {
return recvErr
}
if len(resp.FileContent) == 0 {
continue
}
if _, err := file.Write(resp.FileContent); err != nil {
return err
}
}
})
}