From b4289abb0a2e6b89338c9b321e683e8d2ff58a39 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 17 May 2026 11:33:54 -0700 Subject: [PATCH] admin: convert filer address to gRPC form before dispatch (#9523) The master returns each registered filer in pb.ServerAddress dual-port form (host:httpPort.grpcPort, e.g. 10.0.0.1:8888.18888). The admin's plugin context builder forwarded that string verbatim as filer_grpc_address, so workers calling grpc.DialContext on it failed every job in ~3ms with "dial tcp: lookup tcp/8888.18888: unknown port". Run each entry through pb.ServerAddress.ToGrpcAddress before populating ClusterContext.FilerGrpcAddresses. The lifecycle integration test now pins filer.port.grpc to a value that breaks the FILER_PORT+10000 assumption, and a new dispatch test drives the admin's /api/plugin/job-types/s3_lifecycle/run path end-to-end and asserts the dispatched job both reaches the filer and deletes the backdated object. --- .github/workflows/s3-go-tests.yml | 1 + test/s3/lifecycle/Makefile | 8 +- .../s3_lifecycle_admin_dispatch_test.go | 113 ++++++++++++++++++ test/s3/lifecycle/s3_lifecycle_test.go | 8 +- weed/admin/dash/plugin_api.go | 10 +- 5 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 test/s3/lifecycle/s3_lifecycle_admin_dispatch_test.go diff --git a/.github/workflows/s3-go-tests.yml b/.github/workflows/s3-go-tests.yml index b6291cc96..ffd7c2db2 100644 --- a/.github/workflows/s3-go-tests.yml +++ b/.github/workflows/s3-go-tests.yml @@ -257,6 +257,7 @@ jobs: # TTL-pinned bucket collections piled up in a single run. test: - TestLifecycleAbortIncompleteMultipartUpload + - TestLifecycleAdminDispatchSucceedsWithCustomFilerGrpcPort - TestLifecycleBootstrapWalkOnExistingObjects - TestLifecycleConfigUpdateBetweenSweeps - TestLifecycleDeleteBucketLifecycleStopsDispatching diff --git a/test/s3/lifecycle/Makefile b/test/s3/lifecycle/Makefile index 26612ca76..5fafcb10e 100644 --- a/test/s3/lifecycle/Makefile +++ b/test/s3/lifecycle/Makefile @@ -9,6 +9,10 @@ S3_PORT := 8333 MASTER_PORT := 9333 VOLUME_PORT := 8080 FILER_PORT := 8888 +# Pin the filer gRPC port off the FILER_PORT+10000 convention so any +# code path that assumes the offset breaks here, not in production. +FILER_GRPC_PORT := 18890 +ADMIN_PORT := 23646 ACCESS_KEY ?= some_access_key1 SECRET_KEY ?= some_secret_key1 TEST_TIMEOUT := 10m @@ -39,6 +43,7 @@ start-server: build-weed @AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) $(WEED_BINARY) mini \ -dir=$(SERVER_DIR) \ -s3.port=$(S3_PORT) \ + -filer.port.grpc=$(FILER_GRPC_PORT) \ > weed-test.log 2>&1 & \ echo $$! > weed-server.pid @for i in $$(seq 1 90); do \ @@ -68,7 +73,8 @@ test: S3_ENDPOINT=http://localhost:$(S3_PORT) \ S3_GRPC_ENDPOINT=localhost:$$(($(S3_PORT) + 10000)) \ MASTER_ENDPOINT=http://localhost:$(MASTER_PORT) \ - FILER_GRPC_ADDRESS=localhost:$$(($(FILER_PORT) + 10000)) \ + FILER_GRPC_ADDRESS=localhost:$(FILER_GRPC_PORT) \ + ADMIN_ENDPOINT=http://localhost:$(ADMIN_PORT) \ go test -v -timeout $(TEST_TIMEOUT) -run $(TEST_PATTERN) test-with-server: start-server diff --git a/test/s3/lifecycle/s3_lifecycle_admin_dispatch_test.go b/test/s3/lifecycle/s3_lifecycle_admin_dispatch_test.go new file mode 100644 index 000000000..58c521f87 --- /dev/null +++ b/test/s3/lifecycle/s3_lifecycle_admin_dispatch_test.go @@ -0,0 +1,113 @@ +package lifecycle + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/require" +) + +// Drives the real admin->worker dispatch path via the run endpoint and +// asserts the dispatch reaches the filer (no dial error) and deletes +// the backdated object. The Makefile pins filer.port.grpc off the +// FILER_PORT+10000 convention so a raw-address forwarding regression +// resurfaces here. +func TestLifecycleAdminDispatchSucceedsWithCustomFilerGrpcPort(t *testing.T) { + adminEndpoint := envOr("ADMIN_ENDPOINT", defaultAdminEndpoint) + + c := s3Client(t) + fc, fcClose := filerClient(t) + defer fcClose() + + bucket := uniqueBucket("admin-dispatch") + mustCreateBucket(t, c, bucket) + putExpirationLifecycle(t, c, bucket, "expire/", 1) + const oldKey = "expire/old.txt" + putObject(t, c, bucket, oldKey, "old") + backdateMtime(t, fc, bucket, oldKey, 30) + + waitForLifecycleWorkerReady(t, adminEndpoint) + + // Lifecycle is a long-running batch; the run endpoint cancels it at + // this timeout, which converts a healthy run into canceled_count=1. + const runTimeoutSeconds = 30 + body, err := json.Marshal(map[string]any{ + "timeout_seconds": runTimeoutSeconds, + }) + require.NoError(t, err) + req, err := http.NewRequestWithContext( + context.Background(), http.MethodPost, + adminEndpoint+"/api/plugin/job-types/s3_lifecycle/run", + bytes.NewReader(body), + ) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + var payload map[string]any + require.NoError(t, json.NewDecoder(resp.Body).Decode(&payload)) + t.Logf("admin /api/plugin/job-types/s3_lifecycle/run response: %v", payload) + require.Equal(t, http.StatusOK, resp.StatusCode, "admin run endpoint failed: %v", payload) + + require.GreaterOrEqual(t, jsonNumber(t, payload, "detected_count"), 1) + require.Equal(t, 0, jsonNumber(t, payload, "error_count"), + "dispatched job errored — likely filer_grpc_address was raw host:httpPort.grpcPort") + + require.Eventuallyf(t, func() bool { + _, err := c.HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String(bucket), Key: aws.String(oldKey), + }) + return err != nil + }, 30*time.Second, 500*time.Millisecond, + "expected %s/%s to be deleted after admin-dispatched lifecycle run", bucket, oldKey) +} + +func jsonNumber(t *testing.T, payload map[string]any, key string) int { + t.Helper() + raw, ok := payload[key] + require.Truef(t, ok, "response missing key %q: %v", key, payload) + switch v := raw.(type) { + case float64: + return int(v) + case json.Number: + n, err := v.Int64() + require.NoErrorf(t, err, "key %q is not an int: %v", key, raw) + return int(n) + default: + t.Fatalf("key %q has unexpected type %T (%v)", key, raw, raw) + return 0 + } +} + +func waitForLifecycleWorkerReady(t *testing.T, adminEndpoint string) { + t.Helper() + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + req, err := http.NewRequest(http.MethodGet, adminEndpoint+"/api/plugin/scheduler-states", nil) + require.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + if err != nil { + time.Sleep(250 * time.Millisecond) + continue + } + var payload any + _ = json.NewDecoder(resp.Body).Decode(&payload) + resp.Body.Close() + if resp.StatusCode == http.StatusOK && strings.Contains(fmt.Sprintf("%v", payload), "s3_lifecycle") { + return + } + time.Sleep(250 * time.Millisecond) + } + t.Fatalf("admin never reported an s3_lifecycle-capable worker") +} diff --git a/test/s3/lifecycle/s3_lifecycle_test.go b/test/s3/lifecycle/s3_lifecycle_test.go index 78ec7db3c..44c6feea0 100644 --- a/test/s3/lifecycle/s3_lifecycle_test.go +++ b/test/s3/lifecycle/s3_lifecycle_test.go @@ -40,12 +40,14 @@ const ( defaultS3Endpoint = "http://localhost:8333" defaultS3GrpcEndpoint = "localhost:18333" defaultMasterEndpt = "http://localhost:9333" - defaultFilerGRPC = "localhost:18888" + defaultAdminEndpoint = "http://localhost:23646" + // Pinned off the FILER_PORT+10000 convention; see Makefile. + defaultFilerGRPC = "localhost:18890" bucketLifecycleXMLKey = "s3-bucket-lifecycle-configuration-xml" bucketsPath = "/buckets" accessKey = "some_access_key1" - secretKey = "some_secret_key1" - region = "us-east-1" + secretKey = "some_secret_key1" + region = "us-east-1" ) func envOr(key, def string) string { diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 8eacd1557..814e225ad 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -16,6 +16,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/plugin" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" "google.golang.org/protobuf/encoding/protojson" @@ -767,17 +768,20 @@ func (s *AdminServer) buildDefaultPluginClusterContext() *plugin_pb.ClusterConte clusterContext.MasterGrpcAddresses = append(clusterContext.MasterGrpcAddresses, masterAddress) } + // Master returns filers in dual-port form (host:httpPort.grpcPort); + // workers dial these directly, so collapse to host:grpcPort first. filerSeen := map[string]struct{}{} for _, filer := range s.GetAllFilers() { filer = strings.TrimSpace(filer) if filer == "" { continue } - if _, exists := filerSeen[filer]; exists { + grpcAddr := pb.ServerAddress(filer).ToGrpcAddress() + if _, exists := filerSeen[grpcAddr]; exists { continue } - filerSeen[filer] = struct{}{} - clusterContext.FilerGrpcAddresses = append(clusterContext.FilerGrpcAddresses, filer) + filerSeen[grpcAddr] = struct{}{} + clusterContext.FilerGrpcAddresses = append(clusterContext.FilerGrpcAddresses, grpcAddr) } volumeSeen := map[string]struct{}{}