diff --git a/test/erasure_coding/admin_dockertest/ec_integration_test.go b/test/erasure_coding/admin_dockertest/ec_integration_test.go index b17924774..8d62f3279 100644 --- a/test/erasure_coding/admin_dockertest/ec_integration_test.go +++ b/test/erasure_coding/admin_dockertest/ec_integration_test.go @@ -247,7 +247,7 @@ func TestEcEndToEnd(t *testing.T) { "job_type": "erasure_coding", "admin_runtime": map[string]interface{}{ "enabled": true, - "detection_interval_seconds": 1, + "detection_interval_minutes": 1, "global_execution_concurrency": 4, "per_worker_execution_concurrency": 4, "max_jobs_per_detection": 100, diff --git a/weed/worker/tasks/iceberg/detection.go b/weed/worker/tasks/iceberg/detection.go index 8a279287c..d53f1c54b 100644 --- a/weed/worker/tasks/iceberg/detection.go +++ b/weed/worker/tasks/iceberg/detection.go @@ -487,7 +487,7 @@ func needsMaintenance(meta table.Metadata, config Config) bool { } // buildMaintenanceProposal creates a JobProposal for a table needing maintenance. -func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress, resourceGroup string) *plugin_pb.JobProposal { +func (h *Handler) buildMaintenanceProposal(t tableInfo, filerGrpcAddress, resourceGroup string) *plugin_pb.JobProposal { dedupeKey := fmt.Sprintf("iceberg_maintenance:%s/%s/%s", t.BucketName, t.Namespace, t.TableName) snapshotCount := len(t.Metadata.Snapshots()) @@ -504,7 +504,7 @@ func (h *Handler) buildMaintenanceProposal(t tableInfo, filerAddress, resourceGr "namespace": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.Namespace}}, "table_name": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.TableName}}, "table_path": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: t.TablePath}}, - "filer_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: filerAddress}}, + "filer_address": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: filerGrpcAddress}}, }, Labels: map[string]string{ "bucket": t.BucketName, diff --git a/weed/worker/tasks/iceberg/exec_test.go b/weed/worker/tasks/iceberg/exec_test.go index 9d284eb41..2044fb2de 100644 --- a/weed/worker/tasks/iceberg/exec_test.go +++ b/weed/worker/tasks/iceberg/exec_test.go @@ -245,11 +245,9 @@ func startFakeFilerWithAddress(t *testing.T) (*fakeFilerServer, filer_pb.Seaweed time.Sleep(10 * time.Millisecond) } - // Return the address in ServerAddress format (host:httpPort.grpcPort) - // so that dialFiler resolves it correctly via ToGrpcAddress(). - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - serverAddr := fmt.Sprintf("127.0.0.1:0.%s", portStr) - return fakeServer, client, serverAddr + // Return the gRPC address in dialable form (host:grpcPort) since + // dialFiler now dials FilerGrpcAddresses verbatim. + return fakeServer, client, listener.Addr().String() } // --------------------------------------------------------------------------- @@ -999,24 +997,23 @@ func TestDetectWithFilters(t *testing.T) { func TestConnectToFilerSkipsUnreachableAddresses(t *testing.T) { handler := NewHandler(grpc.WithTransportCredentials(insecure.NewCredentials())) - _, _, liveAddr := startFakeFilerWithAddress(t) + _, _, liveGrpcAddr := startFakeFilerWithAddress(t) deadListener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("listen for dead address: %v", err) } - _, deadPortStr, _ := net.SplitHostPort(deadListener.Addr().String()) + deadGrpcAddr := deadListener.Addr().String() _ = deadListener.Close() - deadAddr := fmt.Sprintf("127.0.0.1:0.%s", deadPortStr) - addr, conn, err := handler.connectToFiler(context.Background(), []string{deadAddr, liveAddr}) + grpcAddr, conn, err := handler.connectToFiler(context.Background(), []string{deadGrpcAddr, liveGrpcAddr}) if err != nil { t.Fatalf("connectToFiler failed: %v", err) } defer conn.Close() - if addr != liveAddr { - t.Fatalf("expected live address %q, got %q", liveAddr, addr) + if grpcAddr != liveGrpcAddr { + t.Fatalf("expected live address %q, got %q", liveGrpcAddr, grpcAddr) } } @@ -1027,11 +1024,10 @@ func TestConnectToFilerFailsWhenAllAddressesAreUnreachable(t *testing.T) { if err != nil { t.Fatalf("listen for dead address: %v", err) } - _, deadPortStr, _ := net.SplitHostPort(deadListener.Addr().String()) + deadGrpcAddr := deadListener.Addr().String() _ = deadListener.Close() - deadAddr := fmt.Sprintf("127.0.0.1:0.%s", deadPortStr) - _, _, err = handler.connectToFiler(context.Background(), []string{deadAddr}) + _, _, err = handler.connectToFiler(context.Background(), []string{deadGrpcAddr}) if err == nil { t.Fatal("expected connectToFiler to fail") } diff --git a/weed/worker/tasks/iceberg/handler.go b/weed/worker/tasks/iceberg/handler.go index 35b9967a9..32dfb28b5 100644 --- a/weed/worker/tasks/iceberg/handler.go +++ b/weed/worker/tasks/iceberg/handler.go @@ -376,12 +376,12 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq // Detection interval is managed by the scheduler via AdminRuntimeDefaults.DetectionIntervalMinutes. - // Get filer addresses from cluster context - filerAddresses := make([]string, 0) + // Get filer gRPC addresses from cluster context + filerGrpcAddresses := make([]string, 0) if request.ClusterContext != nil { - filerAddresses = append(filerAddresses, request.ClusterContext.FilerGrpcAddresses...) + filerGrpcAddresses = append(filerGrpcAddresses, request.ClusterContext.FilerGrpcAddresses...) } - if len(filerAddresses) == 0 { + if len(filerGrpcAddresses) == 0 { _ = sender.SendActivity(pluginworker.BuildDetectorActivity("skipped", "no filer addresses in cluster context", nil)) return h.sendEmptyDetection(sender) } @@ -396,7 +396,7 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq } // Connect to filer — try each address until one succeeds. - filerAddress, conn, err := h.connectToFiler(ctx, filerAddresses) + filerGrpcAddress, conn, err := h.connectToFiler(ctx, filerGrpcAddresses) if err != nil { return fmt.Errorf("connect to filer: %w", err) } @@ -424,7 +424,7 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq proposals := make([]*plugin_pb.JobProposal, 0, len(tables)) for _, t := range tables { - proposal := h.buildMaintenanceProposal(t, filerAddress, resourceGroupKey(t, resourceGroups.GroupBy)) + proposal := h.buildMaintenanceProposal(t, filerGrpcAddress, resourceGroupKey(t, resourceGroups.GroupBy)) proposals = append(proposals, proposal) } @@ -463,10 +463,10 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ namespace := readStringConfig(params, "namespace", "") tableName := readStringConfig(params, "table_name", "") tablePath := readStringConfig(params, "table_path", "") - filerAddress := readStringConfig(params, "filer_address", "") + filerGrpcAddress := readStringConfig(params, "filer_address", "") - if bucketName == "" || namespace == "" || tableName == "" || filerAddress == "" { - return fmt.Errorf("missing required parameters: bucket_name=%q, namespace=%q, table_name=%q, filer_address=%q", bucketName, namespace, tableName, filerAddress) + if bucketName == "" || namespace == "" || tableName == "" || filerGrpcAddress == "" { + return fmt.Errorf("missing required parameters: bucket_name=%q, namespace=%q, table_name=%q, filer_address=%q", bucketName, namespace, tableName, filerGrpcAddress) } // Reject path traversal in bucket/namespace/table names. for _, name := range []string{bucketName, namespace, tableName} { @@ -509,9 +509,9 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ } // Connect to filer - conn, err := h.dialFiler(ctx, filerAddress) + conn, err := h.dialFiler(ctx, filerGrpcAddress) if err != nil { - return fmt.Errorf("connect to filer %s: %w", filerAddress, err) + return fmt.Errorf("connect to filer %s: %w", filerGrpcAddress, err) } defer conn.Close() filerClient := filer_pb.NewSeaweedFilerClient(conn) @@ -640,11 +640,17 @@ func (h *Handler) sendEmptyDetection(sender pluginworker.DetectionSender) error }) } -func (h *Handler) dialFiler(ctx context.Context, address string) (*grpc.ClientConn, error) { +// dialFiler connects to a filer at the given gRPC address. The address is +// expected to be already dialable (host:grpcPort) as supplied via +// ClusterContext.FilerGrpcAddresses or a job proposal parameter; we don't +// run it through pb.ServerAddress.ToGrpcAddress because that helper's +// fallback adds +10000 to any single-port address, turning a real gRPC +// port like 18888 into a non-existent 28888. +func (h *Handler) dialFiler(ctx context.Context, grpcAddress string) (*grpc.ClientConn, error) { opCtx, opCancel := context.WithTimeout(ctx, filerConnectTimeout) defer opCancel() - conn, err := pb.GrpcDial(opCtx, pb.ServerAddress(address).ToGrpcAddress(), false, h.grpcDialOption) + conn, err := pb.GrpcDial(opCtx, grpcAddress, false, h.grpcDialOption) if err != nil { return nil, err } @@ -658,17 +664,17 @@ func (h *Handler) dialFiler(ctx context.Context, address string) (*grpc.ClientCo return conn, nil } -// connectToFiler tries each filer address in order and returns the first -// address whose gRPC connection and Ping request succeed. -func (h *Handler) connectToFiler(ctx context.Context, addresses []string) (string, *grpc.ClientConn, error) { +// connectToFiler tries each filer gRPC address in order and returns the +// first address whose gRPC connection and Ping request succeed. +func (h *Handler) connectToFiler(ctx context.Context, grpcAddresses []string) (string, *grpc.ClientConn, error) { var lastErr error - for _, addr := range addresses { - conn, err := h.dialFiler(ctx, addr) + for _, grpcAddr := range grpcAddresses { + conn, err := h.dialFiler(ctx, grpcAddr) if err != nil { - lastErr = fmt.Errorf("filer %s: %w", addr, err) + lastErr = fmt.Errorf("filer %s: %w", grpcAddr, err) continue } - return addr, conn, nil + return grpcAddr, conn, nil } if lastErr == nil { lastErr = fmt.Errorf("no filer addresses provided")