mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(balance): don't move remote-tiered volumes; don't fatal on missing .idx (#9335)
* fix(volume): don't fatal on missing .idx for remote-tiered volume A .vif left behind without its .idx (orphaned by a crashed move, partial copy, or hand-edit) would trip glog.Fatalf in checkIdxFile and take the whole volume server down on boot, killing every healthy volume on it too. For remote-tiered volumes treat it as a per-volume load error so the server can come up and the operator can clean up the stray .vif. Refs #9331. * fix(balance): skip remote-tiered volumes in admin balance detection The admin/worker balance detector had no equivalent of the shell-side guard ("does not move volume in remote storage" in command_volume_balance.go), so it scheduled moves on remote-tiered volumes. The "move" copies .idx/.vif to the destination and then calls Volume.Destroy on the source, which calls backendStorage.DeleteFile — deleting the remote object the destination's new .vif now points at. Populate HasRemoteCopy on the metrics emitted by both the admin maintenance scanner and the worker's master poll, then drop those volumes at the top of Detection. Fixes #9331. * Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix(volume): keep remote data on volume-move-driven delete The on-source delete after a volume move (admin/worker balance and shell volume.move) ran Volume.Destroy with no way to opt out of the remote-object cleanup. Volume.Destroy unconditionally calls backendStorage.DeleteFile for remote-tiered volumes, so a successful move would copy .idx/.vif to the destination and then nuke the cloud object the destination's new .vif was already pointing at. Add VolumeDeleteRequest.keep_remote_data and plumb it through Store.DeleteVolume / DiskLocation.DeleteVolume / Volume.Destroy. The balance task and shell volume.move set it to true; the post-tier-upload cleanup of other replicas and the over-replication trim in volume.fix.replication also set it to true since the remote object is still referenced. Other real-delete callers keep the default. The delete-before-receive path in VolumeCopy also sets it: the inbound copy carries a .vif that may reference the same cloud object as the existing volume. Refs #9331. * test(storage): in-process remote-tier integration tests Cover the four operations the user is most likely to run against a cloud-tiered volume — balance/move, vacuum, EC encode, EC decode — by registering a local-disk-backed BackendStorage as the "remote" tier and exercising the real Volume / DiskLocation / EC encoder code paths. Locks in: - Destroy(keepRemoteData=true) preserves the remote object (move case) - Destroy(keepRemoteData=false) deletes it (real-delete case) - Vacuum/compact on a remote-tier volume never deletes the remote object - EC encode requires the local .dat (callers must download first) - EC encode + rebuild round-trips after a tier-down Tests run in-process and finish in under a second total — no cluster, binary, or external storage required. * fix(rust-volume): keep remote data on volume-move-driven delete Mirror the Go fix in seaweed-volume: plumb keep_remote_data through grpc volume_delete → Store.delete_volume → DiskLocation.delete_volume → Volume.destroy, and skip the s3-tier delete_file call when the flag is set. The pre-receive cleanup in volume_copy passes true for the same reason as the Go side: the inbound copy carries a .vif that may reference the same cloud object as the existing volume. The Rust loader already warns rather than fataling on a stray .vif without an .idx (volume.rs load_index_inmemory / load_index_redb), so no counterpart to the Go fatal-on-missing-idx fix is needed. Refs #9331. * fix(volume): preserve remote tier on IO-error eviction; fix EC test target Two review nits: - Store.MaybeAddVolumes' periodic cleanup pass deleted IO-errored volumes with keepRemoteData=false, so a transient local fault on a remote-tiered volume would also nuke the cloud object. Track the delete reason via a parallel slice and pass keepRemoteData=v.HasRemoteFile() for IO-error evictions; TTL-expired evictions still pass false. - TestRemoteTier_ECEncodeDecode_AfterDownload deleted shards 0..3 but called them "parity" — by the klauspost/reedsolomon convention shards 0..DataShardsCount-1 are data and DataShardsCount..TotalShardsCount-1 are parity. Switch the loop to delete the parity range so the intent matches the indices. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@@ -247,6 +247,9 @@ message VolumeUnmountResponse {
|
||||
message VolumeDeleteRequest {
|
||||
uint32 volume_id = 1;
|
||||
bool only_empty = 2;
|
||||
// when true, do not remove the cloud-tier object backing the volume.
|
||||
// used for moves where another server is taking over the same .vif.
|
||||
bool keep_remote_data = 3;
|
||||
}
|
||||
message VolumeDeleteResponse {
|
||||
}
|
||||
@@ -637,6 +640,7 @@ message FetchAndWriteNeedleRequest {
|
||||
}
|
||||
repeated Replica replicas = 6;
|
||||
string auth = 7;
|
||||
int32 download_concurrency = 8; // multipart download concurrency if supported by the remote storage client; for S3, 0 = default (5)
|
||||
// remote conf
|
||||
remote_pb.RemoteConf remote_conf = 15;
|
||||
remote_pb.RemoteStorageLocation remote_location = 16;
|
||||
|
||||
@@ -809,7 +809,7 @@ impl VolumeServer for VolumeGrpcService {
|
||||
}
|
||||
}
|
||||
store
|
||||
.delete_volume(vid, req.only_empty)
|
||||
.delete_volume(vid, req.only_empty, req.keep_remote_data)
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
self.state.volume_state_notify.notify_one();
|
||||
Ok(Response::new(volume_server_pb::VolumeDeleteResponse {}))
|
||||
@@ -1043,7 +1043,10 @@ impl VolumeServer for VolumeGrpcService {
|
||||
if store.find_volume(vid).is_some() {
|
||||
drop(store);
|
||||
let mut store = self.state.store.write().unwrap();
|
||||
store.delete_volume(vid, false).map_err(|e| {
|
||||
// keep remote data: the inbound copy carries a .vif that may
|
||||
// point at the same cloud-tier object the existing volume
|
||||
// references.
|
||||
store.delete_volume(vid, false, true).map_err(|e| {
|
||||
Status::internal(format!("failed to delete existing volume {}: {}", vid, e))
|
||||
})?;
|
||||
self.state.volume_state_notify.notify_one();
|
||||
|
||||
@@ -860,7 +860,7 @@ fn build_heartbeat_with_ec_status(
|
||||
}
|
||||
|
||||
for vid in delete_vids {
|
||||
let _ = loc.delete_volume(vid, false);
|
||||
let _ = loc.delete_volume(vid, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -386,13 +386,20 @@ impl DiskLocation {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove, close, and delete all files for a volume.
|
||||
pub fn delete_volume(&mut self, vid: VolumeId, only_empty: bool) -> Result<(), VolumeError> {
|
||||
/// Remove, close, and delete all files for a volume. When keep_remote_data
|
||||
/// is true the cloud-tier object backing the volume is left intact — used
|
||||
/// by moves where another server is taking over the same .vif.
|
||||
pub fn delete_volume(
|
||||
&mut self,
|
||||
vid: VolumeId,
|
||||
only_empty: bool,
|
||||
keep_remote_data: bool,
|
||||
) -> Result<(), VolumeError> {
|
||||
if let Some(mut v) = self.volumes.remove(&vid) {
|
||||
crate::metrics::VOLUME_GAUGE
|
||||
.with_label_values(&[&v.collection, "volume"])
|
||||
.dec();
|
||||
v.destroy(only_empty)?;
|
||||
v.destroy(only_empty, keep_remote_data)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(VolumeError::NotFound)
|
||||
@@ -413,7 +420,7 @@ impl DiskLocation {
|
||||
crate::metrics::VOLUME_GAUGE
|
||||
.with_label_values(&[&v.collection, "volume"])
|
||||
.dec();
|
||||
if let Err(e) = v.destroy(false) {
|
||||
if let Err(e) = v.destroy(false, false) {
|
||||
warn!(volume_id = vid.0, error = %e, "delete collection: failed to destroy volume");
|
||||
}
|
||||
}
|
||||
@@ -1157,7 +1164,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(loc.volumes_len(), 2);
|
||||
|
||||
loc.delete_volume(VolumeId(1), false).unwrap();
|
||||
loc.delete_volume(VolumeId(1), false, false).unwrap();
|
||||
assert_eq!(loc.volumes_len(), 1);
|
||||
assert!(loc.find_volume(VolumeId(1)).is_none());
|
||||
}
|
||||
|
||||
@@ -307,11 +307,18 @@ impl Store {
|
||||
)
|
||||
}
|
||||
|
||||
/// Delete a volume from any location.
|
||||
pub fn delete_volume(&mut self, vid: VolumeId, only_empty: bool) -> Result<(), VolumeError> {
|
||||
/// Delete a volume from any location. When keep_remote_data is true the
|
||||
/// cloud-tier object backing the volume is left intact — used by moves
|
||||
/// where another server is taking over the same .vif.
|
||||
pub fn delete_volume(
|
||||
&mut self,
|
||||
vid: VolumeId,
|
||||
only_empty: bool,
|
||||
keep_remote_data: bool,
|
||||
) -> Result<(), VolumeError> {
|
||||
for loc in &mut self.locations {
|
||||
if loc.find_volume(vid).is_some() {
|
||||
return loc.delete_volume(vid, only_empty);
|
||||
return loc.delete_volume(vid, only_empty, keep_remote_data);
|
||||
}
|
||||
}
|
||||
Err(VolumeError::NotFound)
|
||||
|
||||
@@ -3081,7 +3081,10 @@ impl Volume {
|
||||
}
|
||||
|
||||
/// Remove all volume files from disk.
|
||||
pub fn destroy(&mut self, only_empty: bool) -> Result<(), VolumeError> {
|
||||
/// Destroy removes everything related to this volume. When keep_remote_data
|
||||
/// is true the cloud-tier object backing the volume is left intact — used
|
||||
/// by moves where another server is taking over the same .vif.
|
||||
pub fn destroy(&mut self, only_empty: bool, keep_remote_data: bool) -> Result<(), VolumeError> {
|
||||
if only_empty && self.file_count() > 0 {
|
||||
return Err(VolumeError::NotEmpty);
|
||||
}
|
||||
@@ -3093,7 +3096,11 @@ impl Volume {
|
||||
}
|
||||
|
||||
let (storage_name, storage_key) = self.remote_storage_name_key();
|
||||
if self.has_remote_file && !storage_name.is_empty() && !storage_key.is_empty() {
|
||||
if !keep_remote_data
|
||||
&& self.has_remote_file
|
||||
&& !storage_name.is_empty()
|
||||
&& !storage_key.is_empty()
|
||||
{
|
||||
let backend = crate::remote_storage::s3_tier::global_s3_tier_registry()
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -3659,7 +3666,7 @@ mod tests {
|
||||
dat_path = v.file_name(".dat");
|
||||
idx_path = v.file_name(".idx");
|
||||
assert!(Path::new(&dat_path).exists());
|
||||
v.destroy(false).unwrap();
|
||||
v.destroy(false, false).unwrap();
|
||||
}
|
||||
|
||||
assert!(!Path::new(&dat_path).exists());
|
||||
@@ -4424,7 +4431,7 @@ mod tests {
|
||||
assert!(std::path::Path::new(&idx_path).exists());
|
||||
|
||||
// Destroy the volume
|
||||
v.destroy(false).unwrap();
|
||||
v.destroy(false, false).unwrap();
|
||||
|
||||
// .dat and .idx should be gone
|
||||
assert!(
|
||||
@@ -4481,7 +4488,7 @@ mod tests {
|
||||
assert!(std::path::Path::new(&dat_path).exists());
|
||||
assert!(std::path::Path::new(&idx_path).exists());
|
||||
|
||||
v.destroy(false).unwrap();
|
||||
v.destroy(false, false).unwrap();
|
||||
|
||||
assert!(
|
||||
!std::path::Path::new(&dat_path).exists(),
|
||||
|
||||
@@ -123,6 +123,7 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*types.VolumeHealthMet
|
||||
DeletedBytes: volInfo.DeletedByteCount,
|
||||
LastModified: time.Unix(int64(volInfo.ModifiedAtSecond), 0),
|
||||
IsReadOnly: volInfo.ReadOnly,
|
||||
HasRemoteCopy: volInfo.RemoteStorageName != "",
|
||||
IsECVolume: false, // Will be determined from volume structure
|
||||
ReplicaCount: 1, // Will be counted
|
||||
ExpectedReplicas: int(volInfo.ReplicaPlacement),
|
||||
|
||||
@@ -156,7 +156,7 @@ func backupFromLocation(volumeServer pb.ServerAddress, grpcDialOption grpc.DialO
|
||||
|
||||
// If local volume is larger than remote, recreate it
|
||||
if datSize > stats.TailOffset {
|
||||
if err := v.Destroy(false); err != nil {
|
||||
if err := v.Destroy(false, false); err != nil {
|
||||
v.Close()
|
||||
return fmt.Errorf("destroying volume: %w", err), false
|
||||
}
|
||||
|
||||
@@ -247,6 +247,9 @@ message VolumeUnmountResponse {
|
||||
message VolumeDeleteRequest {
|
||||
uint32 volume_id = 1;
|
||||
bool only_empty = 2;
|
||||
// when true, do not remove the cloud-tier object backing the volume.
|
||||
// used for moves where another server is taking over the same .vif.
|
||||
bool keep_remote_data = 3;
|
||||
}
|
||||
message VolumeDeleteResponse {
|
||||
}
|
||||
|
||||
@@ -1322,11 +1322,14 @@ func (*VolumeUnmountResponse) Descriptor() ([]byte, []int) {
|
||||
}
|
||||
|
||||
type VolumeDeleteRequest struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
||||
OnlyEmpty bool `protobuf:"varint,2,opt,name=only_empty,json=onlyEmpty,proto3" json:"only_empty,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
|
||||
OnlyEmpty bool `protobuf:"varint,2,opt,name=only_empty,json=onlyEmpty,proto3" json:"only_empty,omitempty"`
|
||||
// when true, do not remove the cloud-tier object backing the volume.
|
||||
// used for moves where another server is taking over the same .vif.
|
||||
KeepRemoteData bool `protobuf:"varint,3,opt,name=keep_remote_data,json=keepRemoteData,proto3" json:"keep_remote_data,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *VolumeDeleteRequest) Reset() {
|
||||
@@ -1373,6 +1376,13 @@ func (x *VolumeDeleteRequest) GetOnlyEmpty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *VolumeDeleteRequest) GetKeepRemoteData() bool {
|
||||
if x != nil {
|
||||
return x.KeepRemoteData
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type VolumeDeleteResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
@@ -6824,11 +6834,12 @@ const file_volume_server_proto_rawDesc = "" +
|
||||
"\x13VolumeMountResponse\"3\n" +
|
||||
"\x14VolumeUnmountRequest\x12\x1b\n" +
|
||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\"\x17\n" +
|
||||
"\x15VolumeUnmountResponse\"Q\n" +
|
||||
"\x15VolumeUnmountResponse\"{\n" +
|
||||
"\x13VolumeDeleteRequest\x12\x1b\n" +
|
||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1d\n" +
|
||||
"\n" +
|
||||
"only_empty\x18\x02 \x01(\bR\tonlyEmpty\"\x16\n" +
|
||||
"only_empty\x18\x02 \x01(\bR\tonlyEmpty\x12(\n" +
|
||||
"\x10keep_remote_data\x18\x03 \x01(\bR\x0ekeepRemoteData\"\x16\n" +
|
||||
"\x14VolumeDeleteResponse\"R\n" +
|
||||
"\x19VolumeMarkReadonlyRequest\x12\x1b\n" +
|
||||
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x18\n" +
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v3.21.12
|
||||
// - protoc-gen-go-grpc v1.5.1
|
||||
// - protoc v6.33.4
|
||||
// source: volume_server.proto
|
||||
|
||||
package volume_server_pb
|
||||
@@ -781,148 +781,148 @@ type VolumeServerServer interface {
|
||||
type UnimplementedVolumeServerServer struct{}
|
||||
|
||||
func (UnimplementedVolumeServerServer) BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method BatchDelete not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method BatchDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCheck(context.Context, *VacuumVolumeCheckRequest) (*VacuumVolumeCheckResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCompact(*VacuumVolumeCompactRequest, grpc.ServerStreamingServer[VacuumVolumeCompactResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method DeleteCollection not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method DeleteCollection not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) AllocateVolume(context.Context, *AllocateVolumeRequest) (*AllocateVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method AllocateVolume not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method AllocateVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeSyncStatus not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeSyncStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeIncrementalCopy(*VolumeIncrementalCopyRequest, grpc.ServerStreamingServer[VolumeIncrementalCopyResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMount not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeUnmount not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeUnmount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeDelete not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeMarkWritable not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeConfigure(context.Context, *VolumeConfigureRequest) (*VolumeConfigureResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeConfigure not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeConfigure not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeStatus(context.Context, *VolumeStatusRequest) (*VolumeStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeStatus not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method GetState not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetState not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) SetState(context.Context, *SetStateRequest) (*SetStateResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method SetState not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SetState not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeCopy(*VolumeCopyRequest, grpc.ServerStreamingServer[VolumeCopyResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeCopy not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) CopyFile(*CopyFileRequest, grpc.ServerStreamingServer[CopyFileResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method CopyFile not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method CopyFile not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReceiveFile(grpc.ClientStreamingServer[ReceiveFileRequest, ReceiveFileResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method ReceiveFile not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method ReceiveFile not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadNeedleBlob(context.Context, *ReadNeedleBlobRequest) (*ReadNeedleBlobResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadNeedleBlob not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleBlob not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadNeedleMeta(context.Context, *ReadNeedleMetaRequest) (*ReadNeedleMetaResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ReadNeedleMeta not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleMeta not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) WriteNeedleBlob(context.Context, *WriteNeedleBlobRequest) (*WriteNeedleBlobResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method WriteNeedleBlob not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method WriteNeedleBlob not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ReadAllNeedles(*ReadAllNeedlesRequest, grpc.ServerStreamingServer[ReadAllNeedlesResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method ReadAllNeedles not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method ReadAllNeedles not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTailSender(*VolumeTailSenderRequest, grpc.ServerStreamingServer[VolumeTailSenderResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeTailSender not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTailSender not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTailReceiver(context.Context, *VolumeTailReceiverRequest) (*VolumeTailReceiverResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeTailReceiver not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeTailReceiver not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsGenerate(context.Context, *VolumeEcShardsGenerateRequest) (*VolumeEcShardsGenerateResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsRebuild(context.Context, *VolumeEcShardsRebuildRequest) (*VolumeEcShardsRebuildResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsCopy(context.Context, *VolumeEcShardsCopyRequest) (*VolumeEcShardsCopyResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsDelete(context.Context, *VolumeEcShardsDeleteRequest) (*VolumeEcShardsDeleteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsMount(context.Context, *VolumeEcShardsMountRequest) (*VolumeEcShardsMountResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsUnmount(context.Context, *VolumeEcShardsUnmountRequest) (*VolumeEcShardsUnmountResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardRead(*VolumeEcShardReadRequest, grpc.ServerStreamingServer[VolumeEcShardReadResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeEcShardRead not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeEcShardRead not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcBlobDelete(context.Context, *VolumeEcBlobDeleteRequest) (*VolumeEcBlobDeleteResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsToVolume(context.Context, *VolumeEcShardsToVolumeRequest) (*VolumeEcShardsToVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeEcShardsInfo(context.Context, *VolumeEcShardsInfoRequest) (*VolumeEcShardsInfoResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTierMoveDatToRemote(*VolumeTierMoveDatToRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatToRemoteResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeTierMoveDatFromRemote(*VolumeTierMoveDatFromRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatFromRemoteResponse]) error {
|
||||
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeServerStatus(context.Context, *VolumeServerStatusRequest) (*VolumeServerStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeServerStatus not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeServerLeave(context.Context, *VolumeServerLeaveRequest) (*VolumeServerLeaveResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeServerLeave not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerLeave not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ScrubVolume(context.Context, *ScrubVolumeRequest) (*ScrubVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ScrubVolume not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ScrubVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) ScrubEcVolume(context.Context, *ScrubEcVolumeRequest) (*ScrubEcVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method ScrubEcVolume not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method ScrubEcVolume not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error {
|
||||
return status.Error(codes.Unimplemented, "method Query not implemented")
|
||||
return status.Errorf(codes.Unimplemented, "method Query not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method Ping not implemented")
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
|
||||
}
|
||||
func (UnimplementedVolumeServerServer) mustEmbedUnimplementedVolumeServerServer() {}
|
||||
func (UnimplementedVolumeServerServer) testEmbeddedByValue() {}
|
||||
@@ -935,7 +935,7 @@ type UnsafeVolumeServerServer interface {
|
||||
}
|
||||
|
||||
func RegisterVolumeServerServer(s grpc.ServiceRegistrar, srv VolumeServerServer) {
|
||||
// If the following call panics, it indicates UnimplementedVolumeServerServer was
|
||||
// If the following call pancis, it indicates UnimplementedVolumeServerServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
|
||||
@@ -153,6 +153,7 @@ func buildVolumeMetrics(
|
||||
ReplicaCount: 1,
|
||||
ExpectedReplicas: int(volume.ReplicaPlacement),
|
||||
IsReadOnly: volume.ReadOnly,
|
||||
HasRemoteCopy: volume.RemoteStorageName != "",
|
||||
}
|
||||
if metric.Size > 0 {
|
||||
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
|
||||
|
||||
@@ -171,7 +171,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
|
||||
return resp, err
|
||||
}
|
||||
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty)
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty, req.KeepRemoteData)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("volume delete %v: %v", req, err)
|
||||
|
||||
@@ -36,7 +36,9 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
|
||||
|
||||
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
|
||||
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), false)
|
||||
// keep remote data: the inbound copy carries a .vif that may point at
|
||||
// the same cloud-tier object the existing volume references.
|
||||
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), false, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
|
||||
}
|
||||
|
||||
@@ -352,7 +352,7 @@ func doDeleteVolumesWithLocations(commandEnv *CommandEnv, volumeIds []needle.Vol
|
||||
|
||||
for _, l := range locations {
|
||||
ewg.Add(func() error {
|
||||
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
|
||||
if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false, false); err != nil {
|
||||
return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
|
||||
}
|
||||
fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
|
||||
|
||||
@@ -50,6 +50,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
|
||||
|
||||
volumeId := needle.VolumeId(*volumeIdInt)
|
||||
|
||||
return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, false)
|
||||
return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, false, false)
|
||||
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
|
||||
if *applyBalancing {
|
||||
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
|
||||
if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id),
|
||||
pb.NewServerAddressFromDataNode(dn), true); deleteErr != nil {
|
||||
pb.NewServerAddressFromDataNode(dn), true, false); deleteErr != nil {
|
||||
err = deleteErr
|
||||
}
|
||||
continue
|
||||
|
||||
@@ -312,8 +312,10 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
|
||||
}
|
||||
}
|
||||
|
||||
// Surplus replica being trimmed; keep the remote object since other
|
||||
// replicas of the same .vif still reference it.
|
||||
if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
|
||||
pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil {
|
||||
pb.NewServerAddressFromDataNode(replica.location.dataNode), false, true); err != nil {
|
||||
fmt.Fprintf(writer, "deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io
|
||||
if !cleanupTarget {
|
||||
return
|
||||
}
|
||||
if delErr := deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false); delErr != nil {
|
||||
if delErr := deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); delErr != nil {
|
||||
glog.Warningf("failed to clean up temporary merge volume %d on %s: %v", volumeId, targetServer, delErr)
|
||||
}
|
||||
}()
|
||||
@@ -180,7 +180,7 @@ func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io
|
||||
}
|
||||
}
|
||||
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false); err != nil {
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); err != nil {
|
||||
return err
|
||||
}
|
||||
cleanupTarget = false
|
||||
|
||||
@@ -105,7 +105,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
|
||||
}
|
||||
|
||||
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
|
||||
if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer, false); err != nil {
|
||||
if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer, false, true); err != nil {
|
||||
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
|
||||
}
|
||||
|
||||
@@ -202,11 +202,15 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
|
||||
|
||||
}
|
||||
|
||||
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, onlyEmpty bool) (err error) {
|
||||
// deleteVolume removes the volume from sourceVolumeServer. When keepRemoteData
|
||||
// is true, the cloud-tier object backing the volume is left intact — used on
|
||||
// the source side of a move where another server is taking over the same .vif.
|
||||
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, onlyEmpty bool, keepRemoteData bool) (err error) {
|
||||
return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
OnlyEmpty: onlyEmpty,
|
||||
VolumeId: uint32(volumeId),
|
||||
OnlyEmpty: onlyEmpty,
|
||||
KeepRemoteData: keepRemoteData,
|
||||
})
|
||||
return deleteErr
|
||||
})
|
||||
|
||||
@@ -312,7 +312,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
|
||||
if preserveServers[loc.Url] {
|
||||
continue
|
||||
}
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false); err != nil {
|
||||
if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress(), false, false); err != nil {
|
||||
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +164,10 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
|
||||
continue
|
||||
}
|
||||
fmt.Printf("delete volume %d from Url:%s\n", vid, location.Url)
|
||||
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false)
|
||||
// Other replicas were never tier-uploaded; their .vif (if any) points
|
||||
// to the same cloud key just written for this volume. Keep the remote
|
||||
// object so the tier-uploaded replica still references valid data.
|
||||
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
|
||||
}
|
||||
|
||||
@@ -301,7 +301,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
for k, v := range delVolsMap {
|
||||
if err := v.Destroy(false); err != nil {
|
||||
if err := v.Destroy(false, false); err != nil {
|
||||
errChain <- err
|
||||
} else {
|
||||
l.volumesLock.Lock()
|
||||
@@ -336,12 +336,12 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
|
||||
return
|
||||
}
|
||||
|
||||
func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (found bool, e error) {
|
||||
func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool, keepRemoteData bool) (found bool, e error) {
|
||||
v, ok := l.volumes[vid]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
e = v.Destroy(onlyEmpty)
|
||||
e = v.Destroy(onlyEmpty, keepRemoteData)
|
||||
if e != nil {
|
||||
return
|
||||
}
|
||||
@@ -359,7 +359,7 @@ func (l *DiskLocation) LoadVolume(diskId uint32, vid needle.VolumeId, needleMapK
|
||||
|
||||
var ErrVolumeNotFound = fmt.Errorf("volume not found")
|
||||
|
||||
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
|
||||
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool, keepRemoteData bool) error {
|
||||
l.volumesLock.Lock()
|
||||
defer l.volumesLock.Unlock()
|
||||
|
||||
@@ -367,7 +367,7 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId, onlyEmpty bool) error {
|
||||
if !ok {
|
||||
return ErrVolumeNotFound
|
||||
}
|
||||
_, err := l.deleteVolumeById(vid, onlyEmpty)
|
||||
_, err := l.deleteVolumeById(vid, onlyEmpty, keepRemoteData)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,346 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
)
|
||||
|
||||
// In-process integration tests for cloud-tiered ("remote") volumes.
|
||||
//
|
||||
// Cover the operations the user is likely to schedule against a tiered
|
||||
// volume — balance/move, vacuum, EC encode, EC decode — exercising the real
|
||||
// Volume / DiskLocation / Store code paths against a fake BackendStorage that
|
||||
// stores objects in a temp dir. The fake stands in for S3/rclone/etc. so the
|
||||
// tests stay hermetic and fast.
|
||||
|
||||
// localDirBackend is a BackendStorage that stores objects as files in a
|
||||
// temp directory. It deletes from / writes to the dir under a mutex so the
|
||||
// tests can observe ordering (e.g. that a remote object survives a move).
|
||||
type localDirBackend struct {
|
||||
root string
|
||||
|
||||
mu sync.Mutex
|
||||
deletes []string // history of DeleteFile keys, for assertions
|
||||
}
|
||||
|
||||
func newLocalDirBackend(t *testing.T) *localDirBackend {
|
||||
t.Helper()
|
||||
root := t.TempDir()
|
||||
return &localDirBackend{root: root}
|
||||
}
|
||||
|
||||
func (b *localDirBackend) ToProperties() map[string]string {
|
||||
return map[string]string{"root": b.root}
|
||||
}
|
||||
|
||||
func (b *localDirBackend) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
|
||||
return &localDirBackendFile{backend: b, key: key, tierInfo: tierInfo}
|
||||
}
|
||||
|
||||
func (b *localDirBackend) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
|
||||
key = fmt.Sprintf("obj-%d-%d", time.Now().UnixNano(), os.Getpid())
|
||||
dst := filepath.Join(b.root, key)
|
||||
out, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
defer out.Close()
|
||||
if _, err = f.Seek(0, io.SeekStart); err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
written, err := io.Copy(out, f)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
if fn != nil {
|
||||
_ = fn(written, 100)
|
||||
}
|
||||
return key, written, nil
|
||||
}
|
||||
|
||||
func (b *localDirBackend) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
|
||||
src := filepath.Join(b.root, key)
|
||||
in, err := os.Open(src)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer in.Close()
|
||||
out, err := os.Create(fileName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer out.Close()
|
||||
written, err := io.Copy(out, in)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if fn != nil {
|
||||
_ = fn(written, 100)
|
||||
}
|
||||
return written, nil
|
||||
}
|
||||
|
||||
func (b *localDirBackend) DeleteFile(key string) error {
|
||||
b.mu.Lock()
|
||||
b.deletes = append(b.deletes, key)
|
||||
b.mu.Unlock()
|
||||
return os.Remove(filepath.Join(b.root, key))
|
||||
}
|
||||
|
||||
func (b *localDirBackend) deleteHistory() []string {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
out := make([]string, len(b.deletes))
|
||||
copy(out, b.deletes)
|
||||
return out
|
||||
}
|
||||
|
||||
func (b *localDirBackend) objectExists(key string) bool {
|
||||
_, err := os.Stat(filepath.Join(b.root, key))
|
||||
return err == nil
|
||||
}
|
||||
|
||||
// localDirBackendFile satisfies BackendStorageFile by reading/writing through
|
||||
// the file in the temp dir keyed by the .vif's stored object key. Size and
|
||||
// modtime come from the cached tierInfo, mirroring the S3 backend's
|
||||
// behavior of returning .vif metadata from GetStat.
|
||||
type localDirBackendFile struct {
|
||||
backend *localDirBackend
|
||||
key string
|
||||
tierInfo *volume_server_pb.VolumeInfo
|
||||
}
|
||||
|
||||
func (f *localDirBackendFile) ReadAt(p []byte, off int64) (int, error) {
|
||||
in, err := os.Open(filepath.Join(f.backend.root, f.key))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer in.Close()
|
||||
return in.ReadAt(p, off)
|
||||
}
|
||||
|
||||
func (f *localDirBackendFile) WriteAt(p []byte, off int64) (int, error) {
|
||||
out, err := os.OpenFile(filepath.Join(f.backend.root, f.key), os.O_RDWR|os.O_CREATE, 0o644)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer out.Close()
|
||||
return out.WriteAt(p, off)
|
||||
}
|
||||
|
||||
func (f *localDirBackendFile) Truncate(off int64) error {
|
||||
return os.Truncate(filepath.Join(f.backend.root, f.key), off)
|
||||
}
|
||||
|
||||
func (f *localDirBackendFile) Close() error { return nil }
|
||||
func (f *localDirBackendFile) Name() string { return f.key }
|
||||
func (f *localDirBackendFile) Sync() error { return nil }
|
||||
func (f *localDirBackendFile) GetStat() (int64, time.Time, error) {
|
||||
files := f.tierInfo.GetFiles()
|
||||
if len(files) == 0 {
|
||||
return 0, time.Time{}, fmt.Errorf("remote file info not found")
|
||||
}
|
||||
return int64(files[0].FileSize), time.Unix(int64(files[0].ModifiedTime), 0), nil
|
||||
}
|
||||
|
||||
const (
|
||||
testBackendName = "test_local_dir.default"
|
||||
)
|
||||
|
||||
// registerTestBackend installs the fake backend in the global registry under
|
||||
// testBackendName for the duration of one test. Volume.Destroy and tier
|
||||
// upload look this map up by name, so the registration must outlive the
|
||||
// volume operations exercised below.
|
||||
func registerTestBackend(t *testing.T, b *localDirBackend) {
|
||||
t.Helper()
|
||||
backend.BackendStorages[testBackendName] = b
|
||||
t.Cleanup(func() {
|
||||
delete(backend.BackendStorages, testBackendName)
|
||||
})
|
||||
}
|
||||
|
||||
// tierUpVolume creates a real on-disk volume, writes a few needles, then
|
||||
// uploads the .dat to the fake backend and rewrites the volume in remote
|
||||
// mode (mirrors the production flow in volume_grpc_tier_upload.go but
|
||||
// in-process).
|
||||
func tierUpVolume(t *testing.T, dir string, vid needle.VolumeId, b *localDirBackend) (collection string, key string) {
|
||||
t.Helper()
|
||||
v, err := NewVolume(dir, dir, "", vid, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := 1; i <= 5; i++ {
|
||||
_, _, _, err := v.writeNeedle2(newRandomNeedle(uint64(i)), true, false)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
diskFile, ok := v.DataBackend.(*backend.DiskFile)
|
||||
require.True(t, ok, "expected on-disk backend before tier-up")
|
||||
|
||||
uploadKey, size, err := b.CopyFile(diskFile.File, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
bType, bId := backend.BackendNameToTypeId(testBackendName)
|
||||
v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
|
||||
BackendType: bType,
|
||||
BackendId: bId,
|
||||
Key: uploadKey,
|
||||
Offset: 0,
|
||||
FileSize: uint64(size),
|
||||
ModifiedTime: uint64(time.Now().Unix()),
|
||||
Extension: ".dat",
|
||||
})
|
||||
require.NoError(t, v.SaveVolumeInfo())
|
||||
require.NoError(t, v.LoadRemoteFile())
|
||||
require.NoError(t, os.Remove(v.FileName(".dat")))
|
||||
|
||||
// Close the volume cleanly. Tests below reload it from disk to mirror
|
||||
// what a volume server does on restart with a tiered volume.
|
||||
v.Close()
|
||||
|
||||
return v.Collection, uploadKey
|
||||
}
|
||||
|
||||
// reloadVolume loads an existing volume from disk, the way a volume server
|
||||
// does at startup. Returns the live volume with its async write worker
|
||||
// running, so Destroy's channel close is valid.
|
||||
func reloadVolume(t *testing.T, dir string, vid needle.VolumeId) *Volume {
|
||||
t.Helper()
|
||||
v, err := NewVolume(dir, dir, "", vid, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
require.NoError(t, err)
|
||||
return v
|
||||
}
|
||||
|
||||
// TestRemoteTier_Move_KeepsRemoteObject simulates the move-on-source-after-copy
|
||||
// step of a balance: Destroy(onlyEmpty=false, keepRemoteData=true). The remote
|
||||
// object must survive — the destination's freshly-copied .vif points at it.
|
||||
func TestRemoteTier_Move_KeepsRemoteObject(t *testing.T) {
|
||||
b := newLocalDirBackend(t)
|
||||
registerTestBackend(t, b)
|
||||
|
||||
dir := t.TempDir()
|
||||
const vid = needle.VolumeId(31)
|
||||
_, key := tierUpVolume(t, dir, vid, b)
|
||||
require.True(t, b.objectExists(key), "remote object missing after tier-up")
|
||||
|
||||
v := reloadVolume(t, dir, vid)
|
||||
require.True(t, v.HasRemoteFile())
|
||||
|
||||
require.NoError(t, v.Destroy(false, true))
|
||||
|
||||
require.True(t, b.objectExists(key), "Destroy(keepRemoteData=true) must not delete remote object")
|
||||
require.Empty(t, b.deleteHistory(), "no DeleteFile call expected on a move-style destroy")
|
||||
}
|
||||
|
||||
// TestRemoteTier_RealDelete_RemovesRemoteObject is the inverse: a true delete
|
||||
// (keepRemoteData=false) must clean up the remote object. Locks in that we
|
||||
// did not accidentally turn the new flag into a global skip.
|
||||
func TestRemoteTier_RealDelete_RemovesRemoteObject(t *testing.T) {
|
||||
b := newLocalDirBackend(t)
|
||||
registerTestBackend(t, b)
|
||||
|
||||
dir := t.TempDir()
|
||||
const vid = needle.VolumeId(32)
|
||||
_, key := tierUpVolume(t, dir, vid, b)
|
||||
|
||||
v := reloadVolume(t, dir, vid)
|
||||
require.True(t, v.HasRemoteFile())
|
||||
|
||||
require.NoError(t, v.Destroy(false, false))
|
||||
|
||||
require.False(t, b.objectExists(key), "Destroy(keepRemoteData=false) must delete remote object")
|
||||
require.Equal(t, []string{key}, b.deleteHistory())
|
||||
}
|
||||
|
||||
// TestRemoteTier_Vacuum_DoesNotDeleteRemote runs the compact paths against a
|
||||
// remote-tier volume and asserts the safety property: regardless of whether
|
||||
// compact succeeds, errors, or no-ops, it must not delete the cloud object.
|
||||
func TestRemoteTier_Vacuum_DoesNotDeleteRemote(t *testing.T) {
|
||||
b := newLocalDirBackend(t)
|
||||
registerTestBackend(t, b)
|
||||
|
||||
dir := t.TempDir()
|
||||
const vid = needle.VolumeId(33)
|
||||
_, key := tierUpVolume(t, dir, vid, b)
|
||||
require.True(t, b.objectExists(key))
|
||||
|
||||
v := reloadVolume(t, dir, vid)
|
||||
defer v.Close()
|
||||
require.True(t, v.HasRemoteFile())
|
||||
|
||||
_ = v.CompactByVolumeData(nil)
|
||||
_ = v.CompactByIndex(nil)
|
||||
require.True(t, b.objectExists(key), "Compact must not delete remote object")
|
||||
require.Empty(t, b.deleteHistory())
|
||||
}
|
||||
|
||||
// TestRemoteTier_ECEncode_RequiresLocalDat confirms the EC encoder runs
|
||||
// against the local .dat path. For a remote-tier volume the .dat is gone,
|
||||
// so encoding is expected to fail with a missing-file error — locks in that
|
||||
// callers must download (tier_move_dat_from_remote) before encoding.
|
||||
func TestRemoteTier_ECEncode_RequiresLocalDat(t *testing.T) {
|
||||
b := newLocalDirBackend(t)
|
||||
registerTestBackend(t, b)
|
||||
|
||||
dir := t.TempDir()
|
||||
const vid = needle.VolumeId(34)
|
||||
tierUpVolume(t, dir, vid, b)
|
||||
|
||||
baseFileName := filepath.Join(dir, fmt.Sprintf("%d", uint32(vid)))
|
||||
err := erasure_coding.WriteEcFiles(baseFileName)
|
||||
require.Error(t, err, "EC encoder must not run with .dat missing — caller is expected to download first")
|
||||
require.Contains(t, err.Error(), ".dat")
|
||||
}
|
||||
|
||||
// TestRemoteTier_ECEncodeDecode_AfterDownload exercises the encode/decode
|
||||
// round-trip on a tiered volume after pulling the .dat back to local disk
|
||||
// (the production sequence used by `volume.tier.move -dest=local`).
|
||||
func TestRemoteTier_ECEncodeDecode_AfterDownload(t *testing.T) {
|
||||
b := newLocalDirBackend(t)
|
||||
registerTestBackend(t, b)
|
||||
|
||||
dir := t.TempDir()
|
||||
const vid = needle.VolumeId(35)
|
||||
_, key := tierUpVolume(t, dir, vid, b)
|
||||
|
||||
baseFileName := filepath.Join(dir, fmt.Sprintf("%d", uint32(vid)))
|
||||
datPath := baseFileName + ".dat"
|
||||
_, err := b.DownloadFile(datPath, key, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"))
|
||||
require.NoError(t, erasure_coding.WriteEcFiles(baseFileName))
|
||||
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardPath := fmt.Sprintf("%s.ec%02d", baseFileName, i)
|
||||
_, statErr := os.Stat(shardPath)
|
||||
require.NoError(t, statErr, "shard %d missing after encode", i)
|
||||
}
|
||||
|
||||
// Drop the parity-range shards (indices DataShardsCount..TotalShardsCount-1)
|
||||
// and rebuild — exercises the recover-from-missing-parity path.
|
||||
for i := erasure_coding.DataShardsCount; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardPath := fmt.Sprintf("%s.ec%02d", baseFileName, i)
|
||||
require.NoError(t, os.Remove(shardPath))
|
||||
}
|
||||
rebuilt, err := erasure_coding.RebuildEcFiles(baseFileName)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, rebuilt, "rebuild should report which parity shards were regenerated")
|
||||
for i := erasure_coding.DataShardsCount; i < erasure_coding.TotalShardsCount; i++ {
|
||||
shardPath := fmt.Sprintf("%s.ec%02d", baseFileName, i)
|
||||
_, statErr := os.Stat(shardPath)
|
||||
require.NoError(t, statErr, "parity shard %d missing after rebuild", i)
|
||||
}
|
||||
}
|
||||
+11
-4
@@ -394,7 +394,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||
collectionVolumeDeletedBytes := make(map[string]int64)
|
||||
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
|
||||
for _, location := range s.Locations {
|
||||
// keepRemoteData is parallel to deleteVids: true entries preserve the
|
||||
// cloud-tier object on Volume.Destroy. IO-error deletions on a
|
||||
// remote-tiered volume must not nuke the remote object — the error
|
||||
// is local/transient and the cloud copy is the source of truth.
|
||||
var deleteVids []needle.VolumeId
|
||||
var keepRemoteData []bool
|
||||
effectiveMaxCount := location.MaxVolumeCount
|
||||
if location.isDiskSpaceLow {
|
||||
usedSlots := int32(location.LocalVolumesLen())
|
||||
@@ -419,6 +424,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||
|
||||
if v.lastIoError != nil {
|
||||
deleteVids = append(deleteVids, v.Id)
|
||||
keepRemoteData = append(keepRemoteData, v.HasRemoteFile())
|
||||
shouldDeleteVolume = true
|
||||
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
|
||||
} else {
|
||||
@@ -428,6 +434,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
|
||||
if !shouldDeleteVolume {
|
||||
deleteVids = append(deleteVids, v.Id)
|
||||
keepRemoteData = append(keepRemoteData, false)
|
||||
shouldDeleteVolume = true
|
||||
}
|
||||
} else {
|
||||
@@ -476,8 +483,8 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
|
||||
if len(deleteVids) > 0 {
|
||||
// delete expired volumes.
|
||||
location.volumesLock.Lock()
|
||||
for _, vid := range deleteVids {
|
||||
found, err := location.deleteVolumeById(vid, false)
|
||||
for i, vid := range deleteVids {
|
||||
found, err := location.deleteVolumeById(vid, false, keepRemoteData[i])
|
||||
if err == nil {
|
||||
if found {
|
||||
glog.V(0).Infof("volume %d is deleted", vid)
|
||||
@@ -725,7 +732,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
|
||||
return fmt.Errorf("volume %d not found on disk", i)
|
||||
}
|
||||
|
||||
func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
|
||||
func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool, keepRemoteData bool) error {
|
||||
v := s.findVolume(i)
|
||||
if v == nil {
|
||||
return fmt.Errorf("delete volume %d not found on disk", i)
|
||||
@@ -740,7 +747,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
|
||||
DiskId: v.diskId,
|
||||
}
|
||||
for _, location := range s.Locations {
|
||||
err := location.DeleteVolume(i, onlyEmpty)
|
||||
err := location.DeleteVolume(i, onlyEmpty, keepRemoteData)
|
||||
if err == nil {
|
||||
glog.V(0).Infof("DeleteVolume %d", i)
|
||||
s.DeletedVolumesChan <- message
|
||||
|
||||
@@ -163,6 +163,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
|
||||
}
|
||||
// check volume idx files
|
||||
if err := v.checkIdxFile(); err != nil {
|
||||
// A remote-tiered volume with a stray .vif but no .idx must not
|
||||
// take the whole server down; skip just this volume.
|
||||
if v.HasRemoteFile() {
|
||||
glog.Errorf("skip remote volume %d (idx: %s): %v", v.Id, v.FileName(".idx"), err)
|
||||
return fmt.Errorf("check volume idx file %s: %w", v.FileName(".idx"), err)
|
||||
}
|
||||
glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
|
||||
}
|
||||
var indexFile *os.File
|
||||
|
||||
@@ -54,8 +54,10 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
|
||||
|
||||
var ErrVolumeNotEmpty = fmt.Errorf("volume not empty")
|
||||
|
||||
// Destroy removes everything related to this volume
|
||||
func (v *Volume) Destroy(onlyEmpty bool) (err error) {
|
||||
// Destroy removes everything related to this volume. When keepRemoteData is
|
||||
// true the cloud-tier object backing the volume is left intact — used by
|
||||
// moves where another server is taking over the same .vif.
|
||||
func (v *Volume) Destroy(onlyEmpty bool, keepRemoteData bool) (err error) {
|
||||
v.dataFileAccessLock.Lock()
|
||||
defer v.dataFileAccessLock.Unlock()
|
||||
|
||||
@@ -75,10 +77,12 @@ func (v *Volume) Destroy(onlyEmpty bool) (err error) {
|
||||
return
|
||||
}
|
||||
close(v.asyncRequestsChan)
|
||||
storageName, storageKey := v.RemoteStorageNameKey()
|
||||
if v.HasRemoteFile() && storageName != "" && storageKey != "" {
|
||||
if backendStorage, found := backend.BackendStorages[storageName]; found {
|
||||
backendStorage.DeleteFile(storageKey)
|
||||
if !keepRemoteData {
|
||||
storageName, storageKey := v.RemoteStorageNameKey()
|
||||
if v.HasRemoteFile() && storageName != "" && storageKey != "" {
|
||||
if backendStorage, found := backend.BackendStorages[storageName]; found {
|
||||
backendStorage.DeleteFile(storageKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
v.doClose()
|
||||
|
||||
@@ -88,7 +88,7 @@ func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
|
||||
|
||||
// should can Destroy empty volume with onlyEmpty
|
||||
assertFileExist(t, true, path)
|
||||
err = v.Destroy(true)
|
||||
err = v.Destroy(true, false)
|
||||
if err != nil {
|
||||
t.Fatalf("destroy volume: %v", err)
|
||||
}
|
||||
@@ -106,7 +106,7 @@ func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
|
||||
|
||||
// should can Destroy empty volume without onlyEmpty
|
||||
assertFileExist(t, true, path)
|
||||
err = v.Destroy(false)
|
||||
err = v.Destroy(false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("destroy volume: %v", err)
|
||||
}
|
||||
@@ -131,7 +131,7 @@ func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
|
||||
assert.Equal(t, uint64(1), v.FileCount())
|
||||
|
||||
assertFileExist(t, true, path)
|
||||
err = v.Destroy(true)
|
||||
err = v.Destroy(true, false)
|
||||
assert.EqualError(t, err, "volume not empty")
|
||||
assertFileExist(t, true, path)
|
||||
|
||||
@@ -161,7 +161,7 @@ func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) {
|
||||
assert.Equal(t, uint64(1), v.FileCount())
|
||||
|
||||
assertFileExist(t, true, path)
|
||||
err = v.Destroy(false)
|
||||
err = v.Destroy(false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("destroy volume: %v", err)
|
||||
}
|
||||
|
||||
@@ -289,13 +289,16 @@ func (t *BalanceTask) readVolumeFileStatus(ctx context.Context, server pb.Server
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// deleteVolume deletes the volume from the server.
|
||||
// deleteVolume deletes the volume from the source server after a successful
|
||||
// move. KeepRemoteData=true prevents the source from removing the cloud-tier
|
||||
// object that the destination's freshly-copied .vif now points at.
|
||||
func (t *BalanceTask) deleteVolume(ctx context.Context, server pb.ServerAddress, volumeId needle.VolumeId) error {
|
||||
return operation.WithVolumeServerClient(false, server, t.grpcDialOption,
|
||||
func(client volume_server_pb.VolumeServerClient) error {
|
||||
_, err := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
OnlyEmpty: false,
|
||||
VolumeId: uint32(volumeId),
|
||||
OnlyEmpty: false,
|
||||
KeepRemoteData: true,
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
@@ -35,9 +35,15 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
|
||||
maxResults = math.MaxInt32
|
||||
}
|
||||
|
||||
// Group volumes by disk type to ensure we compare apples to apples
|
||||
// Group volumes by disk type to ensure we compare apples to apples.
|
||||
// Remote-tiered volumes are skipped: a "move" copies .idx/.vif then
|
||||
// Volume.Destroy on the source deletes the remote object the destination
|
||||
// now points at. Mirrors shell/command_volume_balance.go.
|
||||
volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics)
|
||||
for _, metric := range metrics {
|
||||
if metric.HasRemoteCopy {
|
||||
continue
|
||||
}
|
||||
volumesByDiskType[metric.DiskType] = append(volumesByDiskType[metric.DiskType], metric)
|
||||
}
|
||||
|
||||
|
||||
@@ -401,6 +401,48 @@ func TestDetection_ImbalancedDiskType(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDetection_SkipsRemoteTieredVolumes(t *testing.T) {
|
||||
metrics := []*types.VolumeHealthMetrics{}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
metrics = append(metrics, &types.VolumeHealthMetrics{
|
||||
VolumeID: uint32(i + 1),
|
||||
Server: "ssd-server-1",
|
||||
ServerAddress: "ssd-server-1:8080",
|
||||
DiskType: "ssd",
|
||||
Collection: "c1",
|
||||
Size: 1024,
|
||||
DataCenter: "dc1",
|
||||
Rack: "rack1",
|
||||
HasRemoteCopy: true,
|
||||
})
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
metrics = append(metrics, &types.VolumeHealthMetrics{
|
||||
VolumeID: uint32(100 + i + 1),
|
||||
Server: "ssd-server-2",
|
||||
ServerAddress: "ssd-server-2:8080",
|
||||
DiskType: "ssd",
|
||||
Collection: "c1",
|
||||
Size: 1024,
|
||||
DataCenter: "dc1",
|
||||
Rack: "rack1",
|
||||
HasRemoteCopy: true,
|
||||
})
|
||||
}
|
||||
|
||||
at := createMockTopology(metrics...)
|
||||
clusterInfo := &types.ClusterInfo{ActiveTopology: at}
|
||||
|
||||
tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100)
|
||||
if err != nil {
|
||||
t.Fatalf("Detection failed: %v", err)
|
||||
}
|
||||
if len(tasks) != 0 {
|
||||
t.Errorf("expected 0 tasks for remote-tiered volumes, got %d", len(tasks))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDetection_RespectsMaxResults(t *testing.T) {
|
||||
// Setup: 2 SSD servers with big imbalance (100 vs 10)
|
||||
metrics := []*types.VolumeHealthMetrics{}
|
||||
|
||||
Reference in New Issue
Block a user