diff --git a/Makefile b/Makefile index 323a2c1e..b0d7c086 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ BINARY_NAME := sylve +BACKUP_BINARY_NAME := sylve-backup BIN_DIR := bin .PHONY: all build clean run depcheck @@ -11,6 +12,7 @@ build: build-depcheck cp -rf web/build/* internal/assets/web-files mkdir -p $(BIN_DIR) go build -o $(BIN_DIR)/$(BINARY_NAME) cmd/sylve/main.go + go build -o $(BIN_DIR)/$(BACKUP_BINARY_NAME) cmd/sylve-backup/main.go clean: rm -rf $(BIN_DIR) diff --git a/backup.config.example.json b/backup.config.example.json new file mode 100644 index 00000000..5315826c --- /dev/null +++ b/backup.config.example.json @@ -0,0 +1,10 @@ +{ + "logLevel": 1, + "dataPath": "./data-backup", + "listenPort": 7444, + "clusterKey": "replace-with-your-cluster-key", + "tlsConfig": { + "certFile": "", + "keyFile": "" + } +} diff --git a/cmd/sylve-backup/main.go b/cmd/sylve-backup/main.go new file mode 100644 index 00000000..57bb6f7d --- /dev/null +++ b/cmd/sylve-backup/main.go @@ -0,0 +1,274 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// Copyright (c) 2025 The FreeBSD Foundation. +// +// This software was developed by Hayzam Sherif +// of Alchemilla Ventures Pvt. Ltd. , +// under sponsorship from the FreeBSD Foundation. + +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/alchemillahq/gzfs" + "github.com/alchemillahq/sylve/internal" + "github.com/alchemillahq/sylve/internal/cmd" + "github.com/alchemillahq/sylve/internal/config" + "github.com/alchemillahq/sylve/internal/db" + "github.com/alchemillahq/sylve/internal/logger" + "github.com/alchemillahq/sylve/internal/services/auth" + "github.com/alchemillahq/sylve/internal/services/cluster" + "github.com/alchemillahq/sylve/internal/services/replication" + sysU "github.com/alchemillahq/sylve/pkg/system" + "gorm.io/gorm" +) + +const defaultBackupConfigPath = "./backup.config.json" + +func main() { + cmd.AsciiArt() + + command, args := parseCommand(os.Args[1:]) + + switch command { + case "serve": + if !sysU.IsRoot() { + logger.BootstrapFatal("Root privileges required for backup target mode") + } + runServe(args) + case "datasets": + runDatasets(args) + case "status": + runStatus(args) + case "pull": + if !sysU.IsRoot() { + logger.BootstrapFatal("Root privileges required for pull mode") + } + runPull(args) + case "help", "-h", "--help": + printUsage() + default: + fmt.Fprintf(os.Stderr, "unknown command: %s\n\n", command) + printUsage() + os.Exit(1) + } +} + +func parseCommand(args []string) (string, []string) { + if len(args) == 0 { + return "serve", args + } + + first := args[0] + if strings.HasPrefix(first, "-") { + return "serve", args + } + + return first, args[1:] +} + +func runServe(args []string) { + fs := flag.NewFlagSet("serve", flag.ExitOnError) + configPath := fs.String("config", defaultBackupConfigPath, "path to backup config file") + listenPort := fs.Int("listen-port", 0, "override listen port from backup config") + fs.Parse(args) + + runtime, err := newRuntime(*configPath, *listenPort) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + defer runtime.close() + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + logger.L.Info().Int("listen_port", runtime.cfg.ListenPort).Msg("Starting backup replication target") + + if err := runtime.replication.RunStandalone(ctx, runtime.cfg.ListenPort); err != nil { + logger.L.Fatal().Err(err).Msg("backup_replication_target_failed") + } +} + +func runDatasets(args []string) { + fs := flag.NewFlagSet("datasets", flag.ExitOnError) + configPath := fs.String("config", defaultBackupConfigPath, "path to backup config file") + target := fs.String("target", "", "target endpoint host:port") + prefix := fs.String("prefix", "", "optional dataset prefix filter") + fs.Parse(args) + + if *target == "" { + logger.BootstrapFatal("target is required") + } + + runtime, err := newRuntime(*configPath, 0) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + defer runtime.close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + datasets, err := runtime.replication.ListTargetDatasets(ctx, *target, *prefix) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + + printJSON(datasets) +} + +func runStatus(args []string) { + fs := flag.NewFlagSet("status", flag.ExitOnError) + configPath := fs.String("config", defaultBackupConfigPath, "path to backup config file") + target := fs.String("target", "", "target endpoint host:port") + limit := fs.Int("limit", 50, "max number of events") + fs.Parse(args) + + if *target == "" { + logger.BootstrapFatal("target is required") + } + + runtime, err := newRuntime(*configPath, 0) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + defer runtime.close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + events, err := runtime.replication.ListTargetStatus(ctx, *target, *limit) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + + printJSON(events) +} + +func runPull(args []string) { + fs := flag.NewFlagSet("pull", flag.ExitOnError) + configPath := fs.String("config", defaultBackupConfigPath, "path to backup config file") + target := fs.String("target", "", "backup target endpoint host:port") + sourceDataset := fs.String("source-dataset", "", "source dataset on target") + destinationDataset := fs.String("destination-dataset", "", "destination dataset on local host") + snapshot := fs.String("snapshot", "", "specific snapshot to pull (optional)") + force := fs.Bool("force", false, "force zfs recv rollback") + withIntermediates := fs.Bool("with-intermediates", false, "use -I incremental stream when possible") + fs.Parse(args) + + if *target == "" || *sourceDataset == "" || *destinationDataset == "" { + logger.BootstrapFatal("target, source-dataset, and destination-dataset are required") + } + + runtime, err := newRuntime(*configPath, 0) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + defer runtime.close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + plan, err := runtime.replication.PullDatasetFromNode( + ctx, + *sourceDataset, + *destinationDataset, + *target, + *snapshot, + *force, + *withIntermediates, + ) + if err != nil { + logger.BootstrapFatal(err.Error()) + } + + printJSON(plan) +} + +type backupRuntime struct { + cfg *config.BackupConfig + db *gorm.DB + replication *replication.Service +} + +func newRuntime(configPath string, listenPortOverride int) (*backupRuntime, error) { + cfg, err := config.ParseBackupConfig(configPath) + if err != nil { + return nil, err + } + + if listenPortOverride > 0 { + cfg.ListenPort = listenPortOverride + } + + config.ParsedConfig = &internal.SylveConfig{ + DataPath: cfg.DataPath, + LogLevel: cfg.LogLevel, + TLS: internal.TLSConfig{ + CertFile: cfg.TLS.CertFile, + KeyFile: cfg.TLS.KeyFile, + }, + } + + logger.InitLogger(cfg.DataPath, cfg.LogLevel) + + database, err := db.SetupBackupDatabase(cfg.DataPath, cfg.ClusterKey) + if err != nil { + return nil, err + } + + authService := auth.NewAuthService(database) + clusterService, ok := cluster.NewClusterService(database, authService).(*cluster.Service) + if !ok { + return nil, fmt.Errorf("failed_to_create_cluster_service") + } + + gz := gzfs.NewClient(gzfs.Options{ + Sudo: false, + ZDBCacheTTLSeconds: 0, + }) + + repl := replication.NewService(database, authService, gz, clusterService) + + return &backupRuntime{ + cfg: cfg, + db: database, + replication: repl, + }, nil +} + +func (r *backupRuntime) close() { + if r == nil || r.db == nil { + return + } + + sqlDB, err := r.db.DB() + if err == nil { + _ = sqlDB.Close() + } +} + +func printJSON(v any) { + b, err := json.MarshalIndent(v, "", " ") + if err != nil { + logger.BootstrapFatal(err.Error()) + } + fmt.Println(string(b)) +} + +func printUsage() { + fmt.Println("sylve-backup usage:") + fmt.Println(" sylve-backup serve [--config backup.config.json] [--listen-port 7444]") + fmt.Println(" sylve-backup datasets --target host:port [--prefix pool/path] [--config backup.config.json]") + fmt.Println(" sylve-backup status --target host:port [--limit 50] [--config backup.config.json]") + fmt.Println(" sylve-backup pull --target host:port --source-dataset src --destination-dataset dst [--snapshot snap] [--force] [--with-intermediates] [--config backup.config.json]") +} diff --git a/internal/config/backup.go b/internal/config/backup.go new file mode 100644 index 00000000..54a8269d --- /dev/null +++ b/internal/config/backup.go @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// Copyright (c) 2025 The FreeBSD Foundation. +// +// This software was developed by Hayzam Sherif +// of Alchemilla Ventures Pvt. Ltd. , +// under sponsorship from the FreeBSD Foundation. + +package config + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" +) + +type BackupConfig struct { + LogLevel int8 `json:"logLevel"` + DataPath string `json:"dataPath"` + ListenPort int `json:"listenPort"` + ClusterKey string `json:"clusterKey"` + TLS struct { + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` + } `json:"tlsConfig"` +} + +func ParseBackupConfig(path string) (*BackupConfig, error) { + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + + cfg := &BackupConfig{} + if err := json.NewDecoder(file).Decode(cfg); err != nil { + return nil, err + } + + if cfg.ListenPort <= 0 || cfg.ListenPort > 65535 { + return nil, fmt.Errorf("invalid_listen_port") + } + + if cfg.DataPath == "" { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + cfg.DataPath = filepath.Join(cwd, "data-backup") + } + + if err := os.MkdirAll(cfg.DataPath, 0755); err != nil { + return nil, err + } + + return cfg, nil +} diff --git a/internal/db/backup.go b/internal/db/backup.go new file mode 100644 index 00000000..82e974e1 --- /dev/null +++ b/internal/db/backup.go @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: BSD-2-Clause +// +// Copyright (c) 2025 The FreeBSD Foundation. +// +// This software was developed by Hayzam Sherif +// of Alchemilla Ventures Pvt. Ltd. , +// under sponsorship from the FreeBSD Foundation. + +package db + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/alchemillahq/sylve/internal/db/models" + clusterModels "github.com/alchemillahq/sylve/internal/db/models/cluster" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + gormLogger "gorm.io/gorm/logger" +) + +func SetupBackupDatabase(dataPath, clusterKey string) (*gorm.DB, error) { + if dataPath == "" { + return nil, fmt.Errorf("backup_data_path_required") + } + + if err := os.MkdirAll(dataPath, 0755); err != nil { + return nil, fmt.Errorf("create_backup_data_path: %w", err) + } + + dbPath := filepath.Join(dataPath, "sylve-backup.db") + db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{ + Logger: gormLogger.Default.LogMode(gormLogger.Warn), + TranslateError: true, + }) + if err != nil { + return nil, fmt.Errorf("open_backup_db: %w", err) + } + + sqlDB, err := db.DB() + if err != nil { + return nil, fmt.Errorf("backup_sql_handle: %w", err) + } + sqlDB.SetMaxOpenConns(1) + sqlDB.SetMaxIdleConns(1) + + db.Exec("PRAGMA foreign_keys = OFF") + db.Exec("PRAGMA busy_timeout = 5000") + db.Exec("PRAGMA journal_mode = WAL") + db.Exec("PRAGMA synchronous = NORMAL") + + if err := db.AutoMigrate( + &models.SystemSecrets{}, + &clusterModels.Cluster{}, + &clusterModels.BackupReplicationEvent{}, + ); err != nil { + return nil, fmt.Errorf("migrate_backup_db: %w", err) + } + + var c clusterModels.Cluster + if err := db.Order("id ASC").First(&c).Error; err != nil { + if !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("load_backup_cluster_record: %w", err) + } + + c = clusterModels.Cluster{ + Enabled: false, + Key: clusterKey, + RaftBootstrap: nil, + RaftIP: "", + RaftPort: 0, + } + if err := db.Create(&c).Error; err != nil { + return nil, fmt.Errorf("init_backup_cluster_record: %w", err) + } + } else if clusterKey != "" && c.Key != clusterKey { + if err := db.Model(&c).Update("key", clusterKey).Error; err != nil { + return nil, fmt.Errorf("update_backup_cluster_key: %w", err) + } + } + + db.Exec("PRAGMA foreign_keys = ON") + return db, nil +} diff --git a/internal/db/db.go b/internal/db/db.go index ab9b764d..0463f9af 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -122,6 +122,7 @@ func SetupDatabase(cfg *internal.SylveConfig, isTest bool) *gorm.DB { &clusterModels.ClusterNode{}, &clusterModels.ClusterOption{}, &clusterModels.ClusterNote{}, + &clusterModels.BackupReplicationEvent{}, &models.Migrations{}, ) diff --git a/internal/db/models/cluster/backup.go b/internal/db/models/cluster/backup.go index 0d96f0b1..bc300885 100644 --- a/internal/db/models/cluster/backup.go +++ b/internal/db/models/cluster/backup.go @@ -7,3 +7,22 @@ // under sponsorship from the FreeBSD Foundation. package clusterModels + +import "time" + +type BackupReplicationEvent struct { + ID uint `gorm:"primaryKey" json:"id"` + Direction string `gorm:"index" json:"direction"` + RemoteAddress string `json:"remoteAddress"` + SourceDataset string `json:"sourceDataset"` + DestinationDataset string `json:"destinationDataset"` + BaseSnapshot string `json:"baseSnapshot"` + TargetSnapshot string `json:"targetSnapshot"` + Mode string `json:"mode"` + Status string `gorm:"index" json:"status"` + Error string `gorm:"type:text" json:"error"` + StartedAt time.Time `gorm:"index" json:"startedAt"` + CompletedAt *time.Time `json:"completedAt"` + CreatedAt time.Time `gorm:"autoCreateTime" json:"createdAt"` + UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updatedAt"` +} diff --git a/internal/services/replication/replicate.go b/internal/services/replication/replicate.go index bd907a23..78aad4af 100644 --- a/internal/services/replication/replicate.go +++ b/internal/services/replication/replicate.go @@ -143,6 +143,207 @@ func (s *Service) ReplicateDatasetToNode( return plan, nil } +func (s *Service) PullDatasetFromNode( + ctx context.Context, + srcDataset string, + dstDataset string, + target string, + targetSnapshot string, + force bool, + withIntermediates bool, +) (*Plan, error) { + if srcDataset == "" || dstDataset == "" || target == "" { + return nil, fmt.Errorf("src_dataset_dst_dataset_and_target_are_required") + } + + endpoint, err := s.resolvePeerEndpoint(target) + if err != nil { + return nil, err + } + + remoteSnaps, err := s.fetchRemoteSnapshots(ctx, endpoint, srcDataset) + if err != nil { + return nil, err + } + if len(remoteSnaps) == 0 { + return nil, fmt.Errorf("no_remote_snapshots") + } + + targetName := normalizeSnapshotName(srcDataset, targetSnapshot) + if targetName == "" { + targetName = remoteSnaps[len(remoteSnaps)-1].Name + } + + targetIndex := -1 + for i, snap := range remoteSnaps { + if snap.Name == targetName { + targetIndex = i + break + } + } + if targetIndex == -1 { + return nil, fmt.Errorf("target_snapshot_not_found") + } + + localSnaps, err := s.GZFS.ZFS.ListByType(ctx, gzfs.DatasetTypeSnapshot, false, dstDataset) + if err != nil && !isDatasetMissingErr(err) { + return nil, fmt.Errorf("destination_snapshots: %w", err) + } + + localByGUID := make(map[string]*gzfs.Dataset, len(localSnaps)) + for _, snap := range localSnaps { + localByGUID[snap.GUID] = snap + } + + var baseLocal *gzfs.Dataset + for _, remoteSnap := range remoteSnaps[:targetIndex+1] { + if localSnap, ok := localByGUID[remoteSnap.GUID]; ok { + baseLocal = localSnap + } + } + + plan := &Plan{ + SourceDataset: srcDataset, + DestinationDataset: dstDataset, + Endpoint: endpoint, + TargetSnapshot: targetName, + } + + if baseLocal != nil { + plan.BaseSnapshot = baseLocal.Name + if baseLocal.Name == targetName { + plan.Mode = "noop" + plan.Noop = true + return plan, nil + } + } + + token, err := s.clusterToken() + if err != nil { + return nil, err + } + + conn, stream, err := s.openStream(ctx, endpoint, request{ + Version: 1, + Action: "send", + Token: token, + Dataset: srcDataset, + TargetSnapshot: targetName, + BaseSnapshot: plan.BaseSnapshot, + WithIntermediates: withIntermediates, + }) + if err != nil { + return nil, err + } + defer conn.CloseWithError(0, "done") + + if err := stream.Close(); err != nil { + return nil, err + } + + reader := bufio.NewReader(stream) + var resp response + if err := readJSONLine(reader, maxHeaderBytes, &resp); err != nil { + return nil, err + } + if !resp.OK { + return nil, errors.New(resp.Error) + } + if resp.TargetSnapshot != "" { + plan.TargetSnapshot = resp.TargetSnapshot + } + + if plan.BaseSnapshot == "" { + plan.Mode = "pull_full" + } else if withIntermediates { + plan.Mode = "pull_incremental_intermediates" + } else { + plan.Mode = "pull_incremental" + } + + if err := s.receiveStream(ctx, reader, dstDataset, force); err != nil { + return nil, err + } + + return plan, nil +} + +func (s *Service) ListTargetDatasets(ctx context.Context, target string, prefix string) ([]DatasetInfo, error) { + endpoint, err := s.resolvePeerEndpoint(target) + if err != nil { + return nil, err + } + + token, err := s.clusterToken() + if err != nil { + return nil, err + } + + conn, stream, err := s.openStream(ctx, endpoint, request{ + Version: 1, + Action: "datasets", + Token: token, + Prefix: prefix, + }) + if err != nil { + return nil, err + } + defer conn.CloseWithError(0, "done") + + if err := stream.Close(); err != nil { + return nil, err + } + + reader := bufio.NewReader(stream) + var resp response + if err := readJSONLine(reader, maxHeaderBytes, &resp); err != nil { + return nil, err + } + if !resp.OK { + return nil, errors.New(resp.Error) + } + + return resp.Datasets, nil +} + +func (s *Service) ListTargetStatus(ctx context.Context, target string, limit int) ([]ReplicationEventInfo, error) { + endpoint, err := s.resolvePeerEndpoint(target) + if err != nil { + return nil, err + } + + token, err := s.clusterToken() + if err != nil { + return nil, err + } + + conn, stream, err := s.openStream(ctx, endpoint, request{ + Version: 1, + Action: "status", + Token: token, + Limit: limit, + }) + if err != nil { + return nil, err + } + defer conn.CloseWithError(0, "done") + + if err := stream.Close(); err != nil { + return nil, err + } + + reader := bufio.NewReader(stream) + var resp response + if err := readJSONLine(reader, maxHeaderBytes, &resp); err != nil { + return nil, err + } + if !resp.OK { + return nil, errors.New(resp.Error) + } + + return resp.Events, nil +} + func (s *Service) fetchRemoteSnapshots(ctx context.Context, endpoint, dataset string) ([]SnapInfo, error) { token, err := s.clusterToken() if err != nil { @@ -311,3 +512,12 @@ func (s *Service) sendIncrementalWithIntermediates( return nil } + +func isDatasetMissingErr(err error) bool { + if err == nil { + return false + } + + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "dataset does not exist") || strings.Contains(msg, "not found") +} diff --git a/internal/services/replication/service.go b/internal/services/replication/service.go index 412f4645..db13707a 100644 --- a/internal/services/replication/service.go +++ b/internal/services/replication/service.go @@ -50,17 +50,25 @@ type Service struct { } type request struct { - Version int `json:"version"` - Action string `json:"action"` - Token string `json:"token"` - Dataset string `json:"dataset,omitempty"` - Force bool `json:"force,omitempty"` + Version int `json:"version"` + Action string `json:"action"` + Token string `json:"token"` + Dataset string `json:"dataset,omitempty"` + Prefix string `json:"prefix,omitempty"` + Limit int `json:"limit,omitempty"` + Force bool `json:"force,omitempty"` + BaseSnapshot string `json:"baseSnapshot,omitempty"` + TargetSnapshot string `json:"targetSnapshot,omitempty"` + WithIntermediates bool `json:"withIntermediates,omitempty"` } type response struct { - OK bool `json:"ok"` - Error string `json:"error,omitempty"` - Snapshots []SnapInfo `json:"snapshots,omitempty"` + OK bool `json:"ok"` + Error string `json:"error,omitempty"` + Snapshots []SnapInfo `json:"snapshots,omitempty"` + Datasets []DatasetInfo `json:"datasets,omitempty"` + Events []ReplicationEventInfo `json:"events,omitempty"` + TargetSnapshot string `json:"targetSnapshot,omitempty"` } type SnapInfo struct { @@ -69,6 +77,32 @@ type SnapInfo struct { CreateTXG string `json:"createtxg"` } +type DatasetInfo struct { + Name string `json:"name"` + GUID string `json:"guid"` + Type string `json:"type"` + CreationUnix int64 `json:"creationUnix"` + UsedBytes uint64 `json:"usedBytes"` + ReferencedBytes uint64 `json:"referencedBytes"` + AvailableBytes uint64 `json:"availableBytes"` + Mountpoint string `json:"mountpoint"` +} + +type ReplicationEventInfo struct { + ID uint `json:"id"` + Direction string `json:"direction"` + RemoteAddress string `json:"remoteAddress"` + SourceDataset string `json:"sourceDataset"` + DestinationDataset string `json:"destinationDataset"` + BaseSnapshot string `json:"baseSnapshot"` + TargetSnapshot string `json:"targetSnapshot"` + Mode string `json:"mode"` + Status string `json:"status"` + Error string `json:"error"` + StartedAt time.Time `json:"startedAt"` + CompletedAt *time.Time `json:"completedAt"` +} + type Plan struct { Mode string `json:"mode"` BaseSnapshot string `json:"baseSnapshot,omitempty"` @@ -111,25 +145,40 @@ func (s *Service) Run(ctx context.Context) { } } -func (s *Service) syncListener(ctx context.Context) error { - var c clusterModels.Cluster - if err := s.DB.First(&c).Error; err != nil { - if !errors.Is(err, gorm.ErrRecordNotFound) { - return err - } +func (s *Service) RunStandalone(ctx context.Context, port int) error { + if port <= 0 || port > 65535 { + return fmt.Errorf("invalid_listener_port") + } + if err := s.ensureListener(ctx, port); err != nil { return err } - shouldRun := c.Enabled && c.RaftPort > 0 - if !shouldRun { + <-ctx.Done() + return s.stopListener() +} + +func (s *Service) syncListener(ctx context.Context) error { + var c clusterModels.Cluster + if err := s.DB.Order("id ASC").First(&c).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return s.stopListener() + } + return err + } + + if !c.Enabled || c.RaftPort <= 0 { return s.stopListener() } + return s.ensureListener(ctx, c.RaftPort) +} + +func (s *Service) ensureListener(ctx context.Context, port int) error { s.mu.Lock() defer s.mu.Unlock() - if s.listener != nil && s.port == c.RaftPort { + if s.listener != nil && s.port == port { return nil } @@ -143,17 +192,17 @@ func (s *Service) syncListener(ctx context.Context) error { return err } - addr := fmt.Sprintf(":%d", c.RaftPort) + addr := fmt.Sprintf(":%d", port) listener, err := quic.ListenAddr(addr, tlsConf, nil) if err != nil { return err } s.listener = listener - s.port = c.RaftPort + s.port = port go s.acceptLoop(ctx, listener) - logger.L.Info().Int("udp_port", c.RaftPort).Msg("Replication QUIC Listener started") + logger.L.Info().Int("udp_port", port).Msg("Replication QUIC Listener started") return nil } @@ -185,17 +234,19 @@ func (s *Service) acceptLoop(ctx context.Context, listener *quic.Listener) { } func (s *Service) handleConn(conn *quic.Conn) { + remoteAddr := conn.RemoteAddr().String() + for { stream, err := conn.AcceptStream(context.Background()) if err != nil { return } - go s.handleStream(stream) + go s.handleStream(stream, remoteAddr) } } -func (s *Service) handleStream(stream *quic.Stream) { +func (s *Service) handleStream(stream *quic.Stream, remoteAddr string) { defer stream.Close() reader := bufio.NewReader(stream) @@ -235,18 +286,93 @@ func (s *Service) handleStream(stream *quic.Stream) { } _ = writeJSONLine(stream, response{OK: true, Snapshots: snaps}) + case "datasets": + datasets, err := s.listDatasets(context.Background(), req.Prefix) + if err != nil { + _ = writeJSONLine(stream, response{OK: false, Error: err.Error()}) + return + } + + _ = writeJSONLine(stream, response{OK: true, Datasets: datasets}) + case "status": + events, err := s.listReplicationEvents(req.Limit) + if err != nil { + _ = writeJSONLine(stream, response{OK: false, Error: err.Error()}) + return + } + + _ = writeJSONLine(stream, response{OK: true, Events: events}) case "receive": if req.Dataset == "" { _ = writeJSONLine(stream, response{OK: false, Error: "dataset_required"}) return } - if err := s.receiveStream(context.Background(), reader, req.Dataset, req.Force); err != nil { + event := s.beginReplicationEvent( + "receive", + remoteAddr, + "", + req.Dataset, + "", + "", + "push", + ) + + err := s.receiveStream(context.Background(), reader, req.Dataset, req.Force) + s.completeReplicationEvent(event, err) + if err != nil { _ = writeJSONLine(stream, response{OK: false, Error: err.Error()}) return } _ = writeJSONLine(stream, response{OK: true}) + case "send": + if req.Dataset == "" { + _ = writeJSONLine(stream, response{OK: false, Error: "dataset_required"}) + return + } + + targetSnapshot, err := s.resolveTargetSnapshot(context.Background(), req.Dataset, req.TargetSnapshot) + if err != nil { + _ = writeJSONLine(stream, response{OK: false, Error: err.Error()}) + return + } + + baseSnapshot := normalizeSnapshotName(req.Dataset, req.BaseSnapshot) + if baseSnapshot != "" && baseSnapshot == targetSnapshot { + _ = writeJSONLine(stream, response{OK: false, Error: "base_equals_target_snapshot"}) + return + } + + mode := "full" + if baseSnapshot != "" { + if req.WithIntermediates { + mode = "incremental_intermediates" + } else { + mode = "incremental" + } + } + + event := s.beginReplicationEvent( + "send", + remoteAddr, + req.Dataset, + "", + baseSnapshot, + targetSnapshot, + mode, + ) + + if err := writeJSONLine(stream, response{OK: true, TargetSnapshot: targetSnapshot}); err != nil { + s.completeReplicationEvent(event, err) + return + } + + err = s.sendDataset(context.Background(), targetSnapshot, baseSnapshot, req.WithIntermediates, stream) + s.completeReplicationEvent(event, err) + if err != nil { + logger.L.Warn().Err(err).Str("remote", remoteAddr).Msg("replication_send_failed") + } default: _ = writeJSONLine(stream, response{OK: false, Error: "unknown_action"}) } @@ -276,6 +402,212 @@ func (s *Service) listSnapshots(ctx context.Context, dataset string) ([]SnapInfo return out, nil } +func (s *Service) listDatasets(ctx context.Context, prefix string) ([]DatasetInfo, error) { + cmd := exec.CommandContext( + ctx, + "zfs", + "list", + "-H", + "-p", + "-o", + "name,guid,type,creation,used,refer,avail,mountpoint", + "-t", + "filesystem,volume", + ) + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + out, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("zfs_list_datasets_failed: %w: %s", err, strings.TrimSpace(stderr.String())) + } + + lines := strings.Split(strings.TrimSpace(string(out)), "\n") + datasets := make([]DatasetInfo, 0, len(lines)) + + for _, line := range lines { + if strings.TrimSpace(line) == "" { + continue + } + + fields := strings.Split(line, "\t") + if len(fields) < 8 { + continue + } + + name := fields[0] + if prefix != "" && !strings.HasPrefix(name, prefix) { + continue + } + + creation, _ := strconv.ParseInt(fields[3], 10, 64) + used, _ := strconv.ParseUint(fields[4], 10, 64) + refer, _ := strconv.ParseUint(fields[5], 10, 64) + avail, _ := strconv.ParseUint(fields[6], 10, 64) + + datasets = append(datasets, DatasetInfo{ + Name: name, + GUID: fields[1], + Type: fields[2], + CreationUnix: creation, + UsedBytes: used, + ReferencedBytes: refer, + AvailableBytes: avail, + Mountpoint: fields[7], + }) + } + + sort.Slice(datasets, func(i, j int) bool { + return datasets[i].Name < datasets[j].Name + }) + + return datasets, nil +} + +func (s *Service) listReplicationEvents(limit int) ([]ReplicationEventInfo, error) { + if limit <= 0 { + limit = 50 + } + if limit > 500 { + limit = 500 + } + + var rows []clusterModels.BackupReplicationEvent + if err := s.DB.Order("started_at DESC").Limit(limit).Find(&rows).Error; err != nil { + return nil, err + } + + out := make([]ReplicationEventInfo, 0, len(rows)) + for _, row := range rows { + out = append(out, ReplicationEventInfo{ + ID: row.ID, + Direction: row.Direction, + RemoteAddress: row.RemoteAddress, + SourceDataset: row.SourceDataset, + DestinationDataset: row.DestinationDataset, + BaseSnapshot: row.BaseSnapshot, + TargetSnapshot: row.TargetSnapshot, + Mode: row.Mode, + Status: row.Status, + Error: row.Error, + StartedAt: row.StartedAt, + CompletedAt: row.CompletedAt, + }) + } + + return out, nil +} + +func (s *Service) beginReplicationEvent( + direction string, + remoteAddress string, + sourceDataset string, + destinationDataset string, + baseSnapshot string, + targetSnapshot string, + mode string, +) *clusterModels.BackupReplicationEvent { + if s.DB == nil { + return nil + } + + event := &clusterModels.BackupReplicationEvent{ + Direction: direction, + RemoteAddress: remoteAddress, + SourceDataset: sourceDataset, + DestinationDataset: destinationDataset, + BaseSnapshot: baseSnapshot, + TargetSnapshot: targetSnapshot, + Mode: mode, + Status: "running", + StartedAt: time.Now().UTC(), + } + + if err := s.DB.Create(event).Error; err != nil { + logger.L.Debug().Err(err).Msg("failed_to_create_replication_event") + return nil + } + + return event +} + +func (s *Service) completeReplicationEvent(event *clusterModels.BackupReplicationEvent, runErr error) { + if event == nil || s.DB == nil { + return + } + + now := time.Now().UTC() + updates := map[string]any{ + "completed_at": &now, + "error": "", + "status": "success", + } + + if runErr != nil { + updates["status"] = "failed" + updates["error"] = runErr.Error() + } + + if err := s.DB.Model(&clusterModels.BackupReplicationEvent{}).Where("id = ?", event.ID).Updates(updates).Error; err != nil { + logger.L.Debug().Err(err).Msg("failed_to_update_replication_event") + } +} + +func (s *Service) resolveTargetSnapshot(ctx context.Context, dataset, targetSnapshot string) (string, error) { + snaps, err := s.listSnapshots(ctx, dataset) + if err != nil { + return "", err + } + if len(snaps) == 0 { + return "", fmt.Errorf("no_source_snapshots") + } + + if targetSnapshot == "" { + return snaps[len(snaps)-1].Name, nil + } + + targetSnapshot = normalizeSnapshotName(dataset, targetSnapshot) + for _, snap := range snaps { + if snap.Name == targetSnapshot { + return targetSnapshot, nil + } + } + + return "", fmt.Errorf("target_snapshot_not_found") +} + +func normalizeSnapshotName(dataset, snapshot string) string { + snapshot = strings.TrimSpace(snapshot) + if snapshot == "" { + return "" + } + + if strings.Contains(snapshot, "@") { + return snapshot + } + + return fmt.Sprintf("%s@%s", dataset, snapshot) +} + +func (s *Service) sendDataset( + ctx context.Context, + targetSnapshot string, + baseSnapshot string, + withIntermediates bool, + out io.Writer, +) error { + if baseSnapshot == "" { + return s.sendSnapshot(ctx, targetSnapshot, out) + } + + if withIntermediates { + return s.sendIncrementalWithIntermediates(ctx, baseSnapshot, targetSnapshot, out) + } + + return s.sendIncremental(ctx, baseSnapshot, targetSnapshot, out) +} + func (s *Service) serverTLSConfig() (*tls.Config, error) { base, err := s.Auth.GetSylveCertificate() if err != nil {