diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 38da0dbb6..a09e76549 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -2,6 +2,7 @@ package broker import ( "context" + "errors" "fmt" "strings" "time" @@ -232,14 +233,20 @@ func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.Top exists := false err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name) - confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + _, lookupErr := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ Directory: topicPath, Name: filer.TopicConfFile, }) - if err == nil && confResp.Entry != nil { + if lookupErr == nil { exists = true + return nil } - return nil // Don't propagate error, just check existence + // A definitive "not found" means the topic does not exist; surface any + // other (transient) error so we never cache a false negative. + if errors.Is(lookupErr, filer_pb.ErrNotFound) { + return nil + } + return lookupErr }) if err != nil { diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 07322907d..9e908e485 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -3,6 +3,7 @@ package broker import ( "context" "encoding/binary" + "errors" "fmt" "io" "strings" @@ -77,14 +78,19 @@ func (b *MessageQueueBroker) getTopicConfFromCache(t topic.Topic) (*mq_pb.Config conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) if readConfErr != nil { - // Negative cache: topic doesn't exist - b.topicCacheMu.Lock() - b.topicCache[topicKey] = &topicCacheEntry{ - conf: nil, - expiresAt: time.Now().Add(b.topicCacheTTL), + // Only negative-cache when the filer definitively reports topic.conf is + // missing. Transient errors (timeouts, connection blips) must not poison + // the cache, otherwise the topic appears to vanish for the full TTL even + // though it still exists on disk. + if errors.Is(readConfErr, filer_pb.ErrNotFound) { + b.topicCacheMu.Lock() + b.topicCache[topicKey] = &topicCacheEntry{ + conf: nil, + expiresAt: time.Now().Add(b.topicCacheTTL), + } + b.topicCacheMu.Unlock() + glog.V(4).Infof("Topic cached as non-existent: %s", topicKey) } - b.topicCacheMu.Unlock() - glog.V(4).Infof("Topic cached as non-existent: %s", topicKey) return nil, fmt.Errorf("topic %v not found: %w", t, readConfErr) }