Files
seaweedfs/weed/s3api/s3api_versioning_reconciler_test.go
Chris Lu 79859fc21d feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd (#9468)
* feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd

Three diagnostic additions on top of #9460, all aimed at making the next
production incident faster to triage than the one we just spent hours on.

1. [versioning-heal] grep prefix on every heal-related log line, with a
   small fixed event vocabulary (produced / surfaced / healed / enqueue /
   drain / retry / gave_up / anomaly / clear_failed / heal_persist_failed
   / teardown_failed / queue_full). One grep gives operators a single
   event stream across the produce-to-drain lifecycle.

2. Escalate the "scanned N>0 entries but no valid latest" case in
   updateLatestVersionAfterDeletion from V(1) Infof to a Warning that
   names the orphan entries it saw. This is the listing-after-rm
   inconsistency signature that pinned down 259064a8's failure — it
   should not be invisible at default log levels.

3. New weed shell command `s3.versions.audit -prefix <path> [-v] [-heal]`
   that walks .versions/ directories under a prefix and reports the
   stranded population. With -heal it clears the latest-version pointer
   in place on stranded directories so subsequent reads return a clean
   NoSuchKey instead of replaying the 10-retry self-heal loop.

* fix(s3/versioning): audit pagination, exclusive categories, ctx-aware retry

Address PR review:

1. s3.versions.audit walked only the first 1024-entry page of each
   .versions/ directory, falsely reporting large directories as
   "stranded". Loop until a page returns fewer than 1024 entries,
   advancing startName after each page.

2. The clean and orphan-only categories double-counted a directory
   that had no pointer and at least one orphan: both counters were
   incremented. Make them mutually exclusive so report totals sum to
   versionsDirs.

3. retryFilerOp's worst-case ~6.3s backoff was a bare time.Sleep,
   non-interruptible by ctx. A server shutdown / client disconnect
   would wait out the budget per in-flight delete. Thread ctx through
   deleteSpecificObjectVersion -> repointLatestBeforeDeletion /
   updateLatestVersionAfterDeletion -> retryFilerOp; backoff now uses
   a select{<-ctx.Done(), <-timer.C}. HTTP handlers pass r.Context();
   gRPC lifecycle handlers pass the stream ctx.

   New test pins the behavior: cancelling ctx mid-backoff returns
   ctx.Err() in <500ms instead of blocking ~6.3s.

* fix(s3/versioning): clearStale outcome + escape grep-able log fields

Two coderabbit follow-ups:

1. Successful pointer clear should suppress `produced`.
   updateLatestVersionAfterDeletion's transient-rm fallback called
   clearStaleLatestVersionPointer best-effort, then unconditionally
   returned retryErr. The caller (deleteSpecificObjectVersion) saw the
   error and emitted `event=produced` + enqueued the reconciler, even
   though clearStaleLatestVersionPointer had just driven the pointer to
   consistency and the next reader would get NoSuchKey via the
   clean-miss path. Make clearStaleLatestVersionPointer return cleared
   bool; on success the caller returns nil so neither produced nor the
   reconciler enqueue fires. Concurrent-writer aborts, re-scan errors,
   and CAS mismatches still report false so genuinely stranded state
   keeps surfacing.

2. Escape user-controlled fields in heal log lines.
   versioningHealInfof / Warningf / Errorf interpolated raw bucket /
   key / filename / err text into a single-space-separated line. An S3
   key (or error string from gRPC) containing whitespace, newlines, or
   `event=...` could split one event into multiple tokens and spoof
   fake fields downstream. Sanitize each arg in the helper: safe
   values pass through; anything with whitespace, quotes, control
   chars, or backslashes is replaced with its strconv.Quote form. No
   caller changes — the format strings remain unchanged.

Tests pin both behaviors: sanitization table covers the field
boundary cases; an end-to-end shape test confirms a key containing
`event=spoof` stays inside a single quoted token.
2026-05-13 10:48:58 -07:00

171 lines
5.9 KiB
Go

package s3api
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TestVersionsHealQueue_DedupOnEnqueue checks that repeatedly enqueuing the
// same bucket/object pair leaves exactly one pending entry, so a hot failure
// path cannot inflate the queue with duplicates.
func TestVersionsHealQueue_DedupOnEnqueue(t *testing.T) {
	const repeats = 5
	queue := newVersionsHealQueue()
	for n := 0; n < repeats; n++ {
		queue.Enqueue("b", "obj")
	}
	assert.Equal(t, 1, queue.Len(), "duplicate enqueues collapse")
}
// TestVersionsHealQueue_CapacityCap ensures the queue stops growing once it
// holds versionsHealQueueCapacity entries instead of growing without bound.
// Only the size clamp is asserted here; whatever the queue reports on
// overflow is not checked by this test.
//
// Object names are generated with fmt.Sprintf so every key is guaranteed
// distinct. The previous string(rune(i)) form maps every surrogate code point
// (0xD800-0xDFFF) and every value above 0x10FFFF to U+FFFD, so keys could
// collide, be collapsed by dedup, and make the clamp assertion depend on the
// magnitude of the capacity constant.
func TestVersionsHealQueue_CapacityCap(t *testing.T) {
	q := newVersionsHealQueue()
	total := versionsHealQueueCapacity + 50
	for i := 0; i < total; i++ {
		q.Enqueue("b", fmt.Sprintf("obj-%d", i))
	}
	assert.Equal(t, versionsHealQueueCapacity, q.Len(), "queue clamps at capacity")
}
// TestVersionsHealQueue_PopReadyOnlyDueItems verifies that popReady honors
// each candidate's nextRetry: a candidate scheduled in the future stays
// queued, while the immediately-due one is returned.
func TestVersionsHealQueue_PopReadyOnlyDueItems(t *testing.T) {
	queue := newVersionsHealQueue()
	queue.Enqueue("b", "due")

	// Insert the deferred candidate by hand so its nextRetry is under test control.
	deferred := &versionsHealCandidate{
		bucket:    "b",
		object:    "later",
		enqueued:  time.Now(),
		nextRetry: time.Now().Add(10 * time.Minute),
	}
	queue.pending[versionsHealKey("b", "later")] = deferred

	ready := queue.popReady(time.Now())
	require.Len(t, ready, 1, "only the due candidate pops")
	assert.Equal(t, "due", ready[0].object)
	assert.Equal(t, 1, queue.Len(), "deferred candidate still queued")
}
// TestVersionsHealQueue_RequeueWithBackoff checks that requeue pushes a
// failed candidate's nextRetry into the future: it is not ready at the
// moment of requeue but becomes ready once the backoff has elapsed.
func TestVersionsHealQueue_RequeueWithBackoff(t *testing.T) {
	queue := newVersionsHealQueue()
	queue.requeue(&versionsHealCandidate{bucket: "b", object: "obj", attempts: 1}, 500*time.Millisecond)
	assert.Equal(t, 1, queue.Len())

	checkpoint := time.Now()
	assert.Empty(t, queue.popReady(checkpoint), "not yet due immediately after requeue")
	assert.Len(t, queue.popReady(checkpoint.Add(time.Second)), 1, "due after backoff window passes")
}
// TestVersionsHealQueue_GiveUpAfterMaxAttempts ensures we don't loop
// forever against a deterministically broken state.
func TestVersionsHealQueue_GiveUpAfterMaxAttempts(t *testing.T) {
q := newVersionsHealQueue()
c := &versionsHealCandidate{bucket: "b", object: "obj", attempts: versionsHealMaxRetries}
q.requeue(c, time.Millisecond)
assert.Equal(t, 0, q.Len(), "candidate at max attempts is dropped, read-path heal still covers it")
}
// TestRetryFilerOp_SucceedsBeforeExhaustion drives retryFilerOp with an op
// that fails twice and then succeeds. It expects a nil result (earlier
// failures are not surfaced) and no further calls after the first success.
func TestRetryFilerOp_SucceedsBeforeExhaustion(t *testing.T) {
	const failuresBeforeSuccess = 2
	attempts := 0
	flaky := func() error {
		attempts++
		if attempts <= failuresBeforeSuccess {
			return errors.New("transient")
		}
		return nil
	}

	err := retryFilerOp(context.Background(), "test", flaky)
	assert.NoError(t, err)
	assert.Equal(t, failuresBeforeSuccess+1, attempts, "stops calling once the op succeeds")
}
// TestRetryFilerOp_PropagatesAfterExhaustion checks that an op which never
// succeeds is called exactly updateLatestRetryAttempts times. The final
// error must mention "exhausted" and still carry the underlying error text,
// so operators can tell a budget exhaustion from a one-off failure.
//
// NOTE(review): this walks the full backoff schedule, which the
// context-cancel test below describes as roughly 6.3s worst case.
func TestRetryFilerOp_PropagatesAfterExhaustion(t *testing.T) {
	attempts := 0
	alwaysFails := func() error {
		attempts++
		return errors.New("permanent")
	}

	err := retryFilerOp(context.Background(), "test", alwaysFails)
	require.Error(t, err)

	msg := err.Error()
	assert.Equal(t, updateLatestRetryAttempts, attempts, "ran the full retry budget")
	assert.Contains(t, msg, "exhausted")
	assert.Contains(t, msg, "permanent", "underlying error preserved")
}
// TestRetryFilerOp_ContextCancelInterruptsBackoff confirms that a ctx
// canceled mid-backoff makes retryFilerOp return ctx.Err() promptly instead
// of blocking until the worst-case ~6.3s retry budget elapses. Server
// shutdown / client disconnect relies on this to drain in-flight retries
// promptly.
func TestRetryFilerOp_ContextCancelInterruptsBackoff(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, including a failed require.
	defer cancel()

	// Cancel after the first failed attempt has scheduled its backoff but
	// well before the timer fires. time.AfterFunc replaces a hand-rolled
	// sleeping goroutine; Stop keeps it from firing after the test returns
	// if retryFilerOp ever comes back early.
	timer := time.AfterFunc(30*time.Millisecond, cancel)
	defer timer.Stop()

	calls := 0
	start := time.Now()
	err := retryFilerOp(ctx, "test", func() error {
		calls++
		return errors.New("transient")
	})
	elapsed := time.Since(start)

	require.Error(t, err)
	assert.ErrorIs(t, err, context.Canceled, "must surface ctx.Err verbatim")
	assert.Less(t, elapsed, 500*time.Millisecond, "ctx cancel must interrupt the backoff sleep")
	// The op must have run at least once; otherwise the cancel did not land
	// during a backoff and the scenario under test was never exercised.
	assert.GreaterOrEqual(t, calls, 1, "op must run before the cancel lands")
	assert.Less(t, calls, updateLatestRetryAttempts, "ctx cancel must short-circuit before exhausting the retry budget")
}
// TestRetryFilerOp_TerminalErrorsShortCircuit confirms that errors which
// won't change on retry (NotFound in its filer_pb, wrapped, and gRPC forms,
// plus context cancellation/deadline) are returned immediately and unwrapped,
// without burning the retry budget.
//
// Each case asserts that:
//   - the op runs exactly once;
//   - no backoff delay is incurred;
//   - the returned error still matches the injected one via errors.Is;
//   - the returned error is not wrapped with the "exhausted" retry-budget
//     prefix.
func TestRetryFilerOp_TerminalErrorsShortCircuit(t *testing.T) {
	cases := []struct {
		name string
		err  error
	}{
		{"filer_pb.ErrNotFound", filer_pb.ErrNotFound},
		{"wrapped filer_pb.ErrNotFound", fmt.Errorf("wrap: %w", filer_pb.ErrNotFound)},
		{"grpc NotFound status", status.Error(codes.NotFound, "missing")},
		{"context canceled", context.Canceled},
		{"context deadline exceeded", context.DeadlineExceeded},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			start := time.Now()
			calls := 0
			err := retryFilerOp(context.Background(), "test", func() error {
				calls++
				return tc.err
			})
			elapsed := time.Since(start)
			require.Error(t, err)
			assert.Equal(t, 1, calls, "terminal error must not be retried")
			assert.Less(t, elapsed, 50*time.Millisecond, "terminal error must not delay")
			// The doc promises the error comes back as-is; pin that so a
			// regression that swaps in a different error is caught.
			assert.ErrorIs(t, err, tc.err, "terminal error must be returned as-is")
			assert.NotContains(t, err.Error(), "exhausted", "terminal error must not be wrapped with retry-budget prefix")
		})
	}
}