diff --git a/seaweed-volume/src/storage/volume.rs b/seaweed-volume/src/storage/volume.rs index fb68056e3..52274d5b0 100644 --- a/seaweed-volume/src/storage/volume.rs +++ b/seaweed-volume/src/storage/volume.rs @@ -773,6 +773,13 @@ impl Volume { Ok(()) } + fn nm_or_not_found(&self) -> Result<&NeedleMap, VolumeError> { + self.nm.as_ref().ok_or_else(|| { + tracing::warn!(volume_id = self.id.0, "needle map not loaded"); + VolumeError::NotFound + }) + } + fn load_index(&mut self) -> Result<(), VolumeError> { let use_redb = matches!( self.needle_map_kind, @@ -1058,7 +1065,7 @@ impl Volume { read_option: &mut ReadOption, ) -> Result { let _guard = self.data_file_access_control.read_lock(); - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; if nv.offset.is_zero() { @@ -1300,7 +1307,7 @@ impl Volume { read_deleted: bool, ) -> Result { let _guard = self.data_file_access_control.read_lock(); - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let nv = nm.get(n.id).ok_or(VolumeError::NotFound)?; if nv.offset.is_zero() { @@ -1402,7 +1409,7 @@ impl Volume { &self, needle_id: NeedleId, ) -> Result<(u64, u16), VolumeError> { - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let nv = nm.get(needle_id).ok_or(VolumeError::NotFound)?; if nv.offset.is_zero() { return Err(VolumeError::NotFound); @@ -1719,7 +1726,7 @@ impl Volume { /// Read all live needles from the volume (for ReadAllNeedles streaming RPC). pub fn read_all_needles(&self) -> Result, VolumeError> { let _guard = self.data_file_access_control.read_lock(); - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let version = self.version(); let dat_size = self.current_dat_file_size()? as i64; let mut needles = Vec::new(); @@ -1908,7 +1915,7 @@ impl Volume { if self.dat_file.is_none() && self.remote_dat_file.is_none() { return Err(VolumeError::NotFound); } - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let dat_size = self.dat_file_size().map_err(VolumeError::Io)?; let mut files_checked: u64 = 0; @@ -1968,7 +1975,7 @@ impl Volume { if self.dat_file.is_none() && self.remote_dat_file.is_none() { return Err(VolumeError::NotFound); } - let nm = self.nm.as_ref().ok_or(VolumeError::NotFound)?; + let nm = self.nm_or_not_found()?; let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?; let version = self.version(); diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index 9e27fb4b2..702f3ed39 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -22,6 +22,11 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSize v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + glog.V(0).Infof("volume %d: needle map not loaded; read returns not-found", v.Id) + return -1, ErrorNotFound + } + nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset.IsZero() { return -1, ErrorNotFound @@ -108,6 +113,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr if readOption.HasSlowRead { v.dataFileAccessLock.RLock() } + if v.nm == nil { + if readOption.HasSlowRead { + v.dataFileAccessLock.RUnlock() + } + glog.V(0).Infof("volume %d: needle map not loaded; read returns not-found", v.Id) + return ErrorNotFound + } nv, ok := v.nm.Get(n.Id) if readOption.HasSlowRead { v.dataFileAccessLock.RUnlock() @@ -146,6 +158,13 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr } // possibly re-read needle offset if volume is compacted if readOption.VolumeRevision != v.SuperBlock.CompactionRevision { + if v.nm == nil { + if readOption.HasSlowRead { + v.dataFileAccessLock.RUnlock() + } + glog.V(0).Infof("volume %d: needle map not loaded mid-read", v.Id) + return ErrorNotFound + } // the volume is compacted nv, ok = v.nm.Get(n.Id) if !ok || nv.Offset.IsZero() { diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go index d3fb31c64..45ca55e8e 100644 --- a/weed/storage/volume_read_test.go +++ b/weed/storage/volume_read_test.go @@ -1,6 +1,7 @@ package storage import ( + "bytes" "testing" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -9,6 +10,35 @@ import ( "github.com/stretchr/testify/assert" ) +func TestReadNeedleNilNeedleMap(t *testing.T) { + dir := t.TempDir() + + v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0) + if err != nil { + t.Fatalf("volume creation: %v", err) + } + defer v.Close() + + v.dataFileAccessLock.Lock() + if v.nm != nil { + v.nm.Close() + v.nm = nil + } + v.dataFileAccessLock.Unlock() + + n := new(needle.Needle) + n.Id = types.Uint64ToNeedleId(1) + + if _, err := v.readNeedle(n, &ReadOption{}, nil); err != ErrorNotFound { + t.Fatalf("readNeedle: want ErrorNotFound, got %v", err) + } + + err = v.readNeedleDataInto(n, &ReadOption{ReadBufferSize: 1024}, &bytes.Buffer{}, 0, 0) + if err != ErrorNotFound { + t.Fatalf("readNeedleDataInto: want ErrorNotFound, got %v", err) + } +} + func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) { dir := t.TempDir()