mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(volume): don't panic on read when needle map is nil (#9342)
* fix(volume): don't panic on read when needle map is nil A failed CommitCompact reload (and #9335's new error path for a remote-tiered volume with a stray .vif but no .idx) leaves v.nm == nil on a volume that's still in the store. readNeedle / readNeedleDataInto dereferenced v.nm with no guard, so the next GET segfaulted the http handler instead of returning an error the client could retry on another replica. Add the same v.nm == nil check the other Volume accessors already use, including the slow-read inner loop where the lock is released between iterations and a failed reload can race in. Fixes #9339. * match rust nm-nil read behavior; trim comments seaweed-volume's read_needle_with_option / re_lookup_needle_data_offset already lift Option<NeedleMap> through ok_or(NotFound). Use ErrorNotFound on the Go side too instead of a generic 500-mapped error so both volume servers respond identically when v.nm is nil. * log once when reads hit nil needle map ErrorNotFound alone hides the real cause: a half-loaded volume just returns 404s and the operator has nothing to grep for. Add a once-per- volume Errorf on the nil path, reset on successful load. Mirror the same in seaweed-volume via nm_or_not_found(). * trim comments * drop once-flag, log inline on every nil-nm read
This commit is contained in:
@@ -773,6 +773,13 @@ impl Volume {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn nm_or_not_found(&self) -> Result<&NeedleMap, VolumeError> {
|
||||
self.nm.as_ref().ok_or_else(|| {
|
||||
tracing::warn!(volume_id = self.id.0, "needle map not loaded");
|
||||
VolumeError::NotFound
|
||||
})
|
||||
}
|
||||
|
||||
fn load_index(&mut self) -> Result<(), VolumeError> {
|
||||
let use_redb = matches!(
|
||||
self.needle_map_kind,
|
||||
@@ -1058,7 +1065,7 @@ impl Volume {
|
||||
read_option: &mut ReadOption,
|
||||
) -> Result<i32, VolumeError> {
|
||||
let _guard = self.data_file_access_control.read_lock();
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
|
||||
|
||||
if nv.offset.is_zero() {
|
||||
@@ -1300,7 +1307,7 @@ impl Volume {
|
||||
read_deleted: bool,
|
||||
) -> Result<NeedleStreamInfo, VolumeError> {
|
||||
let _guard = self.data_file_access_control.read_lock();
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?;
|
||||
|
||||
if nv.offset.is_zero() {
|
||||
@@ -1402,7 +1409,7 @@ impl Volume {
|
||||
&self,
|
||||
needle_id: NeedleId,
|
||||
) -> Result<(u64, u16), VolumeError> {
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
let nv = nm.get(needle_id).ok_or(VolumeError::NotFound)?;
|
||||
if nv.offset.is_zero() {
|
||||
return Err(VolumeError::NotFound);
|
||||
@@ -1719,7 +1726,7 @@ impl Volume {
|
||||
/// Read all live needles from the volume (for ReadAllNeedles streaming RPC).
|
||||
pub fn read_all_needles(&self) -> Result<Vec<Needle>, VolumeError> {
|
||||
let _guard = self.data_file_access_control.read_lock();
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
let version = self.version();
|
||||
let dat_size = self.current_dat_file_size()? as i64;
|
||||
let mut needles = Vec::new();
|
||||
@@ -1908,7 +1915,7 @@ impl Volume {
|
||||
if self.dat_file.is_none() && self.remote_dat_file.is_none() {
|
||||
return Err(VolumeError::NotFound);
|
||||
}
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
let dat_size = self.dat_file_size().map_err(VolumeError::Io)?;
|
||||
|
||||
let mut files_checked: u64 = 0;
|
||||
@@ -1968,7 +1975,7 @@ impl Volume {
|
||||
if self.dat_file.is_none() && self.remote_dat_file.is_none() {
|
||||
return Err(VolumeError::NotFound);
|
||||
}
|
||||
let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?;
|
||||
let nm = self.nm_or_not_found()?;
|
||||
|
||||
let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?;
|
||||
let version = self.version();
|
||||
|
||||
@@ -22,6 +22,11 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize
|
||||
v.dataFileAccessLock.RLock()
|
||||
defer v.dataFileAccessLock.RUnlock()
|
||||
|
||||
if v.nm == nil {
|
||||
glog.V(0).Infof("volume %d: needle map not loaded; read returns not-found", v.Id)
|
||||
return -1, ErrorNotFound
|
||||
}
|
||||
|
||||
nv, ok := v.nm.Get(n.Id)
|
||||
if !ok || nv.Offset.IsZero() {
|
||||
return -1, ErrorNotFound
|
||||
@@ -108,6 +113,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
|
||||
if readOption.HasSlowRead {
|
||||
v.dataFileAccessLock.RLock()
|
||||
}
|
||||
if v.nm == nil {
|
||||
if readOption.HasSlowRead {
|
||||
v.dataFileAccessLock.RUnlock()
|
||||
}
|
||||
glog.V(0).Infof("volume %d: needle map not loaded; read returns not-found", v.Id)
|
||||
return ErrorNotFound
|
||||
}
|
||||
nv, ok := v.nm.Get(n.Id)
|
||||
if readOption.HasSlowRead {
|
||||
v.dataFileAccessLock.RUnlock()
|
||||
@@ -146,6 +158,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr
|
||||
}
|
||||
// possibly re-read needle offset if volume is compacted
|
||||
if readOption.VolumeRevision != v.SuperBlock.CompactionRevision {
|
||||
if v.nm == nil {
|
||||
if readOption.HasSlowRead {
|
||||
v.dataFileAccessLock.RUnlock()
|
||||
}
|
||||
glog.V(0).Infof("volume %d: needle map not loaded mid-read", v.Id)
|
||||
return ErrorNotFound
|
||||
}
|
||||
// the volume is compacted
|
||||
nv, ok = v.nm.Get(n.Id)
|
||||
if !ok || nv.Offset.IsZero() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
@@ -9,6 +10,35 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestReadNeedleNilNeedleMap(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("volume creation: %v", err)
|
||||
}
|
||||
defer v.Close()
|
||||
|
||||
v.dataFileAccessLock.Lock()
|
||||
if v.nm != nil {
|
||||
v.nm.Close()
|
||||
v.nm = nil
|
||||
}
|
||||
v.dataFileAccessLock.Unlock()
|
||||
|
||||
n := new(needle.Needle)
|
||||
n.Id = types.Uint64ToNeedleId(1)
|
||||
|
||||
if _, err := v.readNeedle(n, &ReadOption{}, nil); err != ErrorNotFound {
|
||||
t.Fatalf("readNeedle: want ErrorNotFound, got %v", err)
|
||||
}
|
||||
|
||||
err = v.readNeedleDataInto(n, &ReadOption{ReadBufferSize: 1024}, &bytes.Buffer{}, 0, 0)
|
||||
if err != ErrorNotFound {
|
||||
t.Fatalf("readNeedleDataInto: want ErrorNotFound, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user