fix(ec): crash-safe ecx-journal fold and shard rebuild (fsync before publish, no short-read-as-success) (#9938)

* fix(ec): make ecx-journal fold and shard rebuild crash-safe

Two EC rebuild paths could silently lose or corrupt data:

RebuildEcxFile folded the .ecj deletion journal into .ecx (in-place
WriteAt tombstones) and then unlinked the journal without flushing the
.ecx writes first. A crash could persist the unlink ahead of the
tombstones, resurrecting deleted needles on the next load. It also read
journal records with a bare n!=size break, so a torn tail silently
dropped the remaining tombstones before the unlink. Now: read records
with io.ReadFull (io.EOF ends cleanly, a torn tail aborts and leaves
.ecj in place for retry), fsync .ecx before removing the journal.

rebuildEcFiles treated a zero/short ReadAt as a clean end-of-input and
discarded the read error, so a truncated or unreadable input shard
produced truncated regenerated shards that were then published as
restored redundancy; the regenerated shards were also never fsynced on
the no-sidecar path. Now: derive the expected shard size from the
present inputs up front (rejecting a divergent/zero-size input), drive
the loop by that size, fail on any short read or short write, and fsync
every regenerated shard before it is mounted/renamed.

Rust volume server mirrors the rebuild fix: rebuild_ec_files now checks
the read_at byte count (it previously discarded it, the same truncation
bug). The Rust ecx fold already synced .ecx before removing the journal.

Custom EC ratios are unaffected: the shard size derives from the input
shards and the loop uses the .vif-resolved data/parity counts, never a
hardcoded 10+4.

* storage: close ecx journal files via defer in RebuildEcxFile

Per review: a single deferred Close per file replaces the per-error-path
manual closes, so new early returns cannot leak descriptors. The journal
is still closed explicitly before its unlink since Windows cannot delete
an open file; the deferred second Close is a harmless no-op.
This commit is contained in:
Chris Lu
2026-06-12 22:28:56 -07:00
committed by GitHub
parent 871d7ddc02
commit 18cdb3819b
4 changed files with 428 additions and 36 deletions
@@ -136,11 +136,22 @@ pub fn rebuild_ec_files(
// Allocate buffers for all shards. Option<Vec<u8>> is required by rs.reconstruct()
let mut buffers: Vec<Option<Vec<u8>>> = vec![None; total_shards];
// Read available shards
// Read available shards. A short read means a truncated/corrupt input
// shard; treat it as an error rather than reconstructing over a
// zero-padded tail and publishing the result as restored redundancy.
for (i, shard) in shards.iter().enumerate() {
if !missing_shard_ids.contains(&(i as u32)) {
let mut buf = vec![0u8; to_process];
shard.read_at(&mut buf, offset)?;
let n = shard.read_at(&mut buf, offset)?;
if n != to_process {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"ec rebuild short read shard {} at {}: got {} want {}",
i, offset, n, to_process
),
));
}
buffers[i] = Some(buf);
}
}
@@ -655,6 +666,81 @@ mod tests {
assert!(std::path::Path::new(&ecx_path).exists());
}
// encode_sample_volume writes a small volume and EC-encodes it, returning
// the dir path so a test can drop/truncate shards and rebuild.
fn encode_sample_volume(tmp: &TempDir) -> String {
let dir = tmp.path().to_str().unwrap().to_string();
let mut v = Volume::new(
&dir,
&dir,
"",
VolumeId(1),
NeedleMapKind::InMemory,
None,
None,
0,
Version::current(),
)
.unwrap();
for i in 1..=20 {
let data = format!("test data for needle {} padded with bytes", i).repeat(64);
let mut n = Needle {
id: NeedleId(i),
cookie: Cookie(i as u32),
data: data.as_bytes().to_vec(),
data_size: data.len() as u32,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
}
v.sync_to_disk().unwrap();
v.close();
write_ec_files(&dir, &dir, "", VolumeId(1), 10, 4).unwrap();
dir
}
// A truncated/corrupt input shard must abort rebuild_ec_files with an error
// rather than reconstructing over a zero-padded tail and publishing a
// truncated shard as restored redundancy.
#[test]
fn test_rebuild_ec_files_short_read_input_errors() {
let tmp = TempDir::new().unwrap();
let dir = encode_sample_volume(&tmp);
// Truncate a present input shard to half its size.
let victim = format!("{}/1.ec03", dir);
let full = std::fs::metadata(&victim).unwrap().len();
assert!(full > 0, "encoded shard should be non-empty");
let f = std::fs::OpenOptions::new().write(true).open(&victim).unwrap();
f.set_len(full / 2).unwrap();
drop(f);
// Rebuild a different (genuinely missing) shard.
std::fs::remove_file(format!("{}/1.ec07", dir)).unwrap();
let res = rebuild_ec_files(&dir, "", VolumeId(1), &[7], 10, 4);
assert!(res.is_err(), "truncated input shard must abort the rebuild");
}
// Happy path: dropping a shard and rebuilding it from the rest must succeed
// and recreate the shard file (the short-read guard must not false-positive).
#[test]
fn test_rebuild_ec_files_happy_path() {
let tmp = TempDir::new().unwrap();
let dir = encode_sample_volume(&tmp);
let dropped = format!("{}/1.ec07", dir);
std::fs::remove_file(&dropped).unwrap();
rebuild_ec_files(&dir, "", VolumeId(1), &[7], 10, 4).unwrap();
assert!(
std::path::Path::new(&dropped).exists(),
"rebuilt shard .ec07 should exist"
);
assert!(
std::fs::metadata(&dropped).unwrap().len() > 0,
"rebuilt shard should be non-empty"
);
}
#[test]
fn test_reed_solomon_basic() {
let data_shards = 10;
+68 -30
View File
@@ -510,6 +510,29 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
return fmt.Errorf("failed to create encoder: %w", err)
}
// The output shard size equals the present input shards' size (all EC
// shards are equal length). Deriving it up front turns a short read on a
// truncated/corrupt input into an error instead of a silent early return
// that would publish truncated shards as restored redundancy.
var expectedShardSize int64 = -1
for i := 0; i < ctx.Total(); i++ {
if !shardHasData[i] {
continue
}
fi, statErr := inputFiles[i].Stat()
if statErr != nil {
return fmt.Errorf("stat input shard %d: %w", i, statErr)
}
if expectedShardSize < 0 {
expectedShardSize = fi.Size()
} else if fi.Size() != expectedShardSize {
return fmt.Errorf("ec rebuild: input shard %d size %d != %d (truncated input?)", i, fi.Size(), expectedShardSize)
}
}
if expectedShardSize <= 0 {
return fmt.Errorf("ec rebuild: no input shard data (expected shard size %d)", expectedShardSize)
}
buffers := make([][]byte, ctx.Total())
for i := range buffers {
if shardHasData[i] {
@@ -517,46 +540,61 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
}
var startOffset int64
var inputBufferDataSize int
for {
// read the input data from files
for i := 0; i < ctx.Total(); i++ {
if shardHasData[i] {
n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
if n == 0 {
return nil
}
if inputBufferDataSize == 0 {
inputBufferDataSize = n
}
if inputBufferDataSize != n {
return fmt.Errorf("ec shard size expected %d actual %d", inputBufferDataSize, n)
}
} else {
buffers[i] = nil
}
for startOffset := int64(0); startOffset < expectedShardSize; {
thisBlock := int64(ErasureCodingSmallBlockSize)
if remaining := expectedShardSize - startOffset; remaining < thisBlock {
thisBlock = remaining
}
// encode the data
err = enc.Reconstruct(buffers)
if err != nil {
// read the input data; a short read means a truncated input shard.
shards := make([][]byte, ctx.Total())
for i := 0; i < ctx.Total(); i++ {
if !shardHasData[i] {
continue // nil: reconstructed below
}
b := buffers[i][:thisBlock]
n, readErr := inputFiles[i].ReadAt(b, startOffset)
if readErr != nil && readErr != io.EOF {
return fmt.Errorf("ec rebuild read shard %d at %d: %w", i, startOffset, readErr)
}
if int64(n) != thisBlock {
return fmt.Errorf("ec rebuild short read shard %d at %d: got %d want %d", i, startOffset, n, thisBlock)
}
shards[i] = b
}
if err = enc.Reconstruct(shards); err != nil {
return fmt.Errorf("reconstruct: %w", err)
}
// write the data to output files
for i := 0; i < ctx.Total(); i++ {
if !shardHasData[i] {
n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
if inputBufferDataSize != n {
return fmt.Errorf("fail to write to %s", outputFiles[i].Name())
}
if shardHasData[i] {
continue
}
n, writeErr := outputFiles[i].WriteAt(shards[i][:thisBlock], startOffset)
if writeErr != nil {
return fmt.Errorf("ec rebuild write shard %d at %d: %w", i, startOffset, writeErr)
}
if int64(n) != thisBlock {
return fmt.Errorf("ec rebuild short write shard %d at %d: got %d want %d", i, startOffset, n, thisBlock)
}
}
startOffset += int64(inputBufferDataSize)
startOffset += thisBlock
}
// Flush every regenerated shard before it is mounted/renamed and published
// as restored redundancy, so a crash cannot leave a peer trusting a shard
// whose bytes never reached disk.
for i := 0; i < ctx.Total(); i++ {
if shardHasData[i] {
continue
}
if err = outputFiles[i].Sync(); err != nil {
return fmt.Errorf("ec rebuild sync shard %d: %w", i, err)
}
}
return nil
}
func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
@@ -0,0 +1,252 @@
package erasure_coding
import (
"os"
"path/filepath"
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
// readEcxSizeField returns the size field of the .ecx entry for needleId, or
// (false) if the id is absent. Entries are fixed-width (id, offset, size).
func readEcxSizeField(t *testing.T, ecxPath string, needleId types.NeedleId) (types.Size, bool) {
t.Helper()
data, err := os.ReadFile(ecxPath)
if err != nil {
t.Fatalf("read ecx: %v", err)
}
entry := types.NeedleIdSize + types.OffsetSize + types.SizeSize
for off := 0; off+entry <= len(data); off += entry {
id := types.BytesToNeedleId(data[off : off+types.NeedleIdSize])
if id == needleId {
return types.BytesToSize(data[off+types.NeedleIdSize+types.OffsetSize : off+entry]), true
}
}
return 0, false
}
func writeEcxEntries(t *testing.T, ecxPath string, ids []types.NeedleId) {
t.Helper()
var buf []byte
for i, id := range ids {
b := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize)
types.NeedleIdToBytes(b[0:types.NeedleIdSize], id)
// A plausible non-zero offset/size; the fold only rewrites the size field.
types.OffsetToBytes(b[types.NeedleIdSize:types.NeedleIdSize+types.OffsetSize], types.ToOffset(int64((i+1)*1024)))
types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:], types.Size(100))
buf = append(buf, b...)
}
if err := os.WriteFile(ecxPath, buf, 0644); err != nil {
t.Fatalf("write ecx: %v", err)
}
}
func packEcjIds(ids []types.NeedleId) []byte {
var buf []byte
for _, id := range ids {
b := make([]byte, types.NeedleIdSize)
types.NeedleIdToBytes(b, id)
buf = append(buf, b...)
}
return buf
}
// TestRebuildEcxFile_AppliesTombstonesAndRemovesJournal: a clean fold marks the
// journaled needle deleted in .ecx and removes the journal.
func TestRebuildEcxFile_AppliesTombstonesAndRemovesJournal(t *testing.T) {
base := filepath.Join(t.TempDir(), "vol")
writeEcxEntries(t, base+".ecx", []types.NeedleId{1, 2, 3})
if err := os.WriteFile(base+".ecj", packEcjIds([]types.NeedleId{2}), 0644); err != nil {
t.Fatalf("write ecj: %v", err)
}
if err := RebuildEcxFile(base); err != nil {
t.Fatalf("RebuildEcxFile: %v", err)
}
if _, err := os.Stat(base + ".ecj"); !os.IsNotExist(err) {
t.Errorf(".ecj should be removed after a clean fold, stat err=%v", err)
}
if sz, ok := readEcxSizeField(t, base+".ecx", 2); !ok || sz != types.TombstoneFileSize {
t.Errorf("needle 2 should be tombstoned in .ecx, got size=%d ok=%v", sz, ok)
}
for _, live := range []types.NeedleId{1, 3} {
if sz, ok := readEcxSizeField(t, base+".ecx", live); !ok || sz == types.TombstoneFileSize {
t.Errorf("needle %d should remain live, got size=%d ok=%v", live, sz, ok)
}
}
}
// TestRebuildEcxFile_TornJournalAborts: a journal whose tail is a partial record
// (e.g. interrupted append) must abort the fold and leave .ecj in place for a
// retry, rather than silently dropping the remaining tombstones before unlink.
func TestRebuildEcxFile_TornJournalAborts(t *testing.T) {
base := filepath.Join(t.TempDir(), "vol")
writeEcxEntries(t, base+".ecx", []types.NeedleId{1, 2, 3})
// One full id followed by a truncated trailing record.
torn := append(packEcjIds([]types.NeedleId{2}), []byte{0x01, 0x02, 0x03}...)
if err := os.WriteFile(base+".ecj", torn, 0644); err != nil {
t.Fatalf("write ecj: %v", err)
}
if err := RebuildEcxFile(base); err == nil {
t.Fatal("expected an error folding a torn journal, got nil")
}
if _, err := os.Stat(base + ".ecj"); err != nil {
t.Errorf(".ecj must be retained after a torn-journal abort, stat err=%v", err)
}
}
// TestRebuildEcxFile_NoJournalNoop: with no .ecj the fold is a no-op and does
// not error (idempotent re-run after a prior successful fold).
func TestRebuildEcxFile_NoJournalNoop(t *testing.T) {
base := filepath.Join(t.TempDir(), "vol")
writeEcxEntries(t, base+".ecx", []types.NeedleId{1, 2, 3})
before, _ := os.ReadFile(base + ".ecx")
if err := RebuildEcxFile(base); err != nil {
t.Fatalf("RebuildEcxFile with no journal: %v", err)
}
after, _ := os.ReadFile(base + ".ecx")
if len(before) != len(after) {
t.Errorf(".ecx changed despite no journal: %d -> %d bytes", len(before), len(after))
}
}
// makeRebuildInputs creates `present` equal-size input shard files of `size`
// bytes and one missing output file, returning slices sized to ctx.Total().
func makeRebuildInputs(t *testing.T, dir string, present, missingIdx int, size int) ([]bool, []*os.File, []*os.File) {
t.Helper()
ctx := NewDefaultECContext("", 0)
shardHasData := make([]bool, ctx.Total())
inputFiles := make([]*os.File, ctx.Total())
outputFiles := make([]*os.File, ctx.Total())
for i := 0; i < present; i++ {
p := filepath.Join(dir, "in"+ToExt(i))
if err := os.WriteFile(p, make([]byte, size), 0644); err != nil {
t.Fatalf("write input %d: %v", i, err)
}
f, err := os.Open(p)
if err != nil {
t.Fatalf("open input %d: %v", i, err)
}
t.Cleanup(func() { f.Close() })
inputFiles[i] = f
shardHasData[i] = true
}
outPath := filepath.Join(dir, "out"+ToExt(missingIdx))
of, err := os.OpenFile(outPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
t.Fatalf("open output: %v", err)
}
t.Cleanup(func() { of.Close() })
outputFiles[missingIdx] = of
shardHasData[missingIdx] = false
return shardHasData, inputFiles, outputFiles
}
// TestRebuildEcFiles_TruncatedInputErrorsWithoutPublishing: a truncated input
// shard must abort the rebuild before any output is written, rather than
// reconstructing over a zero-padded tail and publishing a corrupt shard.
func TestRebuildEcFiles_TruncatedInputErrorsWithoutPublishing(t *testing.T) {
dir := t.TempDir()
ctx := NewDefaultECContext("", 0)
const size = 4096
shardHasData, inputFiles, outputFiles := makeRebuildInputs(t, dir, ctx.Total()-1, ctx.Total()-1, size)
// Truncate one present input to half.
if err := os.Truncate(filepath.Join(dir, "in"+ToExt(5)), size/2); err != nil {
t.Fatalf("truncate: %v", err)
}
err := rebuildEcFiles(shardHasData, inputFiles, outputFiles, ctx)
if err == nil {
t.Fatal("expected an error on a truncated input shard, got nil")
}
if fi, _ := os.Stat(filepath.Join(dir, "out"+ToExt(ctx.Total()-1))); fi != nil && fi.Size() != 0 {
t.Errorf("output shard must not be published on a truncated input, got %d bytes", fi.Size())
}
}
// TestRebuildEcFiles_ZeroSizeInputsErrors: all-empty input shards must error
// rather than producing and publishing zero-length output shards.
func TestRebuildEcFiles_ZeroSizeInputsErrors(t *testing.T) {
dir := t.TempDir()
ctx := NewDefaultECContext("", 0)
shardHasData, inputFiles, outputFiles := makeRebuildInputs(t, dir, ctx.Total()-1, ctx.Total()-1, 0)
if err := rebuildEcFiles(shardHasData, inputFiles, outputFiles, ctx); err == nil {
t.Fatal("expected an error on zero-size input shards, got nil")
}
}
// TestRebuildEcFiles_HappyPathRebuildsByteIdentical: a real encode, drop one
// shard, rebuild, and confirm the regenerated shard is byte-identical. The tiny
// .dat exercises the sub-1MB final-block path (the guard must not false-positive
// on a legitimate short tail).
func TestRebuildEcFiles_HappyPathRebuildsByteIdentical(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "vol")
ctx := NewDefaultECContext("", 0)
writeRandomDat(t, base, 7000)
if _, err := generateEcFiles(base, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx); err != nil {
t.Fatalf("generateEcFiles: %v", err)
}
const dropped = 5
want, err := os.ReadFile(base + ToExt(dropped))
if err != nil {
t.Fatalf("read shard %d: %v", dropped, err)
}
if err := os.Remove(base + ToExt(dropped)); err != nil {
t.Fatalf("remove shard: %v", err)
}
if _, err := RebuildEcFiles(base, ctx, true); err != nil {
t.Fatalf("RebuildEcFiles: %v", err)
}
got, err := os.ReadFile(base + ToExt(dropped))
if err != nil {
t.Fatalf("read rebuilt shard: %v", err)
}
if string(got) != string(want) {
t.Errorf("rebuilt shard %d differs from original (%d vs %d bytes)", dropped, len(got), len(want))
}
}
// TestRebuildEcFiles_CustomRatioRebuildsByteIdentical: the same for a 9+3 ratio,
// confirming no 10+4 assumption leaks into the destructive rebuild path.
func TestRebuildEcFiles_CustomRatioRebuildsByteIdentical(t *testing.T) {
dir := t.TempDir()
base := filepath.Join(dir, "vol")
ctx := &ECContext{DataShards: 9, ParityShards: 3}
writeRandomDat(t, base, 9000)
if _, err := generateEcFiles(base, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx); err != nil {
t.Fatalf("generateEcFiles: %v", err)
}
const dropped = 7
want, err := os.ReadFile(base + ToExt(dropped))
if err != nil {
t.Fatalf("read shard %d: %v", dropped, err)
}
if err := os.Remove(base + ToExt(dropped)); err != nil {
t.Fatalf("remove shard: %v", err)
}
if _, err := RebuildEcFiles(base, ctx, true); err != nil {
t.Fatalf("RebuildEcFiles: %v", err)
}
got, err := os.ReadFile(base + ToExt(dropped))
if err != nil {
t.Fatalf("read rebuilt shard: %v", err)
}
if string(got) != string(want) {
t.Errorf("rebuilt 9+3 shard %d differs (%d vs %d bytes)", dropped, len(got), len(want))
}
// No shard position beyond the 9+3 layout must have been created.
if _, err := os.Stat(base + ToExt(12)); err == nil {
t.Errorf("unexpected shard .ec12 for a 9+3 volume")
}
}
@@ -123,27 +123,43 @@ func RebuildEcxFile(baseFileName string) error {
if err != nil {
return fmt.Errorf("rebuild: failed to open ecj file: %w", err)
}
defer ecjFile.Close()
buf := make([]byte, types.NeedleIdSize)
for {
n, _ := ecjFile.Read(buf)
if n != types.NeedleIdSize {
// io.ReadFull distinguishes a clean end (io.EOF) from a torn tail
// (io.ErrUnexpectedEOF) and a transient short read; a bare n!=size
// break would silently drop the rest of the journal and then unlink it.
_, readErr := io.ReadFull(ecjFile, buf)
if readErr == io.EOF {
break
}
if readErr != nil {
// Torn or unreadable journal: abort and leave .ecj in place so a
// retry can re-apply the deletions rather than resurrect them.
return fmt.Errorf("rebuild: read ecj: %w", readErr)
}
needleId := types.BytesToNeedleId(buf)
_, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil && err != NotFoundError {
ecxFile.Close()
return err
}
}
ecxFile.Close()
// Flush the in-place tombstones before removing the journal; otherwise a
// crash can persist the .ecj unlink ahead of the .ecx writes and resurrect
// the deleted needles on the next load.
if err = ecxFile.Sync(); err != nil {
return fmt.Errorf("rebuild: sync ecx: %w", err)
}
// Close the journal before unlinking it (Windows cannot delete an open
// file); the deferred Close becomes a harmless no-op.
ecjFile.Close()
os.Remove(baseFileName + ".ecj")
return nil