diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go index abc7d9ac0..279a842dd 100644 --- a/weed/util/log_buffer/log_read_stateless.go +++ b/weed/util/log_buffer/log_read_stateless.go @@ -310,17 +310,6 @@ func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_ return nil, false } -// invalidateCachedDiskChunk removes a chunk from the cache -// This is called when cached data is found to be incomplete or incorrect -func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) { - logBuffer.diskChunkCache.mu.Lock() - defer logBuffer.diskChunkCache.mu.Unlock() - - if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { - delete(logBuffer.diskChunkCache.chunks, chunkStartOffset) - } -} - // cacheDiskChunk stores a disk chunk in the cache with LRU eviction func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) { logBuffer.diskChunkCache.mu.Lock() @@ -472,63 +461,6 @@ func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, max return messages, nextOffset, totalBytes, nil } -// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn -func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) ( - messages []*filer_pb.LogEntry, - nextOffset int64, - highWaterMark2 int64, - endOfPartition bool, - err error, -) { - if logBuffer.ReadFromDiskFn == nil { - return nil, startOffset, highWaterMark, true, - fmt.Errorf("no disk read function configured") - } - - messages = make([]*filer_pb.LogEntry, 0, maxMessages) - nextOffset = startOffset - totalBytes := 0 - - // Use a simple callback to collect messages - collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) { - // Check limits - if len(messages) >= maxMessages { - return true, nil // Done - } - - entrySize := 4 + len(logEntry.Data) + len(logEntry.Key) - if totalBytes+entrySize > maxBytes { - return true, nil // Done - } - - // Only include messages at or after startOffset - if logEntry.Offset >= startOffset { - messages = append(messages, logEntry) - totalBytes += entrySize - nextOffset = logEntry.Offset + 1 - } - - return false, nil // Continue - } - - // Read from disk - startPos := NewMessagePositionFromOffset(startOffset) - _, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn) - - if err != nil { - glog.Warningf("[StatelessRead] Disk read error: %v", err) - return nil, startOffset, highWaterMark, false, err - } - - glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v", - len(messages), nextOffset, isDone) - - // If we read from disk and got no messages, and isDone is true, we're at the end - endOfPartition = isDone && len(messages) == 0 - - return messages, nextOffset, highWaterMark, endOfPartition, nil -} - // GetHighWaterMark returns the highest offset available in this partition // This is a lightweight operation for clients to check partition state func (logBuffer *LogBuffer) GetHighWaterMark() int64 {