From 1de741737db07ace83766b570afdd8771d7770fc Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 4 May 2026 00:28:30 -0700 Subject: [PATCH] test(s3tables): add Apache Doris Iceberg catalog integration test (#9307) * test(s3tables): add Apache Doris Iceberg catalog integration test Adds an end-to-end smoke test that boots the apache/doris all-in-one container, registers SeaweedFS as an external Iceberg REST catalog (OAuth2 client_credentials), and validates metadata visibility plus the parquet read path against tables seeded via the Iceberg REST API and a PyIceberg writer container, mirroring the existing Trino, Spark, and Dremio coverage. Wires the test into a new s3-tables-tests workflow job. * test(s3tables): document weed shell -master flag format and fill in helper docstrings Restores the explanatory comment on createTableBucket about the host:port.grpcPort ServerAddress format used by `weed shell -master` (produced by pb.NewServerAddress) so the dot separator isn't mistaken for a typo, and adds doc comments for createIcebergNamespace, createIcebergTable, doIcebergJSONRequest, requireDorisRuntime, and hasDocker. --- .github/workflows/s3-tables-tests.yml | 71 ++ test/s3tables/catalog_doris/Dockerfile.writer | 12 + test/s3tables/catalog_doris/README.md | 80 ++ test/s3tables/catalog_doris/append_rows.py | 87 ++ .../catalog_doris/doris_catalog_test.go | 943 ++++++++++++++++++ 5 files changed, 1193 insertions(+) create mode 100644 test/s3tables/catalog_doris/Dockerfile.writer create mode 100644 test/s3tables/catalog_doris/README.md create mode 100644 test/s3tables/catalog_doris/append_rows.py create mode 100644 test/s3tables/catalog_doris/doris_catalog_test.go diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 6dec28130..2ea6f4b59 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -261,6 +261,77 @@ jobs: path: test/s3tables/catalog_dremio/test-output.log retention-days: 3 + doris-iceberg-catalog-tests: + name: Doris Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 35 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v4 + + - name: Pre-pull Doris image + run: docker pull apache/doris:doris-all-in-one-2.1.0 + + - name: Pre-pull Python image for PyIceberg writer + run: docker pull python:3.11-slim + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Doris Iceberg Catalog Integration Tests + timeout-minutes: 30 + working-directory: test/s3tables/catalog_doris + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + docker info + echo "=== Starting Doris Iceberg Catalog Tests ===" + + go test -v -timeout 25m . 2>&1 | tee test-output.log || { + echo "Doris Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_doris + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker|doris)" || true + echo "=== Doris containers ===" + docker ps -a --filter "name=seaweed-doris" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v7 + with: + name: doris-iceberg-catalog-test-logs + path: test/s3tables/catalog_doris/test-output.log + retention-days: 3 + polaris-integration-tests: name: Polaris Integration Tests runs-on: ubuntu-22.04 diff --git a/test/s3tables/catalog_doris/Dockerfile.writer b/test/s3tables/catalog_doris/Dockerfile.writer new file mode 100644 index 000000000..3d22d0889 --- /dev/null +++ b/test/s3tables/catalog_doris/Dockerfile.writer @@ -0,0 +1,12 @@ +# Iceberg writer used by the Doris integration test to populate a table with +# data files via PyIceberg, so the downstream Doris SELECT verifies the read +# path against non-empty results rather than just COUNT(*) = 0. +FROM python:3.11-slim + +WORKDIR /app + +RUN pip install --no-cache-dir "pyiceberg[s3fs]" pyarrow + +COPY append_rows.py /app/ + +ENTRYPOINT ["python3", "/app/append_rows.py"] diff --git a/test/s3tables/catalog_doris/README.md b/test/s3tables/catalog_doris/README.md new file mode 100644 index 000000000..db4a3b545 --- /dev/null +++ b/test/s3tables/catalog_doris/README.md @@ -0,0 +1,80 @@ +# Apache Doris Iceberg Catalog Integration Test + +This directory contains an Apache Doris integration smoke test for SeaweedFS's +Iceberg REST Catalog implementation. + +## What It Tests + +`TestDorisIcebergCatalog` verifies the Doris path end to end: + +1. Starts a local SeaweedFS mini cluster with S3 Tables and Iceberg REST enabled. +2. Creates a SeaweedFS table bucket. +3. Creates an Iceberg namespace and an empty table through the SeaweedFS REST + catalog OAuth flow. +4. Creates a second table and populates it with three rows by running a + PyIceberg writer container (`Dockerfile.writer` + `append_rows.py`) before + Doris bootstraps, so the snapshot is part of Doris's first scan. +5. Starts `apache/doris:doris-all-in-one-2.1.0` and waits for the FE MySQL + query port and at least one alive BE. +6. Connects to Doris over the MySQL protocol and registers a Doris + `EXTERNAL CATALOG` of `type=iceberg` and `iceberg.catalog.type=rest` that + points at the SeaweedFS REST endpoint, using `credential = key:secret` for + the OAuth2 client-credentials flow. +7. Runs subtests against the SeaweedFS-backed Iceberg tables: + - `BasicSelect`: Doris is alive and answering SQL. + - `CatalogVisible`: `SHOW CATALOGS` lists the SeaweedFS-backed catalog. + - `DatabaseVisible`: the seeded namespace is exposed as a Doris database. + - `TableVisible`: the seeded table appears under `SHOW TABLES FROM ...`. + - `CountEmptyTable`: catalog-to-table resolution and a scan of an empty table. + - `ColumnProjection`: `SELECT id, label` succeeds and the response columns + are `id, label`. Failure here means Doris could not parse the schema + returned by the SeaweedFS catalog. + - `ReadWrittenDataCount` and `ReadWrittenDataValues`: Doris reads back the + three PyIceberg-appended rows and the values match. This exercises the + actual data path (parquet reads via S3), not just metadata. + +The PyIceberg writer image is built on demand via Docker layer caching. The +first build pulls `python:3.11-slim` and pip-installs PyIceberg + PyArrow +(~1-2 min in CI); subsequent invocations are cheap. + +## Running Locally + +Build or install `weed`, then run: + +```bash +cd test/s3tables/catalog_doris +go test -v -timeout 25m . +``` + +The test requires Docker. The GitHub Actions job runs on `ubuntu-22.04` and +executes the test for pull requests. + +## Configuration + +The test uses these fixed credentials for the local SeaweedFS IAM config: + +- S3 access key: `AKIAIOSFODNN7EXAMPLE` +- S3 secret key: `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` +- Region: `us-west-2` +- Warehouse bucket: `iceberg-tables` + +Doris ports: + +- The all-in-one container exposes `9030` (FE MySQL/SQL) and `8030` (FE HTTP). + Only `9030` is mapped to a host port (allocated dynamically) so the Go + MySQL client can connect from the test process. +- The Iceberg REST endpoint and the S3 endpoint are reached from inside the + Doris container via `host.docker.internal`, matching the Trino and Dremio + test paths. + +## Troubleshooting + +- Ensure Docker is running: `docker version` +- Ensure `weed` is built or available on `PATH` +- The Doris all-in-one image is large (~1 GB) and the FE+BE need + ~30-60 seconds to register before queries succeed; the test waits up to + 4 minutes for the query port and 60 seconds for at least one alive BE. +- If queries hang on metadata, run `REFRESH CATALOG iceberg_catalog` from a + Doris MySQL client and retry. +- Container logs are printed in the failure message; you can also check + `docker logs ` while the test is running. diff --git a/test/s3tables/catalog_doris/append_rows.py b/test/s3tables/catalog_doris/append_rows.py new file mode 100644 index 000000000..341024874 --- /dev/null +++ b/test/s3tables/catalog_doris/append_rows.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +"""Append rows to an existing Iceberg table via the SeaweedFS REST catalog. + +Used by the Doris integration test to materialize data files so a downstream +SELECT from Doris verifies the read path against non-empty results. + +Usage: + python3 append_rows.py \\ + --catalog-url http://localhost:8181 \\ + --warehouse s3://my-bucket \\ + --prefix my-bucket \\ + --s3-endpoint http://localhost:8333 \\ + --access-key AKIA... --secret-key wJalr... \\ + --namespace foo --namespace bar \\ + --table events +""" + +import argparse +import sys + +import pyarrow as pa +from pyiceberg.catalog import load_catalog + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--catalog-url", required=True) + p.add_argument("--warehouse", required=True, help="s3://") + p.add_argument("--prefix", required=True, help="REST catalog prefix (table bucket name)") + p.add_argument("--s3-endpoint", required=True, help="http://host:port") + p.add_argument("--access-key", required=True) + p.add_argument("--secret-key", required=True) + p.add_argument("--region", default="us-east-1") + p.add_argument( + "--namespace", + action="append", + required=True, + help="One per level (e.g. --namespace foo --namespace bar for foo.bar).", + ) + p.add_argument("--table", required=True) + args = p.parse_args() + + # `credential` triggers OAuth2 client_credentials against + # /v1/oauth/tokens, matching the helper Go test uses for + # REST-API table creation. The s3.* keys are needed for parquet writes. + catalog = load_catalog( + "rest", + **{ + "type": "rest", + "uri": args.catalog_url, + "warehouse": args.warehouse, + "prefix": args.prefix, + "credential": f"{args.access_key}:{args.secret_key}", + "s3.access-key-id": args.access_key, + "s3.secret-access-key": args.secret_key, + "s3.endpoint": args.s3_endpoint, + "s3.region": args.region, + "s3.path-style-access": "true", + }, + ) + + table_id = tuple(args.namespace) + (args.table,) + table = catalog.load_table(table_id) + + # Match the Iceberg table schema: id is `required long`, label is + # `optional string`. Default pyarrow columns are nullable, which fails + # PyIceberg's required-field compatibility check. + arrow_schema = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("label", pa.string(), nullable=True), + ] + ) + arrow_table = pa.Table.from_pydict( + { + "id": [1, 2, 3], + "label": ["one", "two", "three"], + }, + schema=arrow_schema, + ) + table.append(arrow_table) + print(f"appended {arrow_table.num_rows} rows to {'.'.join(table_id)}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/test/s3tables/catalog_doris/doris_catalog_test.go b/test/s3tables/catalog_doris/doris_catalog_test.go new file mode 100644 index 000000000..b7b071b3f --- /dev/null +++ b/test/s3tables/catalog_doris/doris_catalog_test.go @@ -0,0 +1,943 @@ +// Package catalog_doris provides integration tests for Apache Doris with the +// SeaweedFS Iceberg REST Catalog. +package catalog_doris + +import ( + "bytes" + "context" + "crypto/rand" + "database/sql" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +const ( + dorisImage = "apache/doris:doris-all-in-one-2.1.0" + dorisCatalogName = "iceberg_catalog" + dorisQueryPort = 9030 + dorisStartTimeout = 4 * time.Minute +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + dorisContainer string + dorisHostQueryPort int + accessKey string + secretKey string +} + +// TestDorisIcebergCatalog brings up SeaweedFS + Doris and validates that +// Doris can discover catalog metadata served by SeaweedFS's Iceberg REST API +// and read both empty and populated tables through the standard data path. +// +// Subtests: +// - BasicSelect: Doris is alive and answering SQL. +// - CatalogVisible: SHOW CATALOGS lists the SeaweedFS-backed catalog. +// - DatabaseVisible: the seeded namespace is visible as a database. +// - TableVisible: the seeded table is listed under the namespace. +// - CountEmptyTable: Doris resolves the table and scans an empty Iceberg snapshot. +// - ColumnProjection: Doris parsed the schema and accepts column-name projection +// (a column-not-found here means the schema returned by SeaweedFS was rejected). +// - ReadWrittenDataCount / ReadWrittenDataValues: a separate table is populated +// by a PyIceberg writer container before Doris connects; Doris then reads the +// three rows back, exercising the actual data path (not just metadata). +func TestDorisIcebergCatalog(t *testing.T) { + requireDorisRuntime(t) + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + fmt.Printf(">>> Starting SeaweedFS...\n") + env.StartSeaweedFS(t) + fmt.Printf(">>> SeaweedFS started.\n") + + tableBucket := "iceberg-tables" + fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) + createTableBucket(t, env, tableBucket) + fmt.Printf(">>> Table bucket created.\n") + + testIcebergRestAPI(t, env) + + namespace := "doris_" + randomString(6) + tableName := "smoke_" + randomString(6) + icebergToken := requestIcebergOAuthToken(t, env) + createIcebergNamespace(t, env, icebergToken, tableBucket, namespace) + createIcebergTable(t, env, icebergToken, tableBucket, namespace, tableName) + + // Seed a populated table by creating an empty one through the REST API + // and then appending three rows via PyIceberg. Done before Doris bootstrap + // so the snapshot is part of the catalog's first scan. + populatedTable := "populated_" + randomString(6) + createIcebergTable(t, env, icebergToken, tableBucket, namespace, populatedTable) + buildDorisWriterImage(t) + writeIcebergRows(t, env, tableBucket, []string{namespace}, populatedTable) + + env.startDorisContainer(t) + env.waitForDoris(t, dorisStartTimeout) + db := env.connectDoris(t) + defer db.Close() + + env.createDorisIcebergCatalog(t, db, tableBucket) + + t.Run("BasicSelect", func(t *testing.T) { + var v int + if err := db.QueryRow("SELECT 1").Scan(&v); err != nil { + t.Fatalf("SELECT 1: %v", err) + } + if v != 1 { + t.Fatalf("SELECT 1 = %d, want 1", v) + } + }) + + t.Run("CatalogVisible", func(t *testing.T) { + // Doris's SHOW CATALOGS columns vary between versions. The catalog + // name appears in the first column for all releases that ship the + // Iceberg REST connector, so a per-row substring check is enough. + if !rowsContain(t, db, "SHOW CATALOGS", dorisCatalogName) { + t.Fatalf("SHOW CATALOGS did not list %s", dorisCatalogName) + } + }) + + t.Run("DatabaseVisible", func(t *testing.T) { + query := fmt.Sprintf("SHOW DATABASES FROM %s", quoteDorisIdent(dorisCatalogName)) + if !rowsContain(t, db, query, namespace) { + t.Fatalf("SHOW DATABASES from %s did not list namespace %s", dorisCatalogName, namespace) + } + }) + + t.Run("TableVisible", func(t *testing.T) { + query := fmt.Sprintf("SHOW TABLES FROM %s.%s", + quoteDorisIdent(dorisCatalogName), quoteDorisIdent(namespace)) + if !rowsContain(t, db, query, tableName) { + t.Fatalf("SHOW TABLES from %s.%s did not list %s", dorisCatalogName, namespace, tableName) + } + }) + + tableRef := dorisObjectName(dorisCatalogName, namespace, tableName) + + t.Run("CountEmptyTable", func(t *testing.T) { + var count int64 + query := fmt.Sprintf("SELECT COUNT(*) FROM %s", tableRef) + if err := db.QueryRow(query).Scan(&count); err != nil { + t.Fatalf("%s: %v", query, err) + } + if count != 0 { + t.Fatalf("count(%s) = %d, want 0", tableRef, count) + } + }) + + t.Run("ColumnProjection", func(t *testing.T) { + // SELECT COUNT(*) doesn't exercise the schema. A projection by column + // name fails fast with "column not found" if the catalog's schema + // response wasn't parsed by Doris. + query := fmt.Sprintf("SELECT id, label FROM %s", tableRef) + rows, err := db.Query(query) + if err != nil { + t.Fatalf("%s: %v", query, err) + } + defer rows.Close() + cols, err := rows.Columns() + if err != nil { + t.Fatalf("Columns(): %v", err) + } + if !equalStringSlicesIgnoreCase(cols, []string{"id", "label"}) { + t.Fatalf("projected columns = %v, want [id label]", cols) + } + if rows.Next() { + t.Fatalf("expected empty result set, got at least one row") + } + }) + + populatedRef := dorisObjectName(dorisCatalogName, namespace, populatedTable) + + t.Run("ReadWrittenDataCount", func(t *testing.T) { + var count int64 + query := fmt.Sprintf("SELECT COUNT(*) FROM %s", populatedRef) + if err := db.QueryRow(query).Scan(&count); err != nil { + t.Fatalf("%s: %v", query, err) + } + if count != 3 { + t.Fatalf("count(%s) = %d, want 3", populatedRef, count) + } + }) + + t.Run("ReadWrittenDataValues", func(t *testing.T) { + query := fmt.Sprintf("SELECT id, label FROM %s ORDER BY id", populatedRef) + rows, err := db.Query(query) + if err != nil { + t.Fatalf("%s: %v", query, err) + } + defer rows.Close() + expected := [][2]string{{"1", "one"}, {"2", "two"}, {"3", "three"}} + i := 0 + for rows.Next() { + var id int64 + var label sql.NullString + if err := rows.Scan(&id, &label); err != nil { + t.Fatalf("Scan row %d: %v", i, err) + } + if i >= len(expected) { + t.Fatalf("got more than %d rows from %s", len(expected), populatedRef) + } + gotID := fmt.Sprintf("%d", id) + gotLabel := label.String + if gotID != expected[i][0] || gotLabel != expected[i][1] { + t.Errorf("row %d = (%s, %s), want (%s, %s)", + i, gotID, gotLabel, expected[i][0], expected[i][1]) + } + i++ + } + if err := rows.Err(); err != nil { + t.Fatalf("rows.Err: %v", err) + } + if i != len(expected) { + t.Fatalf("expected %d rows, got %d", len(expected), i) + } + }) +} + +// NewTestEnvironment allocates ports and returns an environment for the test. +func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + info, err := os.Stat(weedBinary) + if err != nil || info.IsDir() { + weedBinary = filepath.Join(seaweedDir, "weed", "weed", "weed") + info, err = os.Stat(weedBinary) + if err != nil || info.IsDir() { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-doris-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := testutil.FindBindIP() + // 9 ports for the seaweed mini cluster, plus one for the Doris MySQL + // query port mapped on the host. + ports := testutil.MustAllocatePorts(t, 10) + + env := &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + masterPort: ports[0], + masterGrpcPort: ports[1], + volumePort: ports[2], + volumeGrpcPort: ports[3], + filerPort: ports[4], + filerGrpcPort: ports[5], + s3Port: ports[6], + s3GrpcPort: ports[7], + icebergPort: ports[8], + dorisHostQueryPort: ports[9], + } + + env.accessKey = "AKIAIOSFODNN7EXAMPLE" + env.secretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + + return env +} + +// StartSeaweedFS starts a SeaweedFS mini instance with the Iceberg REST API. +func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + iamConfigPath, err := testutil.WriteIAMConfig(env.dataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + cmd.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", + ) + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + if !env.waitForService(icebergURL, 30*time.Second) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(icebergURL) + if err != nil { + t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err) + } else { + t.Logf("WARNING: Iceberg service returned status %d at %s", resp.StatusCode, icebergURL) + resp.Body.Close() + } + t.Fatalf("Iceberg REST API did not become ready") + } +} + +// Cleanup stops Doris, SeaweedFS, and removes temporary state. +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.dorisContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.dorisContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +// waitForService polls a URL until it returns a 2xx/4xx status or timeout. +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err != nil { + time.Sleep(500 * time.Millisecond) + continue + } + statusCode := resp.StatusCode + resp.Body.Close() + if statusCode >= 200 && statusCode < 300 { + return true + } + if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { + return true + } + time.Sleep(500 * time.Millisecond) + } + return false +} + +// testIcebergRestAPI verifies the Iceberg REST endpoint is reachable. +func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { + t.Helper() + fmt.Printf(">>> Testing Iceberg REST API directly...\n") + + addr := net.JoinHostPort(env.bindIP, fmt.Sprintf("%d", env.icebergPort)) + conn, err := net.Dial("tcp", addr) + if err != nil { + t.Fatalf("Cannot connect to Iceberg service at %s: %v", addr, err) + } + conn.Close() + t.Logf("Successfully connected to Iceberg service at %s", addr) + + url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + t.Logf("Testing Iceberg REST API at %s", url) + + resp, err := http.Get(url) + if err != nil { + t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("Expected 200 OK from /v1/config, got %d, body: %s", resp.StatusCode, body) + } +} + +// startDorisContainer launches the Apache Doris all-in-one image and exposes +// only the FE MySQL/SQL port to the host. The Iceberg REST and S3 endpoints +// are reached via host.docker.internal, matching the Dremio/Trino paths. +func (env *TestEnvironment) startDorisContainer(t *testing.T) { + t.Helper() + + containerName := "seaweed-doris-" + randomString(8) + env.dorisContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-p", fmt.Sprintf("%d:%d", env.dorisHostQueryPort, dorisQueryPort), + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-west-2", + dorisImage, + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Doris container: %v\n%s", err, string(output)) + } +} + +// dorisContainerLogs returns the tail of Doris's container logs for diagnostics. +func dorisContainerLogs(containerName string) string { + cmd := exec.Command("docker", "logs", "--tail", "200", containerName) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Sprintf("(failed to fetch docker logs: %v)\n%s", err, string(output)) + } + return string(output) +} + +// waitForDoris polls Doris's MySQL query port until it accepts SELECT 1. +// The all-in-one image takes ~30-60s to boot the FE and BE; we poll long +// enough to absorb cold-start variance in CI. +func (env *TestEnvironment) waitForDoris(t *testing.T, timeout time.Duration) { + t.Helper() + + dsn := fmt.Sprintf("root:@tcp(127.0.0.1:%d)/", env.dorisHostQueryPort) + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + if !containerRunning(env.dorisContainer) { + t.Fatalf("Doris container exited before becoming ready\nContainer logs:\n%s", + dorisContainerLogs(env.dorisContainer)) + } + + db, err := sql.Open("mysql", dsn+"?timeout=2s&readTimeout=5s&writeTimeout=5s") + if err != nil { + lastErr = err + time.Sleep(2 * time.Second) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + var v int + err = db.QueryRowContext(ctx, "SELECT 1").Scan(&v) + cancel() + db.Close() + if err == nil && v == 1 { + // Doris reports the FE is up before the BE has registered. A + // catalog SHOW CATALOGS still works without BEs, but a REFRESH + // CATALOG against an external source needs at least one live + // BE. Wait for one to appear so subsequent subtests don't race. + if env.waitForDorisBackend(t, 60*time.Second) { + return + } + } else { + lastErr = err + } + time.Sleep(2 * time.Second) + } + + t.Fatalf("Timed out waiting for Doris to be ready\nLast error: %v\nContainer logs:\n%s", + lastErr, dorisContainerLogs(env.dorisContainer)) +} + +// waitForDorisBackend polls SHOW BACKENDS until at least one BE reports Alive=true. +func (env *TestEnvironment) waitForDorisBackend(t *testing.T, timeout time.Duration) bool { + t.Helper() + + dsn := fmt.Sprintf("root:@tcp(127.0.0.1:%d)/?timeout=2s&readTimeout=5s&writeTimeout=5s", + env.dorisHostQueryPort) + db, err := sql.Open("mysql", dsn) + if err != nil { + return false + } + defer db.Close() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + alive, err := dorisHasAliveBackend(db) + if err == nil && alive { + return true + } + time.Sleep(2 * time.Second) + } + return false +} + +// dorisHasAliveBackend returns true if any row in SHOW BACKENDS has Alive="true". +func dorisHasAliveBackend(db *sql.DB) (bool, error) { + rows, err := db.Query("SHOW BACKENDS") + if err != nil { + return false, err + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + return false, err + } + aliveIdx := -1 + for i, c := range cols { + if strings.EqualFold(c, "Alive") { + aliveIdx = i + break + } + } + if aliveIdx < 0 { + return false, fmt.Errorf("SHOW BACKENDS has no Alive column: %v", cols) + } + + values := make([]sql.NullString, len(cols)) + scanArgs := make([]interface{}, len(cols)) + for i := range values { + scanArgs[i] = &values[i] + } + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + return false, err + } + if values[aliveIdx].Valid && strings.EqualFold(values[aliveIdx].String, "true") { + return true, nil + } + } + return false, rows.Err() +} + +// containerRunning returns true if the named container is in `running` state. +func containerRunning(containerName string) bool { + cmd := exec.Command("docker", "inspect", "--format", "{{.State.Running}}", containerName) + out, err := cmd.Output() + if err != nil { + return false + } + return strings.TrimSpace(string(out)) == "true" +} + +// connectDoris opens a long-lived MySQL-protocol connection to Doris. +func (env *TestEnvironment) connectDoris(t *testing.T) *sql.DB { + t.Helper() + + dsn := fmt.Sprintf("root:@tcp(127.0.0.1:%d)/?parseTime=true&timeout=10s&readTimeout=60s&writeTimeout=60s", + env.dorisHostQueryPort) + db, err := sql.Open("mysql", dsn) + if err != nil { + t.Fatalf("sql.Open(mysql): %v", err) + } + db.SetMaxOpenConns(2) + if err := db.Ping(); err != nil { + _ = db.Close() + t.Fatalf("Doris ping failed: %v\nContainer logs:\n%s", err, dorisContainerLogs(env.dorisContainer)) + } + return db +} + +// createDorisIcebergCatalog registers a Doris EXTERNAL CATALOG of type=iceberg +// pointing at the SeaweedFS REST endpoint. Doris reuses Iceberg's standard +// REST client, so OAuth2 client credentials are passed via "credential". +func (env *TestEnvironment) createDorisIcebergCatalog(t *testing.T, db *sql.DB, warehouseBucket string) { + t.Helper() + + icebergURI := fmt.Sprintf("http://host.docker.internal:%d", env.icebergPort) + s3Endpoint := fmt.Sprintf("http://host.docker.internal:%d", env.s3Port) + credential := env.accessKey + ":" + env.secretKey + + // Drop first so a stale state from a previous run does not poison + // metadata caching. Doris versions before 2.1 use IF EXISTS at the end. + if _, err := db.Exec(fmt.Sprintf("DROP CATALOG IF EXISTS %s", quoteDorisIdent(dorisCatalogName))); err != nil { + t.Logf("DROP CATALOG IF EXISTS %s: %v (continuing)", dorisCatalogName, err) + } + + createSQL := fmt.Sprintf(`CREATE CATALOG %s PROPERTIES ( + "type" = "iceberg", + "iceberg.catalog.type" = "rest", + "uri" = %q, + "warehouse" = %q, + "credential" = %q, + "s3.endpoint" = %q, + "s3.access_key" = %q, + "s3.secret_key" = %q, + "s3.region" = "us-west-2", + "use_path_style" = "true" + )`, + quoteDorisIdent(dorisCatalogName), + icebergURI, + "s3://"+warehouseBucket, + credential, + s3Endpoint, + env.accessKey, + env.secretKey, + ) + if _, err := db.Exec(createSQL); err != nil { + t.Fatalf("CREATE CATALOG %s failed: %v\nContainer logs:\n%s", + dorisCatalogName, err, dorisContainerLogs(env.dorisContainer)) + } + + // Refresh so namespaces/tables seeded via the REST API right before this + // step are visible to the first SHOW DATABASES query. + if _, err := db.Exec(fmt.Sprintf("REFRESH CATALOG %s", quoteDorisIdent(dorisCatalogName))); err != nil { + t.Logf("REFRESH CATALOG %s: %v (continuing)", dorisCatalogName, err) + } +} + +// rowsContain runs query and returns true if any value of any column equals +// (case-insensitively) the wanted token. Doris's SHOW family returns +// version-dependent column counts, so we don't pin to a specific column. +func rowsContain(t *testing.T, db *sql.DB, query, want string) bool { + t.Helper() + + rows, err := db.Query(query) + if err != nil { + t.Fatalf("%s: %v", query, err) + } + defer rows.Close() + + cols, err := rows.Columns() + if err != nil { + t.Fatalf("Columns(): %v", err) + } + values := make([]sql.NullString, len(cols)) + scanArgs := make([]interface{}, len(cols)) + for i := range values { + scanArgs[i] = &values[i] + } + wantLower := strings.ToLower(want) + for rows.Next() { + if err := rows.Scan(scanArgs...); err != nil { + t.Fatalf("Scan: %v", err) + } + for _, v := range values { + if v.Valid && strings.Contains(strings.ToLower(v.String), wantLower) { + return true + } + } + } + if err := rows.Err(); err != nil { + t.Fatalf("rows.Err: %v", err) + } + return false +} + +// equalStringSlicesIgnoreCase compares two []string for equality, ignoring case. +func equalStringSlicesIgnoreCase(got, want []string) bool { + if len(got) != len(want) { + return false + } + for i := range got { + if !strings.EqualFold(got[i], want[i]) { + return false + } + } + return true +} + +// quoteDorisIdent wraps an identifier in backticks (Doris uses MySQL syntax), +// escaping any embedded backticks. +func quoteDorisIdent(name string) string { + return "`" + strings.ReplaceAll(name, "`", "``") + "`" +} + +// dorisObjectName produces a backtick-quoted dotted reference like +// `catalog`.`namespace`.`table` so identifiers with hyphens or random +// suffixes parse without quoting issues. +func dorisObjectName(parts ...string) string { + quoted := make([]string, 0, len(parts)) + for _, part := range parts { + quoted = append(quoted, quoteDorisIdent(part)) + } + return strings.Join(quoted, ".") +} + +// requestIcebergOAuthToken requests an OAuth2 client_credentials token from +// the SeaweedFS Iceberg REST catalog. Used to seed the catalog with a +// namespace and table directly through the REST API before Doris connects. +func requestIcebergOAuthToken(t *testing.T, env *TestEnvironment) string { + t.Helper() + + resp, err := http.PostForm(fmt.Sprintf("http://%s:%d/v1/oauth/tokens", env.bindIP, env.icebergPort), url.Values{ + "grant_type": {"client_credentials"}, + "client_id": {env.accessKey}, + "client_secret": {env.secretKey}, + }) + if err != nil { + t.Fatalf("POST /v1/oauth/tokens: %v", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("OAuth token request failed: status=%d body=%s", resp.StatusCode, body) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + TokenType string `json:"token_type"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + t.Fatalf("decode token response: %v", err) + } + if tokenResp.AccessToken == "" { + t.Fatal("got empty access_token") + } + return tokenResp.AccessToken +} + +// createIcebergNamespace creates a single-level Iceberg namespace through +// the REST catalog. Wrapper around createIcebergNamespaceLevels for the +// common single-level case. +func createIcebergNamespace(t *testing.T, env *TestEnvironment, token, bucketName, namespace string) { + t.Helper() + createIcebergNamespaceLevels(t, env, token, bucketName, []string{namespace}) +} + +// createIcebergTable creates a table inside a single-level namespace through +// the REST catalog. The table is created with the canonical +// (id long not null, label string nullable) schema used by all subtests. +func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, namespace, tableName string) { + t.Helper() + createIcebergTableInLevels(t, env, token, bucketName, []string{namespace}, tableName) +} + +// createIcebergNamespaceLevels creates a (possibly multi-level) Iceberg +// namespace via the REST catalog. +func createIcebergNamespaceLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string) { + t.Helper() + + doIcebergJSONRequest(t, env, token, http.MethodPost, fmt.Sprintf("/v1/%s/namespaces", url.PathEscape(bucketName)), map[string]any{ + "namespace": levels, + }, http.StatusOK, http.StatusConflict) +} + +// createIcebergTableInLevels creates a table inside the given namespace levels. +// The namespace path component is encoded with the unit-separator (0x1F) +// convention used by the SeaweedFS Iceberg REST API. +func createIcebergTableInLevels(t *testing.T, env *TestEnvironment, token, bucketName string, levels []string, tableName string) { + t.Helper() + + encodedNs := strings.Join(levels, "\x1F") + doIcebergJSONRequest(t, env, token, http.MethodPost, + fmt.Sprintf("/v1/%s/namespaces/%s/tables", url.PathEscape(bucketName), url.PathEscape(encodedNs)), + map[string]any{ + "name": tableName, + "schema": map[string]any{ + "type": "struct", + "schema-id": 0, + "fields": []map[string]any{ + {"id": 1, "name": "id", "required": true, "type": "long"}, + {"id": 2, "name": "label", "required": false, "type": "string"}, + }, + }, + }, http.StatusOK) +} + +const dorisWriterImage = "seaweedfs-doris-writer" + +// buildDorisWriterImage builds the local PyIceberg writer image. Layer caching +// makes repeat invocations cheap; the first build pulls python:3.11-slim and +// pip-installs pyiceberg+pyarrow (~1-2 min in CI). +func buildDorisWriterImage(t *testing.T) { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + cmd := exec.Command("docker", "build", + "-t", dorisWriterImage, + "-f", filepath.Join(wd, "Dockerfile.writer"), + wd, + ) + if out, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to build %s image: %v\n%s", dorisWriterImage, err, out) + } +} + +// writeIcebergRows runs the PyIceberg writer container, which loads the +// already-created table and appends three rows. +func writeIcebergRows(t *testing.T, env *TestEnvironment, bucketName string, namespace []string, tableName string) { + t.Helper() + + args := []string{ + "run", "--rm", + "--add-host", "host.docker.internal:host-gateway", + dorisWriterImage, + "--catalog-url", fmt.Sprintf("http://host.docker.internal:%d", env.icebergPort), + "--warehouse", "s3://" + bucketName, + "--prefix", bucketName, + "--s3-endpoint", fmt.Sprintf("http://host.docker.internal:%d", env.s3Port), + "--access-key", env.accessKey, + "--secret-key", env.secretKey, + "--region", "us-west-2", + "--table", tableName, + } + for _, level := range namespace { + args = append(args, "--namespace", level) + } + + cmd := exec.Command("docker", args...) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("PyIceberg writer failed: %v\n%s", err, out) + } + t.Logf("PyIceberg writer output: %s", strings.TrimSpace(string(out))) +} + +// doIcebergJSONRequest issues an authenticated JSON request to the Iceberg +// REST endpoint and returns the response body. It fails the test unless the +// response status matches one of expectedStatuses. Used by namespace and +// table seeding so the tests can fail fast with a useful error if the REST +// API rejects a setup call. +func doIcebergJSONRequest(t *testing.T, env *TestEnvironment, token, method, path string, payload any, expectedStatuses ...int) string { + t.Helper() + + var body io.Reader + if payload != nil { + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal Iceberg request: %v", err) + } + body = bytes.NewReader(payloadBytes) + } + + req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", env.bindIP, env.icebergPort, path), body) + if err != nil { + t.Fatalf("create Iceberg request: %v", err) + } + req.Header.Set("Authorization", "Bearer "+token) + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Iceberg request failed: %v", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + for _, expectedStatus := range expectedStatuses { + if resp.StatusCode == expectedStatus { + return string(respBody) + } + } + t.Fatalf("Iceberg request returned unexpected status %d, want %v\nPath: %s\nBody: %s", + resp.StatusCode, expectedStatuses, path, respBody) + return "" +} + +// createTableBucket creates an S3 table bucket using `weed shell`, which +// talks to the master over gRPC and bypasses the S3 SigV4 path so the test +// works whether IAM is enabled or not. The `-master` flag uses SeaweedFS's +// canonical `host:port.grpcPort` ServerAddress format produced by +// pb.NewServerAddress (see weed/pb/server_address.go) — the dot is the +// separator between the HTTP port and the gRPC port and is required, not +// a typo. Replacing it with a colon would make the parser treat the gRPC +// port as the HTTP port and synthesize the wrong gRPC port. +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, env.weedBinary, "shell", + fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } + t.Logf("Created table bucket: %s", bucketName) +} + +// requireDorisRuntime skips the test in `-short` mode or when Docker isn't +// available, since the test cannot run without the Doris container. +func requireDorisRuntime(t *testing.T) { + t.Helper() + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + if !hasDocker() { + t.Skip("Docker not available, skipping Doris integration test") + } +} + +// hasDocker reports whether `docker version` can run, which we treat as a +// sufficient signal that a Docker daemon is reachable from this process. +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +// randomString returns a lowercase alphanumeric string of the given length. +func randomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, length) + if _, err := rand.Read(b); err != nil { + panic("failed to generate random string: " + err.Error()) + } + for i := range b { + b[i] = charset[int(b[i])%len(charset)] + } + return string(b) +}