mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(mq): don't cache topic non-existence on transient filer errors
TopicExists and getTopicConfFromCache negative-cached a topic for the full 30s TTL whenever a filer lookup failed for any reason, including timeouts. A topic created earlier then looked gone until the TTL expired, and the metadata auto-create path couldn't heal it (CreateTopic rejects an already-persisted conf), so producers saw UNKNOWN_TOPIC_OR_PARTITION. Only negative-cache on a definitive ErrNotFound; let transient errors fall through and retry against the filer.
This commit is contained in:
@@ -2,6 +2,7 @@ package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -232,14 +233,20 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top
|
||||
exists := false
|
||||
err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name)
|
||||
confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||
_, lookupErr := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: topicPath,
|
||||
Name: filer.TopicConfFile,
|
||||
})
|
||||
if err == nil && confResp.Entry != nil {
|
||||
if lookupErr == nil {
|
||||
exists = true
|
||||
return nil
|
||||
}
|
||||
return nil // Don't propagate error, just check existence
|
||||
// A definitive "not found" means the topic does not exist; surface any
|
||||
// other (transient) error so we never cache a false negative.
|
||||
if errors.Is(lookupErr, filer_pb.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
return lookupErr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
@@ -77,14 +78,19 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config
|
||||
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
|
||||
|
||||
if readConfErr != nil {
|
||||
// Negative cache: topic doesn't exist
|
||||
b.topicCacheMu.Lock()
|
||||
b.topicCache[topicKey] = &topicCacheEntry{
|
||||
conf: nil,
|
||||
expiresAt: time.Now().Add(b.topicCacheTTL),
|
||||
// Only negative-cache when the filer definitively reports topic.conf is
|
||||
// missing. Transient errors (timeouts, connection blips) must not poison
|
||||
// the cache, otherwise the topic appears to vanish for the full TTL even
|
||||
// though it still exists on disk.
|
||||
if errors.Is(readConfErr, filer_pb.ErrNotFound) {
|
||||
b.topicCacheMu.Lock()
|
||||
b.topicCache[topicKey] = &topicCacheEntry{
|
||||
conf: nil,
|
||||
expiresAt: time.Now().Add(b.topicCacheTTL),
|
||||
}
|
||||
b.topicCacheMu.Unlock()
|
||||
glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
|
||||
}
|
||||
b.topicCacheMu.Unlock()
|
||||
glog.V(4).Infof("Topic cached as non-existent: %s", topicKey)
|
||||
return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user