mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
admin: convert filer address to gRPC form before dispatch (#9523)
The master returns each registered filer in pb.ServerAddress dual-port form (host:httpPort.grpcPort, e.g. 10.0.0.1:8888.18888). The admin's plugin context builder forwarded that string verbatim as filer_grpc_address, so workers calling grpc.DialContext on it failed every job in ~3ms with "dial tcp: lookup tcp/8888.18888: unknown port". Run each entry through pb.ServerAddress.ToGrpcAddress before populating ClusterContext.FilerGrpcAddresses. The lifecycle integration test now pins filer.port.grpc to a value that breaks the FILER_PORT+10000 assumption, and a new dispatch test drives the admin's /api/plugin/job-types/s3_lifecycle/run path end-to-end and asserts the dispatched job both reaches the filer and deletes the backdated object.
This commit is contained in:
@@ -9,6 +9,10 @@ S3_PORT := 8333
|
||||
MASTER_PORT := 9333
|
||||
VOLUME_PORT := 8080
|
||||
FILER_PORT := 8888
|
||||
# Pin the filer gRPC port off the FILER_PORT+10000 convention so any
|
||||
# code path that assumes the offset breaks here, not in production.
|
||||
FILER_GRPC_PORT := 18890
|
||||
ADMIN_PORT := 23646
|
||||
ACCESS_KEY ?= some_access_key1
|
||||
SECRET_KEY ?= some_secret_key1
|
||||
TEST_TIMEOUT := 10m
|
||||
@@ -39,6 +43,7 @@ start-server: build-weed
|
||||
@AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) $(WEED_BINARY) mini \
|
||||
-dir=$(SERVER_DIR) \
|
||||
-s3.port=$(S3_PORT) \
|
||||
-filer.port.grpc=$(FILER_GRPC_PORT) \
|
||||
> weed-test.log 2>&1 & \
|
||||
echo $$! > weed-server.pid
|
||||
@for i in $$(seq 1 90); do \
|
||||
@@ -68,7 +73,8 @@ test:
|
||||
S3_ENDPOINT=http://localhost:$(S3_PORT) \
|
||||
S3_GRPC_ENDPOINT=localhost:$$(($(S3_PORT) + 10000)) \
|
||||
MASTER_ENDPOINT=http://localhost:$(MASTER_PORT) \
|
||||
FILER_GRPC_ADDRESS=localhost:$$(($(FILER_PORT) + 10000)) \
|
||||
FILER_GRPC_ADDRESS=localhost:$(FILER_GRPC_PORT) \
|
||||
ADMIN_ENDPOINT=http://localhost:$(ADMIN_PORT) \
|
||||
go test -v -timeout $(TEST_TIMEOUT) -run $(TEST_PATTERN)
|
||||
|
||||
test-with-server: start-server
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Drives the real admin->worker dispatch path via the run endpoint and
|
||||
// asserts the dispatch reaches the filer (no dial error) and deletes
|
||||
// the backdated object. The Makefile pins filer.port.grpc off the
|
||||
// FILER_PORT+10000 convention so a raw-address forwarding regression
|
||||
// resurfaces here.
|
||||
func TestLifecycleAdminDispatchSucceedsWithCustomFilerGrpcPort(t *testing.T) {
|
||||
adminEndpoint := envOr("ADMIN_ENDPOINT", defaultAdminEndpoint)
|
||||
|
||||
c := s3Client(t)
|
||||
fc, fcClose := filerClient(t)
|
||||
defer fcClose()
|
||||
|
||||
bucket := uniqueBucket("admin-dispatch")
|
||||
mustCreateBucket(t, c, bucket)
|
||||
putExpirationLifecycle(t, c, bucket, "expire/", 1)
|
||||
const oldKey = "expire/old.txt"
|
||||
putObject(t, c, bucket, oldKey, "old")
|
||||
backdateMtime(t, fc, bucket, oldKey, 30)
|
||||
|
||||
waitForLifecycleWorkerReady(t, adminEndpoint)
|
||||
|
||||
// Lifecycle is a long-running batch; the run endpoint cancels it at
|
||||
// this timeout, which converts a healthy run into canceled_count=1.
|
||||
const runTimeoutSeconds = 30
|
||||
body, err := json.Marshal(map[string]any{
|
||||
"timeout_seconds": runTimeoutSeconds,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
req, err := http.NewRequestWithContext(
|
||||
context.Background(), http.MethodPost,
|
||||
adminEndpoint+"/api/plugin/job-types/s3_lifecycle/run",
|
||||
bytes.NewReader(body),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
defer resp.Body.Close()
|
||||
|
||||
var payload map[string]any
|
||||
require.NoError(t, json.NewDecoder(resp.Body).Decode(&payload))
|
||||
t.Logf("admin /api/plugin/job-types/s3_lifecycle/run response: %v", payload)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode, "admin run endpoint failed: %v", payload)
|
||||
|
||||
require.GreaterOrEqual(t, jsonNumber(t, payload, "detected_count"), 1)
|
||||
require.Equal(t, 0, jsonNumber(t, payload, "error_count"),
|
||||
"dispatched job errored — likely filer_grpc_address was raw host:httpPort.grpcPort")
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
_, err := c.HeadObject(context.Background(), &s3.HeadObjectInput{
|
||||
Bucket: aws.String(bucket), Key: aws.String(oldKey),
|
||||
})
|
||||
return err != nil
|
||||
}, 30*time.Second, 500*time.Millisecond,
|
||||
"expected %s/%s to be deleted after admin-dispatched lifecycle run", bucket, oldKey)
|
||||
}
|
||||
|
||||
func jsonNumber(t *testing.T, payload map[string]any, key string) int {
|
||||
t.Helper()
|
||||
raw, ok := payload[key]
|
||||
require.Truef(t, ok, "response missing key %q: %v", key, payload)
|
||||
switch v := raw.(type) {
|
||||
case float64:
|
||||
return int(v)
|
||||
case json.Number:
|
||||
n, err := v.Int64()
|
||||
require.NoErrorf(t, err, "key %q is not an int: %v", key, raw)
|
||||
return int(n)
|
||||
default:
|
||||
t.Fatalf("key %q has unexpected type %T (%v)", key, raw, raw)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
func waitForLifecycleWorkerReady(t *testing.T, adminEndpoint string) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
req, err := http.NewRequest(http.MethodGet, adminEndpoint+"/api/plugin/scheduler-states", nil)
|
||||
require.NoError(t, err)
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
var payload any
|
||||
_ = json.NewDecoder(resp.Body).Decode(&payload)
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK && strings.Contains(fmt.Sprintf("%v", payload), "s3_lifecycle") {
|
||||
return
|
||||
}
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("admin never reported an s3_lifecycle-capable worker")
|
||||
}
|
||||
@@ -40,12 +40,14 @@ const (
|
||||
defaultS3Endpoint = "http://localhost:8333"
|
||||
defaultS3GrpcEndpoint = "localhost:18333"
|
||||
defaultMasterEndpt = "http://localhost:9333"
|
||||
defaultFilerGRPC = "localhost:18888"
|
||||
defaultAdminEndpoint = "http://localhost:23646"
|
||||
// Pinned off the FILER_PORT+10000 convention; see Makefile.
|
||||
defaultFilerGRPC = "localhost:18890"
|
||||
bucketLifecycleXMLKey = "s3-bucket-lifecycle-configuration-xml"
|
||||
bucketsPath = "/buckets"
|
||||
accessKey = "some_access_key1"
|
||||
secretKey = "some_secret_key1"
|
||||
region = "us-east-1"
|
||||
secretKey = "some_secret_key1"
|
||||
region = "us-east-1"
|
||||
)
|
||||
|
||||
func envOr(key, def string) string {
|
||||
|
||||
Reference in New Issue
Block a user