diff --git a/.github/workflows/s3-tables-tests.yml b/.github/workflows/s3-tables-tests.yml index 72a18ef85..8f90dafc0 100644 --- a/.github/workflows/s3-tables-tests.yml +++ b/.github/workflows/s3-tables-tests.yml @@ -2,10 +2,6 @@ name: "S3 Tables Integration Tests" on: pull_request: - -concurrency: - group: ${{ github.head_ref }}/s3-tables-tests - cancel-in-progress: true permissions: contents: read @@ -194,6 +190,74 @@ jobs: path: test/s3tables/catalog_trino/test-output.log retention-days: 3 + dremio-iceberg-catalog-tests: + name: Dremio Iceberg Catalog Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 30 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Set up Docker + uses: docker/setup-buildx-action@v4 + + - name: Pre-pull Dremio image + run: docker pull dremio/dremio-oss:25.2.0 + + - name: Run go mod tidy + run: go mod tidy + + - name: Install SeaweedFS + run: | + go install -buildvcs=false ./weed + + - name: Run Dremio Iceberg Catalog Integration Tests + timeout-minutes: 25 + working-directory: test/s3tables/catalog_dremio + run: | + set -x + set -o pipefail + echo "=== System Information ===" + uname -a + free -h + df -h + docker info + echo "=== Starting Dremio Iceberg Catalog Tests ===" + + go test -v -timeout 20m . 
2>&1 | tee test-output.log || { + echo "Dremio Iceberg catalog integration tests failed" + exit 1 + } + + - name: Show test output on failure + if: failure() + working-directory: test/s3tables/catalog_dremio + run: | + echo "=== Test Output ===" + if [ -f test-output.log ]; then + tail -200 test-output.log + fi + + echo "=== Process information ===" + ps aux | grep -E "(weed|test|docker|dremio)" || true + echo "=== Dremio containers ===" + docker ps -a --filter "name=seaweed-dremio" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v7 + with: + name: dremio-iceberg-catalog-test-logs + path: test/s3tables/catalog_dremio/test-output.log + retention-days: 3 + polaris-integration-tests: name: Polaris Integration Tests runs-on: ubuntu-22.04 diff --git a/.gitignore b/.gitignore index b356654f9..2306882fe 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ vendor tags *.swp +.claude/ ### OSX template .DS_Store .AppleDouble diff --git a/test/s3tables/catalog_dremio/README.md b/test/s3tables/catalog_dremio/README.md new file mode 100644 index 000000000..1150db22f --- /dev/null +++ b/test/s3tables/catalog_dremio/README.md @@ -0,0 +1,46 @@ +# Dremio Iceberg Catalog Integration Test + +This directory contains a Dremio integration smoke test for SeaweedFS's Iceberg REST Catalog implementation. + +## What It Tests + +`TestDremioIcebergCatalog` verifies the Dremio path end to end: + +1. Starts a local SeaweedFS mini cluster with S3 Tables and Iceberg REST enabled. +2. Creates a SeaweedFS table bucket. +3. Creates an Iceberg namespace and empty table through the SeaweedFS REST catalog OAuth flow. +4. Starts `dremio/dremio-oss:25.2.0`. +5. Bootstraps a Dremio admin user and logs in. +6. Creates a Dremio `RESTCATALOG` source that points at the SeaweedFS catalog. +7. Submits Dremio SQL through `/api/v3/sql`, polls the job API, and reads job results. +8. Queries the SeaweedFS-backed Iceberg table from Dremio. 
+ +## Running Locally + +Build or install `weed`, then run: + +```bash +cd test/s3tables/catalog_dremio +go test -v -timeout 20m . +``` + +The test requires Docker. The GitHub Actions job runs on `ubuntu-22.04` and executes the test for pull requests. + +## Configuration + +The test uses these fixed credentials for the local SeaweedFS IAM config: + +- S3 access key: `AKIAIOSFODNN7EXAMPLE` +- S3 secret key: `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` +- Region: `us-west-2` +- Warehouse bucket: `iceberg-tables` + +The Dremio source is configured via `POST /api/v3/catalog`; it is not configured in `dremio.conf`. +The Dremio container starts with the `plugins.restcatalog.enabled` support key enabled, which is required for the Iceberg REST Catalog source in Dremio OSS 25.2. + +## Troubleshooting + +- Ensure Docker is running: `docker version` +- Ensure `weed` is built or available on `PATH` +- Check host-gateway routing if Dremio cannot reach SeaweedFS: `docker run --add-host host.docker.internal:host-gateway --rm alpine getent hosts host.docker.internal` +- Check Dremio logs from the failed test output; the harness prints the Dremio container tail on Dremio startup, source setup, or job failures. diff --git a/test/s3tables/catalog_dremio/dremio_catalog_test.go b/test/s3tables/catalog_dremio/dremio_catalog_test.go new file mode 100644 index 000000000..e5ff07566 --- /dev/null +++ b/test/s3tables/catalog_dremio/dremio_catalog_test.go @@ -0,0 +1,891 @@ +// Package catalog_dremio provides integration tests for Dremio with SeaweedFS Iceberg REST Catalog. 
+package catalog_dremio + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/testutil" +) + +const ( + dremioImage = "dremio/dremio-oss:25.2.0" + dremioSourceName = "iceberg" + dremioAdminUser = "seaweed-admin" + dremioAdminPassword = "SeaweedFS123!" +) + +type TestEnvironment struct { + seaweedDir string + weedBinary string + dataDir string + bindIP string + s3Port int + s3GrpcPort int + icebergPort int + masterPort int + masterGrpcPort int + filerPort int + filerGrpcPort int + volumePort int + volumeGrpcPort int + weedProcess *exec.Cmd + weedCancel context.CancelFunc + dremioContainer string + dremioToken string + accessKey string + secretKey string +} + +// TestDremioIcebergCatalog starts Dremio, registers SeaweedFS as an Iceberg +// REST catalog source, and runs Dremio SQL against a table served by the +// SeaweedFS catalog. 
+func TestDremioIcebergCatalog(t *testing.T) { + requireDremioRuntime(t) + + env := NewTestEnvironment(t) + defer env.Cleanup(t) + + fmt.Printf(">>> Starting SeaweedFS...\n") + env.StartSeaweedFS(t) + fmt.Printf(">>> SeaweedFS started.\n") + + tableBucket := "iceberg-tables" + fmt.Printf(">>> Creating table bucket: %s\n", tableBucket) + createTableBucket(t, env, tableBucket) + fmt.Printf(">>> Table bucket created.\n") + + testIcebergRestAPI(t, env) + + namespace := "dremio_" + randomString(6) + tableName := "smoke_" + randomString(6) + icebergToken := requestIcebergOAuthToken(t, env) + createIcebergNamespace(t, env, icebergToken, tableBucket, namespace) + createIcebergTable(t, env, icebergToken, tableBucket, namespace, tableName) + + configDir := env.writeDremioConfig(t, tableBucket) + env.startDremioContainer(t, configDir) + waitForDremio(t, env.dremioContainer, 180*time.Second) + env.bootstrapDremio(t, tableBucket) + + selectOutput := runDremioSQL(t, env, "SELECT 1 AS ok") + assertSingleNumericValue(t, selectOutput, 1) + + tableRef := dremioObjectName(dremioSourceName, namespace, tableName) + countOutput := runDremioSQL(t, env, fmt.Sprintf("SELECT COUNT(*) AS row_count FROM %s", tableRef)) + assertSingleNumericValue(t, countOutput, 0) +} + +// NewTestEnvironment creates a new test environment with allocated ports and configuration. 
+func NewTestEnvironment(t *testing.T) *TestEnvironment { + t.Helper() + + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + seaweedDir := wd + for i := 0; i < 6; i++ { + if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { + break + } + seaweedDir = filepath.Dir(seaweedDir) + } + + weedBinary := filepath.Join(seaweedDir, "weed", "weed") + info, err := os.Stat(weedBinary) + if err != nil || info.IsDir() { + weedBinary = filepath.Join(seaweedDir, "weed", "weed", "weed") + info, err = os.Stat(weedBinary) + if err != nil || info.IsDir() { + weedBinary = "weed" + if _, err := exec.LookPath(weedBinary); err != nil { + t.Skip("weed binary not found, skipping integration test") + } + } + } + + dataDir, err := os.MkdirTemp("", "seaweed-dremio-test-*") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + + bindIP := testutil.FindBindIP() + ports := testutil.MustAllocatePorts(t, 9) + + env := &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + masterPort: ports[0], + masterGrpcPort: ports[1], + volumePort: ports[2], + volumeGrpcPort: ports[3], + filerPort: ports[4], + filerGrpcPort: ports[5], + s3Port: ports[6], + s3GrpcPort: ports[7], + icebergPort: ports[8], + } + + env.accessKey = "AKIAIOSFODNN7EXAMPLE" + env.secretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + + return env +} + +// StartSeaweedFS starts a SeaweedFS mini instance with all necessary services. 
+func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { + t.Helper() + + iamConfigPath, err := testutil.WriteIAMConfig(env.dataDir, env.accessKey, env.secretKey) + if err != nil { + t.Fatalf("Failed to create IAM config: %v", err) + } + + securityToml := filepath.Join(env.dataDir, "security.toml") + if err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { + t.Fatalf("Failed to create security.toml: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + env.weedCancel = cancel + + cmd := exec.CommandContext(ctx, env.weedBinary, "mini", + "-master.port", fmt.Sprintf("%d", env.masterPort), + "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), + "-volume.port", fmt.Sprintf("%d", env.volumePort), + "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), + "-filer.port", fmt.Sprintf("%d", env.filerPort), + "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), + "-s3.port", fmt.Sprintf("%d", env.s3Port), + "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), + "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), + "-s3.config", iamConfigPath, + "-ip", env.bindIP, + "-ip.bind", "0.0.0.0", + "-dir", env.dataDir, + ) + cmd.Dir = env.dataDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + cmd.Env = append(os.Environ(), + "AWS_ACCESS_KEY_ID="+env.accessKey, + "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "ICEBERG_WAREHOUSE=s3://iceberg-tables", + "S3TABLES_DEFAULT_BUCKET=iceberg-tables", + ) + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start SeaweedFS: %v", err) + } + env.weedProcess = cmd + + icebergURL := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + if !env.waitForService(icebergURL, 30*time.Second) { + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(icebergURL) + if err != nil { + t.Logf("WARNING: Could not connect to Iceberg service at %s: %v", icebergURL, err) + } else { + t.Logf("WARNING: Iceberg service 
returned status %d at %s", resp.StatusCode, icebergURL) + resp.Body.Close() + } + t.Fatalf("Iceberg REST API did not become ready") + } +} + +// Cleanup stops all processes and removes temporary resources. +func (env *TestEnvironment) Cleanup(t *testing.T) { + t.Helper() + + if env.dremioContainer != "" { + _ = exec.Command("docker", "rm", "-f", env.dremioContainer).Run() + } + + if env.weedCancel != nil { + env.weedCancel() + } + + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + _ = env.weedProcess.Wait() + } + + if env.dataDir != "" { + _ = os.RemoveAll(env.dataDir) + } +} + +// waitForService polls a URL until it responds with a success status or timeout is reached. +func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + resp, err := client.Get(url) + if err != nil { + time.Sleep(500 * time.Millisecond) + continue + } + statusCode := resp.StatusCode + resp.Body.Close() + if statusCode >= 200 && statusCode < 300 { + return true + } + if statusCode == http.StatusUnauthorized || statusCode == http.StatusForbidden { + return true + } + time.Sleep(500 * time.Millisecond) + } + return false +} + +// testIcebergRestAPI verifies that the Iceberg REST API endpoint is responding. 
+func testIcebergRestAPI(t *testing.T, env *TestEnvironment) { + t.Helper() + fmt.Printf(">>> Testing Iceberg REST API directly...\n") + + addr := net.JoinHostPort(env.bindIP, fmt.Sprintf("%d", env.icebergPort)) + conn, err := net.Dial("tcp", addr) + if err != nil { + t.Fatalf("Cannot connect to Iceberg service at %s: %v", addr, err) + } + conn.Close() + t.Logf("Successfully connected to Iceberg service at %s", addr) + + url := fmt.Sprintf("http://%s:%d/v1/config", env.bindIP, env.icebergPort) + t.Logf("Testing Iceberg REST API at %s", url) + + resp, err := http.Get(url) + if err != nil { + t.Fatalf("Failed to connect to Iceberg REST API at %s: %v", url, err) + } + defer resp.Body.Close() + + t.Logf("Iceberg REST API response status: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + t.Logf("Iceberg REST API response body: %s", string(body)) + + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected 200 OK from /v1/config, got %d", resp.StatusCode) + } +} + +// writeDremioConfig writes a minimal dremio.conf. Iceberg REST catalog sources +// are registered after Dremio starts via POST /api/v3/catalog. +func (env *TestEnvironment) writeDremioConfig(t *testing.T, warehouseBucket string) string { + t.Helper() + _ = warehouseBucket + + configDir := filepath.Join(env.dataDir, "dremio") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("Failed to create Dremio config dir: %v", err) + } + + if err := os.WriteFile(filepath.Join(configDir, "dremio.conf"), []byte("{}\n"), 0644); err != nil { + t.Fatalf("Failed to write Dremio config: %v", err) + } + + return configDir +} + +// startDremioContainer starts a Dremio Docker container with the given configuration. +// configDir's dremio.conf is bind-mounted as a single file so Dremio's default +// log4j2.properties, dremio-env, and distrib.conf in /opt/dremio/conf remain +// in place. 
+func (env *TestEnvironment) startDremioContainer(t *testing.T, configDir string) { + t.Helper() + + containerName := "seaweed-dremio-" + randomString(8) + env.dremioContainer = containerName + + cmd := exec.Command("docker", "run", "-d", + "--name", containerName, + "--add-host", "host.docker.internal:host-gateway", + "-v", fmt.Sprintf("%s/dremio.conf:/opt/dremio/conf/dremio.conf:ro", configDir), + "-e", "AWS_ACCESS_KEY_ID="+env.accessKey, + "-e", "AWS_SECRET_ACCESS_KEY="+env.secretKey, + "-e", "AWS_REGION=us-west-2", + "-e", "DREMIO_MAX_HEAP_MEMORY_SIZE_MB=2048", + "-e", "DREMIO_MAX_DIRECT_MEMORY_SIZE_MB=2048", + "-e", "DREMIO_JAVA_EXTRA_OPTS=-Ddremio.debug.sysopt.plugins.restcatalog.enabled=true", + dremioImage, + ) + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("Failed to start Dremio container: %v\n%s", err, string(output)) + } +} + +// dremioContainerLogs returns up to ~200 tail lines from the Dremio container, +// useful for diagnosing startup crashes. +func dremioContainerLogs(containerName string) string { + cmd := exec.Command("docker", "logs", "--tail", "200", containerName) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Sprintf("(failed to fetch docker logs: %v)\n%s", err, string(output)) + } + return string(output) +} + +// waitForDremio waits for Dremio container to be ready by polling its health endpoint. 
+func waitForDremio(t *testing.T, containerName string, timeout time.Duration) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastOutput []byte + for time.Now().Before(deadline) { + cmd := exec.Command("docker", "exec", containerName, + "curl", "-fsS", "-o", "/dev/null", "http://localhost:9047/api/v2/ping", + ) + if output, err := cmd.CombinedOutput(); err == nil { + return + } else { + lastOutput = output + outputStr := string(output) + if strings.Contains(outputStr, "No such container") || + strings.Contains(outputStr, "is not running") { + break + } + } + time.Sleep(2 * time.Second) + } + + cmd := exec.Command("docker", "exec", containerName, "curl", "-I", "http://localhost:9047") + if err := cmd.Run(); err == nil { + time.Sleep(5 * time.Second) + return + } + + t.Fatalf("Timed out waiting for Dremio to be ready\nLast output:\n%s\nContainer logs:\n%s", + string(lastOutput), dremioContainerLogs(containerName)) +} + +func (env *TestEnvironment) bootstrapDremio(t *testing.T, warehouseBucket string) { + t.Helper() + + env.createDremioAdminUser(t) + env.dremioToken = env.loginDremio(t) + env.createDremioIcebergSource(t, warehouseBucket) +} + +func (env *TestEnvironment) createDremioAdminUser(t *testing.T) { + t.Helper() + + payload := map[string]any{ + "userName": dremioAdminUser, + "firstName": "Seaweed", + "lastName": "Admin", + "email": "seaweed-admin@example.com", + "createdAt": time.Now().UnixMilli(), + "password": dremioAdminPassword, + } + + deadline := time.Now().Add(90 * time.Second) + var last string + for time.Now().Before(deadline) { + status, body, err := env.dremioRequest(http.MethodPut, "/apiv2/bootstrap/firstuser", "_dremionull", payload) + if err == nil && (status == http.StatusOK || status == http.StatusCreated || status == http.StatusConflict || + (status == http.StatusBadRequest && strings.Contains(strings.ToLower(body), "already"))) { + return + } + last = fmt.Sprintf("status=%d err=%v body=%s", status, err, body) + time.Sleep(2 * 
time.Second) + } + + t.Fatalf("Failed to create Dremio admin user\nLast response: %s\nContainer logs:\n%s", + last, dremioContainerLogs(env.dremioContainer)) +} + +func (env *TestEnvironment) loginDremio(t *testing.T) string { + t.Helper() + + payload := map[string]string{ + "userName": dremioAdminUser, + "password": dremioAdminPassword, + } + + deadline := time.Now().Add(90 * time.Second) + var last string + for time.Now().Before(deadline) { + status, body, err := env.dremioRequest(http.MethodPost, "/apiv2/login", "", payload) + if err == nil && status == http.StatusOK { + var response struct { + Token string `json:"token"` + } + if err := json.Unmarshal([]byte(body), &response); err != nil { + t.Fatalf("Failed to decode Dremio login response: %v\nBody: %s", err, body) + } + if response.Token == "" { + t.Fatalf("Dremio login returned empty token\nBody: %s", body) + } + return response.Token + } + last = fmt.Sprintf("status=%d err=%v body=%s", status, err, body) + time.Sleep(2 * time.Second) + } + + t.Fatalf("Failed to log in to Dremio\nLast response: %s\nContainer logs:\n%s", + last, dremioContainerLogs(env.dremioContainer)) + return "" +} + +func (env *TestEnvironment) createDremioIcebergSource(t *testing.T, warehouseBucket string) { + t.Helper() + + s3Endpoint := fmt.Sprintf("host.docker.internal:%d", env.s3Port) + source := map[string]any{ + "entityType": "source", + "name": dremioSourceName, + "type": "RESTCATALOG", + "config": map[string]any{ + "restEndpointUri": fmt.Sprintf("http://host.docker.internal:%d", env.icebergPort), + "enableAsync": true, + "isCachingEnabled": false, + "maxCacheSpacePct": 100, + "isRecursiveAllowedNamespaces": true, + "propertyList": []map[string]string{ + dremioProperty("warehouse", "s3://"+warehouseBucket), + dremioProperty("scope", "PRINCIPAL_ROLE:ALL"), + dremioProperty("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"), + dremioProperty("fs.s3a.endpoint", s3Endpoint), + 
dremioProperty("fs.s3a.path.style.access", "true"), + dremioProperty("fs.s3a.connection.ssl.enabled", "false"), + dremioProperty("fs.s3a.endpoint.region", "us-west-2"), + dremioProperty("dremio.s3.compat", "true"), + dremioProperty("dremio.s3.region", "us-west-2"), + dremioProperty("dremio.bucket.discovery.enabled", "false"), + dremioProperty("fs.s3a.audit.enabled", "false"), + dremioProperty("fs.s3a.create.file-status-check", "false"), + }, + "secretPropertyList": []map[string]string{ + dremioProperty("fs.s3a.access.key", env.accessKey), + dremioProperty("fs.s3a.secret.key", env.secretKey), + dremioProperty("credential", env.accessKey+":"+env.secretKey), + }, + }, + } + + status, body, err := env.dremioRequest(http.MethodPost, "/api/v3/catalog", env.dremioAuthHeader(), source) + if err != nil { + t.Fatalf("Failed to create Dremio Iceberg source: %v\nBody: %s", err, body) + } + if status == http.StatusConflict { + return + } + if status != http.StatusOK { + t.Fatalf("Unexpected status creating Dremio Iceberg source: status=%d body=%s\nContainer logs:\n%s", + status, body, dremioContainerLogs(env.dremioContainer)) + } + + var response struct { + State struct { + Status string `json:"status"` + Messages []string `json:"messages"` + } `json:"state"` + } + if err := json.Unmarshal([]byte(body), &response); err == nil && strings.EqualFold(response.State.Status, "bad") { + t.Fatalf("Dremio Iceberg source was created in bad state: %v\nBody: %s\nContainer logs:\n%s", + response.State.Messages, body, dremioContainerLogs(env.dremioContainer)) + } +} + +func dremioProperty(name, value string) map[string]string { + return map[string]string{"name": name, "value": value} +} + +func (env *TestEnvironment) dremioAuthHeader() string { + return "_dremio" + env.dremioToken +} + +func (env *TestEnvironment) dremioRequest(method, path, authHeader string, payload any) (int, string, error) { + args := []string{"exec", "-i", env.dremioContainer, + "curl", "-sS", "--max-time", "60", "-X", 
method, + "-H", "Accept: application/json", + "-w", "\n%{http_code}", + } + if authHeader != "" { + args = append(args, "-H", "Authorization: "+authHeader) + } + + var payloadBytes []byte + var err error + if payload != nil { + payloadBytes, err = json.Marshal(payload) + if err != nil { + return 0, "", fmt.Errorf("marshal payload: %w", err) + } + args = append(args, "-H", "Content-Type: application/json", "--data-binary", "@-") + } + args = append(args, "http://localhost:9047"+path) + + cmd := exec.Command("docker", args...) + if payload != nil { + cmd.Stdin = bytes.NewReader(payloadBytes) + } + + outputBytes, err := cmd.CombinedOutput() + output := strings.TrimRight(string(outputBytes), "\n") + idx := strings.LastIndex(output, "\n") + if idx < 0 { + if err != nil { + return 0, output, err + } + return 0, output, fmt.Errorf("curl output did not include HTTP status") + } + + status, parseErr := strconv.Atoi(strings.TrimSpace(output[idx+1:])) + body := output[:idx] + if parseErr != nil { + return 0, body, fmt.Errorf("parse HTTP status %q: %w", output[idx+1:], parseErr) + } + if err != nil { + return status, body, err + } + return status, body, nil +} + +// runDremioSQL submits SQL, polls the Dremio job until completion, and returns +// the job results JSON. 
+func runDremioSQL(t *testing.T, env *TestEnvironment, sql string) string { + t.Helper() + + status, body, err := env.dremioRequest(http.MethodPost, "/api/v3/sql", env.dremioAuthHeader(), map[string]string{"sql": sql}) + if err != nil { + t.Fatalf("Dremio SQL submit failed: %v\nSQL: %s\nBody: %s", err, sql, body) + } + if status != http.StatusOK { + t.Fatalf("Dremio SQL submit returned status %d\nSQL: %s\nBody: %s\nContainer logs:\n%s", + status, sql, body, dremioContainerLogs(env.dremioContainer)) + } + + var response struct { + ID string `json:"id"` + ErrorMessage string `json:"errorMessage"` + } + if err := json.Unmarshal([]byte(body), &response); err != nil { + t.Fatalf("Failed to decode Dremio SQL response: %v\nSQL: %s\nBody: %s", err, sql, body) + } + if response.ErrorMessage != "" { + t.Fatalf("Dremio SQL submit returned error: %s\nSQL: %s\nBody: %s", response.ErrorMessage, sql, body) + } + if response.ID == "" { + t.Fatalf("Dremio SQL response did not include a job id\nSQL: %s\nBody: %s", sql, body) + } + + env.waitForDremioJob(t, response.ID, sql) + + resultsPath := fmt.Sprintf("/api/v3/job/%s/results?limit=500", url.PathEscape(response.ID)) + status, body, err = env.dremioRequest(http.MethodGet, resultsPath, env.dremioAuthHeader(), nil) + if err != nil { + t.Fatalf("Dremio job results request failed: %v\nSQL: %s\nBody: %s", err, sql, body) + } + if status != http.StatusOK { + t.Fatalf("Dremio job results returned status %d\nSQL: %s\nBody: %s", status, sql, body) + } + return strings.TrimSpace(body) +} + +func (env *TestEnvironment) waitForDremioJob(t *testing.T, jobID, sql string) { + t.Helper() + + deadline := time.Now().Add(3 * time.Minute) + var last string + for time.Now().Before(deadline) { + status, body, err := env.dremioRequest(http.MethodGet, "/api/v3/job/"+url.PathEscape(jobID), env.dremioAuthHeader(), nil) + if err != nil { + last = fmt.Sprintf("status=%d err=%v body=%s", status, err, body) + time.Sleep(1 * time.Second) + continue + } + if 
status != http.StatusOK { + last = fmt.Sprintf("status=%d body=%s", status, body) + time.Sleep(1 * time.Second) + continue + } + + var job struct { + JobState string `json:"jobState"` + ErrorMessage string `json:"errorMessage"` + } + if err := json.Unmarshal([]byte(body), &job); err != nil { + t.Fatalf("Failed to decode Dremio job response: %v\nSQL: %s\nBody: %s", err, sql, body) + } + + switch job.JobState { + case "COMPLETED": + return + case "FAILED", "CANCELED": + t.Fatalf("Dremio job %s ended in %s\nSQL: %s\nError: %s\nBody: %s\nContainer logs:\n%s", + jobID, job.JobState, sql, job.ErrorMessage, body, dremioContainerLogs(env.dremioContainer)) + default: + last = body + time.Sleep(1 * time.Second) + } + } + + t.Fatalf("Timed out waiting for Dremio job %s\nSQL: %s\nLast response: %s\nContainer logs:\n%s", + jobID, sql, last, dremioContainerLogs(env.dremioContainer)) +} + +// parseDremioResponse parses the JSON response from Dremio and extracts rows. +func parseDremioResponse(t *testing.T, output string) [][]interface{} { + t.Helper() + + var response map[string]interface{} + decoder := json.NewDecoder(strings.NewReader(output)) + decoder.UseNumber() + if err := decoder.Decode(&response); err != nil { + t.Fatalf("Failed to parse Dremio response as JSON: %v\nResponse: %s", err, output) + } + + if errMsg, ok := response["errorMessage"]; ok && errMsg != "" { + t.Fatalf("Dremio returned an error: %v", errMsg) + } + + var schemaNames []string + if schema, ok := response["schema"].([]interface{}); ok { + for _, field := range schema { + fieldMap, ok := field.(map[string]interface{}) + if !ok { + continue + } + name, ok := fieldMap["name"].(string) + if ok { + schemaNames = append(schemaNames, name) + } + } + } + + rows, ok := response["rows"].([]interface{}) + if !ok { + t.Fatalf("Dremio response does not contain 'rows' field: %s", output) + } + + var result [][]interface{} + for _, row := range rows { + switch rowData := row.(type) { + case []interface{}: + result = 
append(result, rowData) + case map[string]interface{}: + values := make([]interface{}, 0, len(rowData)) + if len(schemaNames) > 0 { + for _, name := range schemaNames { + values = append(values, rowData[name]) + } + } else { + keys := make([]string, 0, len(rowData)) + for key := range rowData { + keys = append(keys, key) + } + sort.Strings(keys) + for _, key := range keys { + values = append(values, rowData[key]) + } + } + result = append(result, values) + } + } + return result +} + +func assertSingleNumericValue(t *testing.T, output string, expected float64) { + t.Helper() + + rows := parseDremioResponse(t, output) + if len(rows) != 1 || len(rows[0]) != 1 { + t.Fatalf("Expected one row with one value, got: %v\nOutput: %s", rows, output) + } + + var got float64 + switch value := rows[0][0].(type) { + case float64: + got = value + case json.Number: + parsed, err := value.Float64() + if err != nil { + t.Fatalf("Expected numeric value, got %v", rows[0][0]) + } + got = parsed + default: + t.Fatalf("Expected numeric value, got %T: %v", rows[0][0], rows[0][0]) + } + if got != expected { + t.Fatalf("Expected numeric value %v, got %v\nOutput: %s", expected, got, output) + } +} + +func requestIcebergOAuthToken(t *testing.T, env *TestEnvironment) string { + t.Helper() + + resp, err := http.PostForm(fmt.Sprintf("http://%s:%d/v1/oauth/tokens", env.bindIP, env.icebergPort), url.Values{ + "grant_type": {"client_credentials"}, + "client_id": {env.accessKey}, + "client_secret": {env.secretKey}, + }) + if err != nil { + t.Fatalf("POST /v1/oauth/tokens: %v", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + t.Fatalf("OAuth token request failed: status=%d body=%s", resp.StatusCode, body) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + TokenType string `json:"token_type"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + t.Fatalf("decode token response: %v", err) + } + if 
tokenResp.AccessToken == "" { + t.Fatal("got empty access_token") + } + if tokenResp.TokenType != "bearer" { + t.Fatalf("expected token_type=bearer, got %s", tokenResp.TokenType) + } + return tokenResp.AccessToken +} + +func createIcebergNamespace(t *testing.T, env *TestEnvironment, token, bucketName, namespace string) { + t.Helper() + + doIcebergJSONRequest(t, env, token, http.MethodPost, fmt.Sprintf("/v1/%s/namespaces", url.PathEscape(bucketName)), map[string]any{ + "namespace": []string{namespace}, + }, http.StatusOK, http.StatusConflict) +} + +func createIcebergTable(t *testing.T, env *TestEnvironment, token, bucketName, namespace, tableName string) { + t.Helper() + + doIcebergJSONRequest(t, env, token, http.MethodPost, + fmt.Sprintf("/v1/%s/namespaces/%s/tables", url.PathEscape(bucketName), url.PathEscape(namespace)), + map[string]any{ + "name": tableName, + "schema": map[string]any{ + "type": "struct", + "schema-id": 0, + "fields": []map[string]any{ + {"id": 1, "name": "id", "required": true, "type": "long"}, + {"id": 2, "name": "label", "required": false, "type": "string"}, + }, + }, + }, http.StatusOK) +} + +func doIcebergJSONRequest(t *testing.T, env *TestEnvironment, token, method, path string, payload any, expectedStatuses ...int) string { + t.Helper() + + var body io.Reader + if payload != nil { + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal Iceberg request: %v", err) + } + body = bytes.NewReader(payloadBytes) + } + + req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", env.bindIP, env.icebergPort, path), body) + if err != nil { + t.Fatalf("create Iceberg request: %v", err) + } + req.Header.Set("Authorization", "Bearer "+token) + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Iceberg request failed: %v", err) + } + defer resp.Body.Close() + + respBody, _ := 
io.ReadAll(resp.Body) + for _, expectedStatus := range expectedStatuses { + if resp.StatusCode == expectedStatus { + return string(respBody) + } + } + t.Fatalf("Iceberg request returned unexpected status %d, want %v\nPath: %s\nBody: %s", + resp.StatusCode, expectedStatuses, path, respBody) + return "" +} + +func dremioObjectName(parts ...string) string { + quoted := make([]string, 0, len(parts)) + for _, part := range parts { + quoted = append(quoted, `"`+strings.ReplaceAll(part, `"`, `""`)+`"`) + } + return strings.Join(quoted, ".") +} + +// createTableBucket creates an S3 table bucket using `weed shell`, which +// talks to the filer over gRPC and bypasses S3 SigV4 auth (the test runs +// with IAM enabled). The master address must use `host:port.grpcPort` +// (dot, not colon). +func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + cmd := exec.CommandContext(ctx, env.weedBinary, "shell", + fmt.Sprintf("-master=%s:%d.%d", env.bindIP, env.masterPort, env.masterGrpcPort), + ) + cmd.Stdin = strings.NewReader(fmt.Sprintf("s3tables.bucket -create -name %s -account 000000000000\nexit\n", bucketName)) + output, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("Failed to create table bucket %s via weed shell: %v\nOutput: %s", bucketName, err, string(output)) + } + t.Logf("Created table bucket: %s", bucketName) +} + +func requireDremioRuntime(t *testing.T) { + t.Helper() + + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + if !hasDocker() { + t.Skip("Docker not available, skipping Dremio integration test") + } +} + +// hasDocker checks if Docker is available in the system. +func hasDocker() bool { + cmd := exec.Command("docker", "version") + return cmd.Run() == nil +} + +// randomString generates a random string of the specified length. 
func randomString(length int) string {
	const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"

	// Draw one random byte per output character from the system CSPRNG.
	entropy := make([]byte, length)
	if _, err := rand.Read(entropy); err != nil {
		panic("failed to generate random string: " + err.Error())
	}

	// Fold each random byte onto the alphabet.
	var sb strings.Builder
	sb.Grow(length)
	for _, v := range entropy {
		sb.WriteByte(alphabet[int(v)%len(alphabet)])
	}
	return sb.String()
}