mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
79859fc21d
* feat(s3/versioning): grep-able heal logs + scan-anomaly diagnostics + audit cmd Three diagnostic additions on top of #9460, all aimed at making the next production incident faster to triage than the one we just spent hours on. 1. [versioning-heal] grep prefix on every heal-related log line, with a small fixed event vocabulary (produced / surfaced / healed / enqueue / drain / retry / gave_up / anomaly / clear_failed / heal_persist_failed / teardown_failed / queue_full). One grep gives operators a single event stream across the produce-to-drain lifecycle. 2. Escalate the "scanned N>0 entries but no valid latest" case in updateLatestVersionAfterDeletion from V(1) Infof to a Warning that names the orphan entries it saw. This is the listing-after-rm inconsistency signature that pinned down 259064a8's failure — it should not be invisible at default log levels. 3. New weed shell command `s3.versions.audit -prefix <path> [-v] [-heal]` that walks .versions/ directories under a prefix and reports the stranded population. With -heal it clears the latest-version pointer in place on stranded directories so subsequent reads return a clean NoSuchKey instead of replaying the 10-retry self-heal loop. * fix(s3/versioning): audit pagination, exclusive categories, ctx-aware retry Address PR review: 1. s3.versions.audit walked only the first 1024-entry page of each .versions/ directory, false-positiving "stranded" on large dirs. Loop until the page returns < 1024 entries, advancing startName. 2. clean and orphan-only categories double-counted when a directory had no pointer and at least one orphan: incremented both. Make them mutually exclusive so report totals sum to versionsDirs. 3. retryFilerOp's worst-case ~6.3s backoff was a bare time.Sleep, non-interruptible by ctx. A server shutdown / client disconnect would wait out the budget per in-flight delete. Thread ctx through deleteSpecificObjectVersion -> repointLatestBeforeDeletion / updateLatestVersionAfterDeletion -> retryFilerOp; backoff now uses a select{<-ctx.Done(), <-timer.C}. HTTP handlers pass r.Context(); gRPC lifecycle handlers pass the stream ctx. New test pins the behavior: cancelling ctx mid-backoff returns ctx.Err() in <500ms instead of blocking ~6.3s. * fix(s3/versioning): clearStale outcome + escape grep-able log fields Two coderabbit follow-ups: 1. Successful pointer clear should suppress `produced`. updateLatestVersionAfterDeletion's transient-rm fallback called clearStaleLatestVersionPointer best-effort, then unconditionally returned retryErr. The caller (deleteSpecificObjectVersion) saw the error and emitted `event=produced` + enqueued the reconciler, even though clearStaleLatestVersionPointer had just driven the pointer to consistency and the next reader would get NoSuchKey via the clean-miss path. Make clearStaleLatestVersionPointer return cleared bool; on success the caller returns nil so neither produced nor the reconciler enqueue fires. Concurrent-writer aborts, re-scan errors, and CAS mismatches still report false so genuinely stranded state keeps surfacing. 2. Escape user-controlled fields in heal log lines. versioningHealInfof / Warningf / Errorf interpolated raw bucket / key / filename / err text into a single-space-separated line. An S3 key (or error string from gRPC) containing whitespace, newlines, or `event=...` could split one event into multiple tokens and spoof fake fields downstream. Sanitize each arg in the helper: safe values pass through; anything with whitespace, quotes, control chars, or backslashes is replaced with its strconv.Quote form. No caller changes — the format strings remain unchanged. Tests pin both behaviors: sanitization table covers the field boundary cases; an end-to-end shape test confirms a key containing `event=spoof` stays inside a single quoted token.
171 lines
5.9 KiB
Go
171 lines
5.9 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// TestVersionsHealQueue_DedupOnEnqueue ensures multiple enqueues of the
|
|
// same bucket/object collapse into a single pending entry, so a hot
|
|
// failure path doesn't bloat the queue.
|
|
func TestVersionsHealQueue_DedupOnEnqueue(t *testing.T) {
|
|
q := newVersionsHealQueue()
|
|
for i := 0; i < 5; i++ {
|
|
q.Enqueue("b", "obj")
|
|
}
|
|
assert.Equal(t, 1, q.Len(), "duplicate enqueues collapse")
|
|
}
|
|
|
|
// TestVersionsHealQueue_CapacityCap ensures the queue refuses growth
|
|
// past the static cap and logs at V(1) instead of OOM-ing.
|
|
func TestVersionsHealQueue_CapacityCap(t *testing.T) {
|
|
q := newVersionsHealQueue()
|
|
for i := 0; i < versionsHealQueueCapacity+50; i++ {
|
|
q.Enqueue("b", string(rune(i)))
|
|
}
|
|
assert.Equal(t, versionsHealQueueCapacity, q.Len(), "queue clamps at capacity")
|
|
}
|
|
|
|
// TestVersionsHealQueue_PopReadyOnlyDueItems checks that nextRetry
|
|
// gating keeps not-yet-ready candidates in the queue.
|
|
func TestVersionsHealQueue_PopReadyOnlyDueItems(t *testing.T) {
|
|
q := newVersionsHealQueue()
|
|
q.Enqueue("b", "due")
|
|
|
|
// Inject a deferred candidate directly so we control its nextRetry.
|
|
q.pending[versionsHealKey("b", "later")] = &versionsHealCandidate{
|
|
bucket: "b",
|
|
object: "later",
|
|
enqueued: time.Now(),
|
|
nextRetry: time.Now().Add(10 * time.Minute),
|
|
}
|
|
|
|
due := q.popReady(time.Now())
|
|
require.Len(t, due, 1, "only the due candidate pops")
|
|
assert.Equal(t, "due", due[0].object)
|
|
assert.Equal(t, 1, q.Len(), "deferred candidate still queued")
|
|
}
|
|
|
|
// TestVersionsHealQueue_RequeueWithBackoff verifies failed candidates
|
|
// re-enter the queue with an extended nextRetry.
|
|
func TestVersionsHealQueue_RequeueWithBackoff(t *testing.T) {
|
|
q := newVersionsHealQueue()
|
|
c := &versionsHealCandidate{bucket: "b", object: "obj", attempts: 1}
|
|
|
|
q.requeue(c, 500*time.Millisecond)
|
|
assert.Equal(t, 1, q.Len())
|
|
|
|
now := time.Now()
|
|
due := q.popReady(now)
|
|
assert.Empty(t, due, "not yet due immediately after requeue")
|
|
|
|
due = q.popReady(now.Add(time.Second))
|
|
assert.Len(t, due, 1, "due after backoff window passes")
|
|
}
|
|
|
|
// TestVersionsHealQueue_GiveUpAfterMaxAttempts ensures we don't loop
|
|
// forever against a deterministically broken state.
|
|
func TestVersionsHealQueue_GiveUpAfterMaxAttempts(t *testing.T) {
|
|
q := newVersionsHealQueue()
|
|
c := &versionsHealCandidate{bucket: "b", object: "obj", attempts: versionsHealMaxRetries}
|
|
q.requeue(c, time.Millisecond)
|
|
assert.Equal(t, 0, q.Len(), "candidate at max attempts is dropped, read-path heal still covers it")
|
|
}
|
|
|
|
// TestRetryFilerOp_SucceedsBeforeExhaustion confirms a flaky op that
|
|
// eventually succeeds is reported as success without surfacing prior
|
|
// errors.
|
|
func TestRetryFilerOp_SucceedsBeforeExhaustion(t *testing.T) {
|
|
calls := 0
|
|
err := retryFilerOp(context.Background(), "test", func() error {
|
|
calls++
|
|
if calls < 3 {
|
|
return errors.New("transient")
|
|
}
|
|
return nil
|
|
})
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, 3, calls, "stops calling once the op succeeds")
|
|
}
|
|
|
|
// TestRetryFilerOp_PropagatesAfterExhaustion confirms a deterministic
|
|
// failure is wrapped with the attempt count so operators can tell at
|
|
// a glance whether the underlying issue is transient.
|
|
func TestRetryFilerOp_PropagatesAfterExhaustion(t *testing.T) {
|
|
calls := 0
|
|
err := retryFilerOp(context.Background(), "test", func() error {
|
|
calls++
|
|
return errors.New("permanent")
|
|
})
|
|
require.Error(t, err)
|
|
assert.Equal(t, updateLatestRetryAttempts, calls, "ran the full retry budget")
|
|
assert.Contains(t, err.Error(), "exhausted")
|
|
assert.Contains(t, err.Error(), "permanent", "underlying error preserved")
|
|
}
|
|
|
|
// TestRetryFilerOp_ContextCancelInterruptsBackoff confirms that a ctx
|
|
// canceled mid-backoff returns ctx.Err() immediately instead of
|
|
// blocking until the worst-case ~6.3s retry budget elapses. Server
|
|
// shutdown / client disconnect relies on this to drain in-flight
|
|
// retries promptly.
|
|
func TestRetryFilerOp_ContextCancelInterruptsBackoff(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
// Cancel after the first failed attempt has scheduled its backoff
|
|
// but well before the timer fires.
|
|
go func() {
|
|
time.Sleep(30 * time.Millisecond)
|
|
cancel()
|
|
}()
|
|
calls := 0
|
|
start := time.Now()
|
|
err := retryFilerOp(ctx, "test", func() error {
|
|
calls++
|
|
return errors.New("transient")
|
|
})
|
|
elapsed := time.Since(start)
|
|
require.Error(t, err)
|
|
assert.ErrorIs(t, err, context.Canceled, "must surface ctx.Err verbatim")
|
|
assert.Less(t, elapsed, 500*time.Millisecond, "ctx cancel must interrupt the backoff sleep")
|
|
assert.Less(t, calls, updateLatestRetryAttempts, "ctx cancel must short-circuit before exhausting the retry budget")
|
|
}
|
|
|
|
// TestRetryFilerOp_TerminalErrorsShortCircuit confirms that errors which
|
|
// won't change on retry (NotFound, context cancellation) are returned
|
|
// immediately and unwrapped, without burning the retry budget.
|
|
func TestRetryFilerOp_TerminalErrorsShortCircuit(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
err error
|
|
}{
|
|
{"filer_pb.ErrNotFound", filer_pb.ErrNotFound},
|
|
{"wrapped filer_pb.ErrNotFound", fmt.Errorf("wrap: %w", filer_pb.ErrNotFound)},
|
|
{"grpc NotFound status", status.Error(codes.NotFound, "missing")},
|
|
{"context canceled", context.Canceled},
|
|
{"context deadline exceeded", context.DeadlineExceeded},
|
|
}
|
|
for _, tc := range cases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
start := time.Now()
|
|
calls := 0
|
|
err := retryFilerOp(context.Background(), "test", func() error {
|
|
calls++
|
|
return tc.err
|
|
})
|
|
elapsed := time.Since(start)
|
|
require.Error(t, err)
|
|
assert.Equal(t, 1, calls, "terminal error must not be retried")
|
|
assert.Less(t, elapsed, 50*time.Millisecond, "terminal error must not delay")
|
|
assert.NotContains(t, err.Error(), "exhausted", "terminal error must not be wrapped with retry-budget prefix")
|
|
})
|
|
}
|
|
}
|