diff --git a/weed/mount/error_classifier.go b/weed/mount/error_classifier.go index 893b893b9..6b55a610f 100644 --- a/weed/mount/error_classifier.go +++ b/weed/mount/error_classifier.go @@ -47,3 +47,28 @@ func grpcErrorToFuseStatus(err error) fuse.Status { return fuse.EIO } + +// isRetryableFilerError reports whether a filer RPC error looks transient +// enough to retry. It takes a conservative whitelist approach: only errors +// that clearly describe a permanent application-level failure +// (NotFound/AlreadyExists/InvalidArgument/PermissionDenied/Unauthenticated/ +// FailedPrecondition) short-circuit the retry loop. Everything else — +// transport errors, Canceled/Unavailable/ResourceExhausted, or errors with no +// gRPC status — is treated as potentially transient and retried. +func isRetryableFilerError(err error) bool { + if err == nil { + return false + } + if s, ok := status.FromError(err); ok { + switch s.Code() { + case codes.NotFound, + codes.AlreadyExists, + codes.InvalidArgument, + codes.PermissionDenied, + codes.Unauthenticated, + codes.FailedPrecondition: + return false + } + } + return true +} diff --git a/weed/mount/error_classifier_test.go b/weed/mount/error_classifier_test.go new file mode 100644 index 000000000..1414cf2e8 --- /dev/null +++ b/weed/mount/error_classifier_test.go @@ -0,0 +1,126 @@ +package mount + +import ( + "errors" + "fmt" + "syscall" + "testing" + "time" + + "github.com/seaweedfs/go-fuse/v2/fuse" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Regression: wfs_save.go used to wrap the gRPC error with fmt.Errorf(... %v ...) +// which stringified the status and made status.FromError fall through to the +// default EIO mapping. Wrapping with %w must preserve the code so that +// codes.Canceled from a closing filer connection surfaces as ETIMEDOUT (a +// retryable hint for FUSE callers) rather than EIO. +func TestGrpcErrorToFuseStatusUnwrapsCanceledThroughFmtErrorf(t *testing.T) { + grpcErr := status.Error(codes.Canceled, "grpc: the client connection is closing") + + wrapped := fmt.Errorf("UpdateEntry dir /some/path: %w", grpcErr) + + got := grpcErrorToFuseStatus(wrapped) + want := fuse.Status(syscall.ETIMEDOUT) + if got != want { + t.Fatalf("grpcErrorToFuseStatus(canceled wrapped with %%w) = %v, want %v", got, want) + } +} + +// Guard against regressing the wrap verb: %v loses the gRPC status and the +// classifier must fall through to EIO. This test documents that behavior so +// anyone reverting the %w change sees the intent. +func TestGrpcErrorToFuseStatusDropsCanceledThroughPercentV(t *testing.T) { + grpcErr := status.Error(codes.Canceled, "grpc: the client connection is closing") + + wrapped := fmt.Errorf("UpdateEntry dir /some/path: %v", grpcErr) + + got := grpcErrorToFuseStatus(wrapped) + if got != fuse.EIO { + t.Fatalf("grpcErrorToFuseStatus(canceled wrapped with %%v) = %v, want EIO (regression guard)", got) + } +} + +func TestIsRetryableFilerError(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"canceled", status.Error(codes.Canceled, "grpc: the client connection is closing"), true}, + {"unavailable", status.Error(codes.Unavailable, "connection refused"), true}, + {"deadline_exceeded", status.Error(codes.DeadlineExceeded, "deadline exceeded"), true}, + {"resource_exhausted", status.Error(codes.ResourceExhausted, "too many concurrent requests"), true}, + {"internal", status.Error(codes.Internal, "server crashed"), true}, + {"not_found", status.Error(codes.NotFound, "entry missing"), false}, + {"already_exists", status.Error(codes.AlreadyExists, "duplicate"), false}, + {"invalid_argument", status.Error(codes.InvalidArgument, "bad request"), false}, + {"permission_denied", status.Error(codes.PermissionDenied, "no access"), false}, + {"unauthenticated", status.Error(codes.Unauthenticated, "missing creds"), false}, + {"failed_precondition", status.Error(codes.FailedPrecondition, "not empty"), false}, + {"plain_error_retries", errors.New("random network glitch"), true}, + {"wrapped_canceled_still_retries", fmt.Errorf("ctx: %w", status.Error(codes.Canceled, "closing")), true}, + {"wrapped_not_found_still_skipped", fmt.Errorf("ctx: %w", status.Error(codes.NotFound, "gone")), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := isRetryableFilerError(tc.err); got != tc.want { + t.Fatalf("isRetryableFilerError(%v) = %v, want %v", tc.err, got, tc.want) + } + }) + } +} + +// retryMetadataFlushIf must short-circuit on non-retryable errors so that +// synchronous FUSE ops (chmod/utimes/xattr) don't hang for ~7s on ENOENT/ +// EACCES/EINVAL. +func TestRetryMetadataFlushIfShortCircuitsOnPermanentError(t *testing.T) { + originalSleep := metadataFlushSleep + t.Cleanup(func() { + metadataFlushSleep = originalSleep + }) + metadataFlushSleep = func(_ time.Duration) { + t.Fatal("sleep should not be called when shouldRetry returns false") + } + + attempts := 0 + permanent := status.Error(codes.NotFound, "entry missing") + err := retryMetadataFlushIf(func() error { + attempts++ + return permanent + }, isRetryableFilerError, nil) + + if attempts != 1 { + t.Fatalf("attempts = %d, want 1 (permanent error should short-circuit)", attempts) + } + if !errors.Is(err, permanent) { + t.Fatalf("err = %v, want permanent sentinel", err) + } +} + +// Transient errors must keep retrying up to the attempt cap even when a +// predicate is supplied. +func TestRetryMetadataFlushIfRetriesTransientErrors(t *testing.T) { + originalSleep := metadataFlushSleep + t.Cleanup(func() { + metadataFlushSleep = originalSleep + }) + metadataFlushSleep = func(_ time.Duration) {} + + attempts := 0 + transient := status.Error(codes.Canceled, "grpc: the client connection is closing") + err := retryMetadataFlushIf(func() error { + attempts++ + return transient + }, isRetryableFilerError, nil) + + if attempts != metadataFlushRetries+1 { + t.Fatalf("attempts = %d, want %d", attempts, metadataFlushRetries+1) + } + if !errors.Is(err, transient) { + t.Fatalf("err = %v, want transient sentinel", err) + } +} diff --git a/weed/mount/metadata_flush_retry.go b/weed/mount/metadata_flush_retry.go index f4fe2bcc3..5a4f3c8d8 100644 --- a/weed/mount/metadata_flush_retry.go +++ b/weed/mount/metadata_flush_retry.go @@ -7,6 +7,15 @@ const metadataFlushRetries = 3 var metadataFlushSleep = time.Sleep func retryMetadataFlush(flush func() error, onRetry func(nextAttempt, totalAttempts int, backoff time.Duration, err error)) error { + return retryMetadataFlushIf(flush, nil, onRetry) +} + +// retryMetadataFlushIf is retryMetadataFlush with an optional shouldRetry +// predicate. If shouldRetry is nil or returns true, the flush is retried with +// exponential backoff; if it returns false, the error is returned immediately +// so callers don't pay retry latency on clearly permanent errors (e.g. +// ENOENT/EACCES/EINVAL from a synchronous setattr). +func retryMetadataFlushIf(flush func() error, shouldRetry func(error) bool, onRetry func(nextAttempt, totalAttempts int, backoff time.Duration, err error)) error { totalAttempts := metadataFlushRetries + 1 var err error for attempt := 1; attempt <= totalAttempts; attempt++ { @@ -17,6 +26,9 @@ func retryMetadataFlush(flush func() error, onRetry func(nextAttempt, totalAttem if attempt == totalAttempts { break } + if shouldRetry != nil && !shouldRetry(err) { + break + } backoff := time.Duration(1<