mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
* fix(volume_server): load orphan EC shards across disks on startup (#9212) When ec.balance / ec.rebuild copies an EC shard onto a destination node without also pinning subsequent shards to the disk that holds .ecx, the shard ends up on a different physical disk than its index files. The per-disk loadAllEcShards has no visibility into other DiskLocations on the same store, so those orphan shards were silently left out of ecVolumes and never reported to master — volume.list showed partial counts, and ec.rebuild reported the volume as unrepairable even though all shards were physically present. After every DiskLocation finishes its initial pass, sweep the store for shard files that are on disk but not yet in any EcVolume, look up the .ecx-owning sibling disk, and load each shard against its physical disk with dirIdx pointing at the sibling. Each shard is still registered on its own disk's ecVolumes map so heartbeat reporting carries the right DiskId per shard (master fix #9219 already aggregates per-disk messages correctly). Also fall back to dirIdx for .vif lookup when dir != dirIdx, so the reconciliation path doesn't write a stub .vif on the shard disk and lose the real EC config and datFileSize. * fix(volume_server): track actual .ecx dir in cross-disk reconcile indexEcxOwners scans both IdxDirectory and Directory to find each volume's .ecx — the second scan covers the legacy case where index files were written into the data dir before -dir.idx was configured (removeEcVolumeFiles already accounts for this in disk_location_ec.go). But the returned map dropped which directory matched, and reconcile unconditionally passed owner.IdxDirectory to loadEcShardsWithIdxDir. When the owner's .ecx is in Directory and IdxDirectory != Directory (server later re-configured with -dir.idx pointing at a fresh path), NewEcVolume opens IdxDirectory/.ecx → ENOENT, retries the same-disk fallback at dataBaseFileName+.ecx — but dataBaseFileName uses the *orphan* disk's data dir, not the owner's, so it ENOENTs again and the orphan shards stay unloaded. Track which scan dir matched in indexEcxOwners and pass it through. Adds TestLoadEcShardsWhenOwnerEcxIsInDataDir as the regression. Reported in PR #9244 review by @gemini-code-assist and @coderabbitai. * refactor(storage): thread dataShardCount as a parameter into calculateExpectedShardSize The helper used erasure_coding.DataShardsCount directly, but tests in store_ec_orphan_shard_test.go save .vif with a local dataShards=10 constant. If the package default ever diverged from 10 (e.g. an enterprise build), the test would write a .vif for one layout while sizing shard files for another and silently break. Take dataShardCount as a parameter. Existing callers (validateEcVolume + size-validation tests + real-world tests) pass erasure_coding.DataShardsCount unchanged. The orphan-shard tests pass the same dataShards local they save into .vif, so the persisted shape and the on-disk shape stay consistent. Reported in PR #9244 review by @coderabbitai.
This commit is contained in:
@@ -115,6 +115,15 @@ func (l *DiskLocation) HasEcxFileOnDisk(collection string, vid needle.VolumeId)
|
||||
}
|
||||
|
||||
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolume, error) {
|
||||
return l.loadEcShardWithIdxDir(collection, vid, shardId, l.IdxDirectory)
|
||||
}
|
||||
|
||||
// loadEcShardWithIdxDir is like LoadEcShard but uses the supplied idxDir as
|
||||
// the source of .ecx / .ecj rather than this disk's own IdxDirectory. The
|
||||
// orphan-shard reconciliation calls this with a sibling disk's idx folder
|
||||
// when shards live on a disk that does not own the index files itself
|
||||
// (issue #9212).
|
||||
func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, idxDir string) (*erasure_coding.EcVolume, error) {
|
||||
|
||||
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
|
||||
if err != nil {
|
||||
@@ -127,7 +136,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
|
||||
defer l.ecVolumesLock.Unlock()
|
||||
ecVolume, found := l.ecVolumes[vid]
|
||||
if !found {
|
||||
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
|
||||
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, idxDir, collection, vid)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err)
|
||||
}
|
||||
@@ -261,6 +270,38 @@ func (l *DiskLocation) loadAllEcShards(onShardLoad func(collection string, vid n
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadEcShardsWithIdxDir loads each shard file in shards into l.ecVolumes,
|
||||
// using idxDir as the source of .ecx / .ecj / .vif (NewEcVolume falls back
|
||||
// to dirIdx for .vif when the data dir does not have one). Used by the
|
||||
// store-level orphan-shard reconciliation in #9212; stops on the first
|
||||
// failure so the caller can log and continue with other volumes.
|
||||
func (l *DiskLocation) loadEcShardsWithIdxDir(shards []string, collection string, vid needle.VolumeId, idxDir string, onShardLoad func(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume)) error {
|
||||
|
||||
for _, shard := range shards {
|
||||
ext := path.Ext(shard)
|
||||
if len(ext) < 4 {
|
||||
return fmt.Errorf("unexpected ec shard name %v", shard)
|
||||
}
|
||||
shardId, err := strconv.ParseInt(ext[3:], 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse ec shard name %v: %w", shard, err)
|
||||
}
|
||||
if shardId < 0 || shardId > 255 {
|
||||
return fmt.Errorf("shard ID out of range: %d", shardId)
|
||||
}
|
||||
|
||||
ecVolume, err := l.loadEcShardWithIdxDir(collection, vid, erasure_coding.ShardId(shardId), idxDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load ec shard %v: %w", shard, err)
|
||||
}
|
||||
if onShardLoad != nil {
|
||||
onShardLoad(collection, vid, erasure_coding.ShardId(shardId), ecVolume)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
|
||||
// Add write lock since we're modifying the ecVolumes map
|
||||
l.ecVolumesLock.Lock()
|
||||
@@ -376,21 +417,29 @@ func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, v
|
||||
|
||||
// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
|
||||
// The EC encoding process is deterministic:
|
||||
// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
|
||||
// 2. Remaining data is processed in batches of (SmallBlockSize * DataShardsCount) for small blocks
|
||||
// 1. Data is processed in batches of (LargeBlockSize * dataShardCount) for large blocks
|
||||
// 2. Remaining data is processed in batches of (SmallBlockSize * dataShardCount) for small blocks
|
||||
// 3. Each shard gets exactly its portion, with zero-padding applied to incomplete blocks
|
||||
func calculateExpectedShardSize(datFileSize int64) int64 {
|
||||
//
|
||||
// dataShardCount is taken as a parameter rather than read from
|
||||
// erasure_coding.DataShardsCount so that tests writing a custom layout
|
||||
// to .vif compute the matching shard size, and so custom-ratio builds
|
||||
// (e.g. enterprise) can swap the default without touching this helper.
|
||||
func calculateExpectedShardSize(datFileSize int64, dataShardCount int) int64 {
|
||||
if dataShardCount <= 0 {
|
||||
return 0
|
||||
}
|
||||
var shardSize int64
|
||||
|
||||
// Process large blocks (1GB * 10 = 10GB batches)
|
||||
largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(erasure_coding.DataShardsCount)
|
||||
// Process large blocks (1GB * dataShardCount per batch)
|
||||
largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(dataShardCount)
|
||||
numLargeBatches := datFileSize / largeBatchSize
|
||||
shardSize = numLargeBatches * int64(erasure_coding.ErasureCodingLargeBlockSize)
|
||||
remainingSize := datFileSize - (numLargeBatches * largeBatchSize)
|
||||
|
||||
// Process remaining data in small blocks (1MB * 10 = 10MB batches)
|
||||
// Process remaining data in small blocks (1MB * dataShardCount per batch)
|
||||
if remainingSize > 0 {
|
||||
smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(erasure_coding.DataShardsCount)
|
||||
smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(dataShardCount)
|
||||
numSmallBatches := (remainingSize + smallBatchSize - 1) / smallBatchSize // Ceiling division
|
||||
shardSize += numSmallBatches * int64(erasure_coding.ErasureCodingSmallBlockSize)
|
||||
}
|
||||
@@ -410,10 +459,13 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
|
||||
var expectedShardSize int64 = -1
|
||||
datExists := false
|
||||
|
||||
// If .dat file exists, compute exact expected shard size from it
|
||||
// If .dat file exists, compute exact expected shard size from it.
|
||||
// Pass the build's default data-shard count; calculateExpectedShardSize
|
||||
// takes it as a parameter so tests / enterprise builds can supply
|
||||
// their own.
|
||||
if datFileInfo, err := os.Stat(datFileName); err == nil {
|
||||
datExists = true
|
||||
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
|
||||
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size(), erasure_coding.DataShardsCount)
|
||||
} else if !os.IsNotExist(err) {
|
||||
// If stat fails with unexpected error (permission, I/O), fail validation
|
||||
// Don't treat this as "distributed EC" - it could be a temporary error
|
||||
|
||||
@@ -83,7 +83,7 @@ func TestCalculateExpectedShardSizeWithRealEncoding(t *testing.T) {
|
||||
datFile.Close()
|
||||
|
||||
// Calculate expected shard size using our function
|
||||
expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
|
||||
expectedShardSize := calculateExpectedShardSize(tt.datFileSize, erasure_coding.DataShardsCount)
|
||||
|
||||
// Run actual EC encoding
|
||||
err = erasure_coding.WriteEcFiles(baseFileName)
|
||||
@@ -164,7 +164,7 @@ func TestCalculateExpectedShardSizeEdgeCases(t *testing.T) {
|
||||
datFile.Close()
|
||||
|
||||
// Calculate expected
|
||||
expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
|
||||
expectedShardSize := calculateExpectedShardSize(tt.datFileSize, erasure_coding.DataShardsCount)
|
||||
|
||||
// Run actual EC encoding
|
||||
err = erasure_coding.WriteEcFiles(baseFileName)
|
||||
|
||||
@@ -128,7 +128,7 @@ func TestCalculateExpectedShardSize(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
actualShardSize := calculateExpectedShardSize(tt.datFileSize)
|
||||
actualShardSize := calculateExpectedShardSize(tt.datFileSize, dataShards)
|
||||
|
||||
if actualShardSize != tt.expectedShardSize {
|
||||
t.Errorf("Expected shard size %d, got %d. %s",
|
||||
@@ -143,6 +143,7 @@ func TestCalculateExpectedShardSize(t *testing.T) {
|
||||
|
||||
// TestShardSizeValidationScenarios tests realistic scenarios
|
||||
func TestShardSizeValidationScenarios(t *testing.T) {
|
||||
const dataShards = 10
|
||||
scenarios := []struct {
|
||||
name string
|
||||
datFileSize int64
|
||||
@@ -183,7 +184,7 @@ func TestShardSizeValidationScenarios(t *testing.T) {
|
||||
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
expectedSize := calculateExpectedShardSize(scenario.datFileSize)
|
||||
expectedSize := calculateExpectedShardSize(scenario.datFileSize, dataShards)
|
||||
isValid := scenario.actualShardSize == expectedSize
|
||||
|
||||
if isValid != scenario.shouldBeValid {
|
||||
|
||||
@@ -123,7 +123,7 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
|
||||
|
||||
// Use deterministic but small size: 10MB .dat => 1MB per shard
|
||||
datFileSize := int64(10 * 1024 * 1024) // 10MB
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSize)
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount)
|
||||
|
||||
// Create .dat file if needed
|
||||
if tt.createDatFile {
|
||||
@@ -325,7 +325,7 @@ func TestValidateEcVolume(t *testing.T) {
|
||||
// For test purposes, use a small .dat file size that still exercises the logic
|
||||
// 10MB .dat file = 1MB per shard (one small batch, fast and deterministic)
|
||||
datFileSize := int64(10 * 1024 * 1024) // 10MB
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSize)
|
||||
expectedShardSize := calculateExpectedShardSize(datFileSize, erasure_coding.DataShardsCount)
|
||||
|
||||
// Create .dat file if needed
|
||||
if tt.createDatFile {
|
||||
|
||||
@@ -105,9 +105,23 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
|
||||
glog.Warningf("ec volume %d: load deleted needles from .ecj: %v", vid, loadErr)
|
||||
}
|
||||
|
||||
// read volume info
|
||||
// read volume info. Prefer .vif at the data dir (where shards live), but
|
||||
// fall back to the index dir when the data dir does not have one — the
|
||||
// orphan-shard reconciliation in Store loads shards on a disk whose only
|
||||
// EC artefacts are .ec?? files, with .ecx / .ecj / .vif on a sibling disk
|
||||
// (issue #9212). Without this fallback we'd write a stub .vif on the
|
||||
// shard disk and lose the real EC config + datFileSize.
|
||||
vifFileName := dataBaseFileName + ".vif"
|
||||
if dirIdx != dir {
|
||||
if _, statErr := os.Stat(vifFileName); statErr != nil && os.IsNotExist(statErr) {
|
||||
altVif := EcShardFileName(collection, dirIdx, int(vid)) + ".vif"
|
||||
if _, altStatErr := os.Stat(altVif); altStatErr == nil {
|
||||
vifFileName = altVif
|
||||
}
|
||||
}
|
||||
}
|
||||
ev.Version = needle.Version3
|
||||
if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
|
||||
if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(vifFileName); found {
|
||||
ev.Version = needle.Version(volumeInfo.Version)
|
||||
ev.datFileSize = volumeInfo.DatFileSize
|
||||
ev.ExpireAtSec = volumeInfo.ExpireAtSec
|
||||
@@ -134,7 +148,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
|
||||
ev.ECContext = NewDefaultECContext(collection, vid)
|
||||
}
|
||||
} else {
|
||||
glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
|
||||
glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, vifFileName)
|
||||
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
|
||||
ev.ECContext = NewDefaultECContext(collection, vid)
|
||||
}
|
||||
|
||||
@@ -154,6 +154,15 @@ func NewStore(
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// After every DiskLocation has finished its per-disk EC scan, sweep the
|
||||
// store for shards that live on a disk without local index files and
|
||||
// load them by reaching across to a sibling disk's .ecx / .ecj / .vif.
|
||||
// This is the volume-server side of issue #9212: ec.balance can move
|
||||
// shards onto a destination node's second disk while leaving the index
|
||||
// on the disk that already held the volume, and without this pass those
|
||||
// orphan shards stay invisible to the master.
|
||||
s.reconcileEcShardsAcrossDisks()
|
||||
|
||||
// Resolve state.pb's directory via the first disk location so it inherits
|
||||
// the same `~` expansion and empty-idxFolder fallback used for .idx files,
|
||||
// and is never written as a relative path against the process CWD (#9173).
|
||||
|
||||
@@ -0,0 +1,458 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// TestLoadEcShardsWhenIndexFilesOnDifferentDisk reproduces issue #9212.
|
||||
// On the user's volume server, ec.balance moved EC shards onto a different
|
||||
// physical disk than the disk that holds the .ecx/.ecj/.vif index files.
|
||||
// The volume server must still load those orphan shards on startup and
|
||||
// register them with the master — otherwise ec.rebuild reports the volume
|
||||
// as unrepairable even though all shards are physically present.
|
||||
//
|
||||
// Layout under test (mirrors volume-0 + volume-2 rows for grafana-loki_1093
|
||||
// in the ls -l attached to the issue):
|
||||
//
|
||||
// dir0 (diskId 0): .ec00, .ec12 <-- shards but no .ecx / .ecj / .vif
|
||||
// dir1 (diskId 1): .ec01, .ecx, .ecj, .vif
|
||||
func TestLoadEcShardsWhenIndexFilesOnDifferentDisk(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1")
|
||||
dir1 := filepath.Join(tempDir, "data2")
|
||||
if err := os.MkdirAll(dir0, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir0: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(dir1, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir1: %v", err)
|
||||
}
|
||||
|
||||
collection := "grafana-loki"
|
||||
vid := needle.VolumeId(1093)
|
||||
|
||||
// EC shape used to populate .vif. Kept as locals (not the package
|
||||
// constants) so the test stays valid when enterprise builds use a
|
||||
// different default ratio.
|
||||
const dataShards, parityShards = 10, 4
|
||||
|
||||
// Use a small but realistic shard size so calculateExpectedShardSize
|
||||
// validation lines up with the .dat-derived size if it ever runs.
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
writeShard := func(dir string, shardId int) {
|
||||
t.Helper()
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
f, err := os.Create(base + erasure_coding.ToExt(shardId))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatalf("close shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// dir0: orphan shards. No .ecx / .ecj / .vif on this disk.
|
||||
writeShard(dir0, 0)
|
||||
writeShard(dir0, 12)
|
||||
|
||||
// dir1: one shard plus the index files.
|
||||
writeShard(dir1, 1)
|
||||
|
||||
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
|
||||
|
||||
// Build a valid sealed .ecx with one entry so NewEcVolume can open it.
|
||||
// Layout: NeedleId(8) + Offset(8) + Size(4) = 20 bytes per entry.
|
||||
ecxBytes := make([]byte, 20)
|
||||
if err := os.WriteFile(base1+".ecx", ecxBytes, 0o644); err != nil {
|
||||
t.Fatalf("write .ecx: %v", err)
|
||||
}
|
||||
|
||||
// Empty .ecj is fine (no deletes journaled yet).
|
||||
if err := os.WriteFile(base1+".ecj", nil, 0o644); err != nil {
|
||||
t.Fatalf("write .ecj: %v", err)
|
||||
}
|
||||
|
||||
// .vif with the EC config the test set above so EcVolume picks up the
|
||||
// right shape regardless of the build's default ratio.
|
||||
if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
DatFileSize: datSize,
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{
|
||||
DataShards: dataShards,
|
||||
ParityShards: parityShards,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("save .vif: %v", err)
|
||||
}
|
||||
|
||||
// Build the Store with both disks. NewStore triggers per-disk loading
|
||||
// during construction, which is the codepath under test.
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
|
||||
// Drain heartbeat-style channels so loading does not block.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
if got, want := len(store.Locations), 2; got != want {
|
||||
t.Fatalf("store has %d disk locations, want %d", got, want)
|
||||
}
|
||||
|
||||
// Sanity: dir1's shard ec01 lives on the disk that owns .ecx and must load.
|
||||
loc1 := store.Locations[1]
|
||||
if _, found := loc1.FindEcShard(vid, 1); !found {
|
||||
t.Fatalf("baseline broken: shard 1 on dir1 (which has .ecx) was not loaded")
|
||||
}
|
||||
|
||||
// The bug: shards on dir0 have no local .ecx/.vif — they should still be
|
||||
// loaded by reaching across to dir1's index files, but currently the
|
||||
// per-disk loader silently drops them when no .dat file is present.
|
||||
loc0 := store.Locations[0]
|
||||
for _, sid := range []erasure_coding.ShardId{0, 12} {
|
||||
if _, found := loc0.FindEcShard(vid, sid); !found {
|
||||
t.Errorf("issue #9212: shard %d on dir0 was not loaded; .ecx is on dir1", sid)
|
||||
}
|
||||
}
|
||||
|
||||
// Files on dir0 must not have been deleted by any cleanup path.
|
||||
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
|
||||
for _, sid := range []int{0, 12} {
|
||||
shardPath := base0 + erasure_coding.ToExt(sid)
|
||||
fi, err := os.Stat(shardPath)
|
||||
if err != nil {
|
||||
t.Errorf("orphan shard %d was destroyed: %v", sid, err)
|
||||
continue
|
||||
}
|
||||
if fi.Size() != expectedShardSize {
|
||||
t.Errorf("orphan shard %d truncated: size %d, want %d", sid, fi.Size(), expectedShardSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadEcShardsOrphanWithoutSiblingEcx exercises the truly-orphaned
|
||||
// case from issue #9212: shard files exist on a disk but no .ecx exists
|
||||
// anywhere on the volume server. We must not crash, and we must leave the
|
||||
// shard files alone so an operator can restore the index later.
|
||||
func TestLoadEcShardsOrphanWithoutSiblingEcx(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1")
|
||||
dir1 := filepath.Join(tempDir, "data2")
|
||||
if err := os.MkdirAll(dir0, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir0: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(dir1, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir1: %v", err)
|
||||
}
|
||||
|
||||
collection := "grafana-loki"
|
||||
vid := needle.VolumeId(2222)
|
||||
const dataShards = 10
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
|
||||
for _, sid := range []int{0, 12} {
|
||||
f, err := os.Create(base0 + erasure_coding.ToExt(sid))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d: %v", sid, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d: %v", sid, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
loc0 := store.Locations[0]
|
||||
if _, found := loc0.FindEcShard(vid, 0); found {
|
||||
t.Errorf("shard 0 should remain unloaded when no .ecx exists anywhere; reconciliation must not fabricate one")
|
||||
}
|
||||
|
||||
for _, sid := range []int{0, 12} {
|
||||
shardPath := base0 + erasure_coding.ToExt(sid)
|
||||
fi, err := os.Stat(shardPath)
|
||||
if err != nil {
|
||||
t.Errorf("orphan shard %d destroyed by reconciliation: %v", sid, err)
|
||||
continue
|
||||
}
|
||||
if fi.Size() != expectedShardSize {
|
||||
t.Errorf("orphan shard %d truncated: size %d, want %d", sid, fi.Size(), expectedShardSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileNoOpWhenEachDiskIsSelfContained guards against the
|
||||
// reconciliation pass double-loading shards or stomping on EcVolumes that
|
||||
// were already populated by the per-disk pass.
|
||||
func TestReconcileNoOpWhenEachDiskIsSelfContained(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1")
|
||||
dir1 := filepath.Join(tempDir, "data2")
|
||||
if err := os.MkdirAll(dir0, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir0: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(dir1, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dir1: %v", err)
|
||||
}
|
||||
|
||||
collection := "grafana-loki"
|
||||
vid := needle.VolumeId(3333)
|
||||
const dataShards, parityShards = 10, 4
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
writeFullEcLayout := func(dir string, shardIds []int) {
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
for _, sid := range shardIds {
|
||||
f, err := os.Create(base + erasure_coding.ToExt(sid))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d in %s: %v", sid, dir, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d in %s: %v", sid, dir, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
if err := os.WriteFile(base+".ecx", make([]byte, 20), 0o644); err != nil {
|
||||
t.Fatalf("write .ecx in %s: %v", dir, err)
|
||||
}
|
||||
if err := os.WriteFile(base+".ecj", nil, 0o644); err != nil {
|
||||
t.Fatalf("write .ecj in %s: %v", dir, err)
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
DatFileSize: datSize,
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{
|
||||
DataShards: dataShards,
|
||||
ParityShards: parityShards,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("save .vif in %s: %v", dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Each disk is a fully self-contained EC layout — reconciliation
|
||||
// should leave them untouched.
|
||||
writeFullEcLayout(dir0, []int{0, 4})
|
||||
writeFullEcLayout(dir1, []int{8, 12})
|
||||
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
for diskIdx, sids := range [][]erasure_coding.ShardId{{0, 4}, {8, 12}} {
|
||||
loc := store.Locations[diskIdx]
|
||||
ev, found := loc.FindEcVolume(vid)
|
||||
if !found {
|
||||
t.Fatalf("disk %d: EcVolume for vid %d not loaded", diskIdx, vid)
|
||||
}
|
||||
if got := len(ev.Shards); got != len(sids) {
|
||||
t.Errorf("disk %d: EcVolume has %d shards, want %d (reconciliation may have double-loaded)", diskIdx, got, len(sids))
|
||||
}
|
||||
for _, sid := range sids {
|
||||
if _, ok := loc.FindEcShard(vid, sid); !ok {
|
||||
t.Errorf("disk %d: shard %d missing", diskIdx, sid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadEcShardsWhenOwnerEcxIsInDataDir covers the legacy layout flagged
|
||||
// in PR #9244 review: -dir.idx is configured (so every DiskLocation has
|
||||
// IdxDirectory != Directory), but the owner's .ecx / .ecj / .vif were
|
||||
// written into the owner's data dir before -dir.idx was set. indexEcxOwners
|
||||
// must record the directory the .ecx was actually found in (Directory),
|
||||
// not just the owner's IdxDirectory — otherwise NewEcVolume's same-disk
|
||||
// fallback retries the orphan disk's data dir and ENOENTs there too.
|
||||
func TestLoadEcShardsWhenOwnerEcxIsInDataDir(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1") // orphan: shards only
|
||||
dir1 := filepath.Join(tempDir, "data2") // owner: .ecx in data dir
|
||||
idxDir := filepath.Join(tempDir, "idx") // shared idx folder, intentionally empty
|
||||
for _, d := range []string{dir0, dir1, idxDir} {
|
||||
if err := os.MkdirAll(d, 0o755); err != nil {
|
||||
t.Fatalf("mkdir %s: %v", d, err)
|
||||
}
|
||||
}
|
||||
|
||||
collection := "grafana-loki"
|
||||
vid := needle.VolumeId(4242)
|
||||
const dataShards, parityShards = 10, 4
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
writeShard := func(dir string, shardId int) {
|
||||
t.Helper()
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
f, err := os.Create(base + erasure_coding.ToExt(shardId))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
writeShard(dir0, 0)
|
||||
writeShard(dir0, 12)
|
||||
writeShard(dir1, 1)
|
||||
|
||||
// Owner's .ecx / .ecj / .vif live in dir1 (data dir), NOT idxDir.
|
||||
// This mirrors a server that ran without -dir.idx, then later got it
|
||||
// configured — the index files stay in their original on-disk home.
|
||||
base1Data := erasure_coding.EcShardFileName(collection, dir1, int(vid))
|
||||
if err := os.WriteFile(base1Data+".ecx", make([]byte, 20), 0o644); err != nil {
|
||||
t.Fatalf("write .ecx in data dir: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base1Data+".ecj", nil, 0o644); err != nil {
|
||||
t.Fatalf("write .ecj in data dir: %v", err)
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(base1Data+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
DatFileSize: datSize,
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{
|
||||
DataShards: dataShards,
|
||||
ParityShards: parityShards,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("save .vif in data dir: %v", err)
|
||||
}
|
||||
|
||||
// idxDir is configured but intentionally empty for this volume — we
|
||||
// want IdxDirectory != Directory while the .ecx lives in Directory.
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
idxDir,
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
loc1 := store.Locations[1]
|
||||
if _, found := loc1.FindEcShard(vid, 1); !found {
|
||||
t.Fatalf("baseline broken: shard 1 on dir1 (which has .ecx in its data dir) was not loaded")
|
||||
}
|
||||
|
||||
loc0 := store.Locations[0]
|
||||
for _, sid := range []erasure_coding.ShardId{0, 12} {
|
||||
if _, found := loc0.FindEcShard(vid, sid); !found {
|
||||
t.Errorf("issue #9212 (PR #9244 review): orphan shard %d on dir0 not loaded; reconcile pointed loader at IdxDirectory but .ecx was actually in owner.Directory", sid)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
// ecKeyForReconcile keys orphan-shard reconciliation by collection + volume
|
||||
// id. Per-collection grouping matters because two collections can re-use the
|
||||
// same volume id, and we must only pair shards with their own .ecx file.
|
||||
type ecKeyForReconcile struct {
|
||||
collection string
|
||||
vid needle.VolumeId
|
||||
}
|
||||
|
||||
// ecxOwnerInfo records both the disk that owns the .ecx and the actual
|
||||
// directory it lives in (IdxDirectory or Directory). The directory matters
|
||||
// because indexEcxOwners scans both — when .ecx lives in Directory (the
|
||||
// legacy "written before -dir.idx was set" layout that removeEcVolumeFiles
|
||||
// in disk_location_ec.go also keeps cleaning up), passing the owner's
|
||||
// IdxDirectory to NewEcVolume would ENOENT both the primary and the
|
||||
// same-disk fallback path, which uses the orphan disk's data dir, not the
|
||||
// owner's. Tracking the actual scan dir lets reconcile point loaders at
|
||||
// the directory the .ecx is really in.
|
||||
type ecxOwnerInfo struct {
|
||||
location *DiskLocation
|
||||
idxDir string
|
||||
}
|
||||
|
||||
// reconcileEcShardsAcrossDisks loads EC shards that the per-disk scan in
|
||||
// loadAllEcShards skipped because the disk holding the .ec?? files does not
|
||||
// also hold the matching .ecx / .ecj / .vif index files. The index files
|
||||
// are located on a different disk of the same volume server (issue #9212).
|
||||
//
|
||||
// Per-disk loadAllEcShards correctly leaves these orphan shards on disk —
|
||||
// it does not have visibility into other DiskLocations on the same store —
|
||||
// so the cross-disk fan-out must happen here, after every disk's initial
|
||||
// pass has completed. We register each shard against its physical disk's
|
||||
// ecVolumes map (so heartbeat reporting carries the right DiskId per
|
||||
// shard), but point the EcVolume at the sibling disk's index files so it
|
||||
// can serve reads and route deletes through a real .ecx / .ecj.
|
||||
func (s *Store) reconcileEcShardsAcrossDisks() {
|
||||
if len(s.Locations) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
ecxOwners := s.indexEcxOwners()
|
||||
if len(ecxOwners) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, loc := range s.Locations {
|
||||
orphans := loc.collectOrphanEcShards()
|
||||
if len(orphans) == 0 {
|
||||
continue
|
||||
}
|
||||
for key, shards := range orphans {
|
||||
owner, ok := ecxOwners[key]
|
||||
if !ok {
|
||||
glog.Warningf("ec volume %d (collection=%q) has shards on %s without a matching .ecx anywhere on this volume server; shards %v will stay unloaded until the missing .ecx is restored",
|
||||
key.vid, key.collection, loc.Directory, shards)
|
||||
continue
|
||||
}
|
||||
if owner.location == loc {
|
||||
// .ecx is on this same disk, but loadAllEcShards still
|
||||
// did not load these shards — handleFoundEcxFile already
|
||||
// logged the underlying failure. Don't try again here.
|
||||
continue
|
||||
}
|
||||
glog.V(0).Infof("ec volume %d (collection=%q): loading orphan shards %v on %s using index files from %s (issue #9212)",
|
||||
key.vid, key.collection, shards, loc.Directory, owner.idxDir)
|
||||
if err := loc.loadEcShardsWithIdxDir(shards, key.collection, key.vid, owner.idxDir, loc.ecShardNotifyHandler); err != nil {
|
||||
glog.Errorf("ec volume %d on %s: cross-disk shard load failed: %v", key.vid, loc.Directory, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// indexEcxOwners returns the disk and the actual directory that owns the
|
||||
// .ecx file for each (collection, vid) on this store. .ecx normally lives
|
||||
// in IdxDirectory but may have been written into the data directory before
|
||||
// -dir.idx was set, so we check both — and we record which one matched so
|
||||
// downstream loaders point NewEcVolume at the directory that really has
|
||||
// the file. The first owner found wins; duplicates across disks are
|
||||
// unusual but tolerated.
|
||||
func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo {
|
||||
owners := make(map[ecKeyForReconcile]ecxOwnerInfo)
|
||||
for _, loc := range s.Locations {
|
||||
seen := make(map[string]bool, 2)
|
||||
for _, scan := range []string{loc.IdxDirectory, loc.Directory} {
|
||||
if scan == "" || seen[scan] {
|
||||
continue
|
||||
}
|
||||
seen[scan] = true
|
||||
entries, err := os.ReadDir(scan)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
if !strings.HasSuffix(name, ".ecx") {
|
||||
continue
|
||||
}
|
||||
base := name[:len(name)-len(".ecx")]
|
||||
collection, vid, err := parseCollectionVolumeId(base)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
key := ecKeyForReconcile{collection: collection, vid: vid}
|
||||
if _, exists := owners[key]; !exists {
|
||||
owners[key] = ecxOwnerInfo{location: loc, idxDir: scan}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return owners
|
||||
}
|
||||
|
||||
// collectOrphanEcShards walks the disk's data directory and returns the
|
||||
// .ec?? shard files that are present on disk but not yet registered to an
|
||||
// EcVolume in memory. The map is keyed by (collection, vid) so callers can
|
||||
// match each group against the .ecx-owning disk in one lookup.
|
||||
//
|
||||
// Zero-byte shard files are ignored — loadAllEcShards already treats them
|
||||
// as cleanup-worthy noise and we want the same shape here.
|
||||
func (l *DiskLocation) collectOrphanEcShards() map[ecKeyForReconcile][]string {
|
||||
entries, err := os.ReadDir(l.Directory)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
orphans := make(map[ecKeyForReconcile][]string)
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := entry.Name()
|
||||
ext := path.Ext(name)
|
||||
if !re.MatchString(ext) {
|
||||
continue
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil || info.Size() == 0 {
|
||||
continue
|
||||
}
|
||||
shardId, err := strconv.ParseInt(ext[3:], 10, 64)
|
||||
if err != nil || shardId < 0 || shardId > 255 {
|
||||
continue
|
||||
}
|
||||
base := name[:len(name)-len(ext)]
|
||||
collection, vid, err := parseCollectionVolumeId(base)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if _, loaded := l.FindEcShard(vid, erasure_coding.ShardId(shardId)); loaded {
|
||||
continue
|
||||
}
|
||||
key := ecKeyForReconcile{collection: collection, vid: vid}
|
||||
orphans[key] = append(orphans[key], name)
|
||||
}
|
||||
return orphans
|
||||
}
|
||||
Reference in New Issue
Block a user