migration

package
v0.1.4 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 21, 2026 License: LGPL-2.1 Imports: 18 Imported by: 0

README

Migration Package

Note: this package is currently in transition to an engine-centric lifecycle model. For the latest architecture and API usage guidance, start with:

  • docs/ENGINE_ARCHITECTURE_OVERVIEW.md
  • docs/API_USAGE_GUIDE.md
  • docs/ENGINE_API_CUTOVER.md

The migration package orchestrates the Migration Engine: it prepares or accepts the database (pkg/db), builds config (including YAML state), runs traversal and copy via pkg/queue, and performs verification. All persistence uses a single database instance (*db.DB, DuckDB).


Overview

  • Database: Uses *db.DB from pkg/db, either opened by the engine at Database.Path or supplied by the caller (Config.DatabaseInstance). The same DB holds nodes, staging, stats, and logs for traversal and copy.
  • Configuration: YAML config files serialize/deserialize migration state (roots, status, options); config path defaults to {Database.Path}.yaml (e.g. migration.duckdb.yaml).
  • State and verification: InspectMigrationStatus(database) and VerifyMigration(database, opts) read from the DB’s node and stats tables (see pkg/db).
  • Execution: Coordinates queue setup, root seeding, traversal, copy (when applicable), and verification; supports resume from saved YAML state.

Core Components

Config

The Config struct aggregates all parameters required to run a migration:

type Config struct {
    DatabaseInstance *db.DB  // If nil, engine opens DB at Database.Path
    CloseWhenDone    bool    // If true and engine opened the DB, close it when done
    Database         DatabaseConfig
    Source           Service
    Destination      Service
    SeedRoots        bool
    WorkerCount      int
    MaxRetries       int
    CoordinatorLead  int
    LogAddress       string
    LogLevel         string
    Verification     VerifyOptions
    // ... (ConfigPath, YAMLConfig, ShutdownContext, etc.)
}

Service

Represents a filesystem service participating in the migration:

type Service struct {
    Name    string
    Adapter types.FSAdapter  // From Sylos-FS; must be provided by caller
    Root    types.Folder    // Source or destination root folder
}

Important: The Adapter field must be provided by the caller. The Migration Engine never creates or closes adapters; the caller owns the adapter lifecycle. This design avoids connection lock issues (especially with Spectra, which supports only one connection at a time) and lets the API manage adapter lifecycle centrally.

LetsMigrate

The main entry point for executing a migration (synchronous):

result, err := migration.LetsMigrate(cfg)

This function:

  1. Opens the database (if Config.DatabaseInstance is nil, opens at Config.Database.Path via db.Open)
  2. Loads YAML config if present and inspects DB state via InspectMigrationStatus(database)
  3. Decides whether to run fresh or resume
  4. Seeds roots (if needed), runs traversal (and copy when applicable) using the same DB
  5. Runs verification against the DB
  6. Returns results

Note: This function blocks until the migration completes or is shut down. It automatically handles SIGINT/SIGTERM signals for graceful shutdown.

Adapter Lifecycle: The engine validates that adapters are provided (non-nil) but never creates or closes them. The caller is responsible for adapter lifecycle management.

StartMigration

For programmatic control over a running migration:

controller := migration.StartMigration(cfg)

// Later, trigger shutdown programmatically:
controller.Shutdown()

// Wait for completion:
result, err := controller.Wait()

MigrationController provides:

  • Shutdown() – Triggers force shutdown, checkpoints the database, saves YAML with "suspended" status
  • GetDB() – Returns the *db.DB in use (caller can keep using it when the engine does not close it)
  • Wait() – Blocks until migration completes, returns result and error
  • Result() – Returns the migration result (nil if not complete)
  • Error() – Returns any error that occurred

Use Cases:

  • Programmatic shutdown control
  • Integration with external orchestration systems
  • Testing scenarios requiring controlled shutdown
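The controller pattern described above can be approximated with the standard library alone. The following is a minimal sketch, not the package's implementation: the controller, result, start, tickUntilCancelled, and wasCancelled names are hypothetical stand-ins, and the real MigrationController may be structured differently.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// result is a stand-in for the package's Result type.
type result struct{ Steps int }

// controller is a hypothetical, simplified analogue of MigrationController.
type controller struct {
	cancel context.CancelFunc
	done   chan struct{}
	res    result
	err    error
}

// start runs work in a goroutine and returns a handle for shutdown and waiting.
func start(work func(ctx context.Context) (result, error)) *controller {
	ctx, cancel := context.WithCancel(context.Background())
	c := &controller{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(c.done) // closing done publishes res and err to waiters
		c.res, c.err = work(ctx)
	}()
	return c
}

// Shutdown cancels the context; cancelling twice is a no-op, so repeated calls are safe.
func (c *controller) Shutdown() { c.cancel() }

// Done is closed once the work function has returned.
func (c *controller) Done() <-chan struct{} { return c.done }

// Wait blocks until the work function returns, then reports its outcome.
func (c *controller) Wait() (result, error) {
	<-c.done
	return c.res, c.err
}

// tickUntilCancelled is a toy workload that counts ticks until shutdown.
func tickUntilCancelled(ctx context.Context) (result, error) {
	var r result
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return r, ctx.Err()
		case <-ticker.C:
			r.Steps++
		}
	}
}

// wasCancelled reports whether err stems from a shutdown request.
func wasCancelled(err error) bool { return errors.Is(err, context.Canceled) }
```

Building shutdown on context cancellation is one way to get the "safe to call multiple times" behaviour for free, since a CancelFunc may be invoked repeatedly.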

YAML Configuration System

The migration package includes a comprehensive YAML-based configuration system that allows you to serialize and deserialize migration sessions.

Automatic State Persistence

The config YAML is automatically saved at critical milestones:

  • Root selection – When SetRootFolders() is called
  • Roots seeded – After root tasks are seeded into the database
  • Traversal started – When queues are initialized and ready
  • Round advancement – When source or destination rounds advance
  • Traversal complete – When migration finishes

Config file location

By default, the config YAML path is derived from the database path: {Database.Path}.yaml (e.g. migration.duckdb → migration.duckdb.yaml). Override with DatabaseConfig.ConfigPath.
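The default-path rule amounts to appending ".yaml" to the database path unless an override is set. A minimal sketch of that rule follows; resolveConfigPath is a hypothetical helper written for illustration, not a function exported by the package.

```go
package main

// resolveConfigPath returns the YAML state path for a database path.
// A non-empty override (the ConfigPath setting) wins; otherwise the
// path is derived as {dbPath}.yaml.
func resolveConfigPath(dbPath, override string) string {
	if override != "" {
		return override
	}
	return dbPath + ".yaml"
}
```

With this rule, a database at migration.duckdb keeps its state next to it in migration.duckdb.yaml, which makes the pair easy to move or archive together.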

Serialization (Save)

Configs are automatically saved during migration, but you can also save manually:

// Create YAML config from migration.Config
yamlCfg, err := migration.NewMigrationConfigYAML(cfg, status)
if err != nil {
    return err
}

// Save to file
err = migration.SaveMigrationConfig("migration.yaml", yamlCfg)

Deserialization (Load)

Option 1: Load YAML Config Only

For inspection or reading state without resuming:

yamlCfg, err := migration.LoadMigrationConfig("migration.yaml")
if err != nil {
    return err
}

// Access config data
fmt.Printf("Status: %s\n", yamlCfg.State.Status)
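// LastRoundSrc is a pointer, so guard against nil before dereferencing
if yamlCfg.State.LastRoundSrc != nil {
    fmt.Printf("Last Round Src: %d\n", *yamlCfg.State.LastRoundSrc)
}

Option 2: Reconstruct Full Config (For Resuming)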

To resume a migration, you need to reconstruct a migration.Config from the YAML. Adapters must be provided directly - the engine never creates adapters:

// Create adapters (caller is responsible for adapter lifecycle)
srcAdapter, err := createSourceAdapter(...)
if err != nil {
    return err
}

dstAdapter, err := createDestinationAdapter(...)
if err != nil {
    return err
}

// Load and reconstruct the config with provided adapters
cfg, err := migration.LoadMigrationConfigFromYAML("migration.yaml", srcAdapter, dstAdapter)
if err != nil {
    return err
}

// Resume the migration
result, err := migration.LetsMigrate(cfg)

Important: The engine never creates or closes adapters. The caller is responsible for adapter lifecycle management.

YAML Config Structure

The YAML config includes:

  • Metadata - Migration ID, creation time, last modified time
  • State - Current status, last rounds/levels reached
  • Services - Source and destination service configurations
  • Service Configs - Embedded service-specific configs (e.g., spectra.json)
  • Migration Options - Worker count, retries, coordinator lead, etc.
  • Logging - Log service address, port, and level
  • Database - Database file path and settings (path, config path, etc.)
  • Verification - Verification options
  • Extensions - Unstructured fields for future extensions

Key Functions

  • LoadMigrationConfig(path) - Loads YAML config file
  • SaveMigrationConfig(path, cfg) - Saves YAML config file
  • LoadMigrationConfigFromYAML(path, srcAdapter, dstAdapter) - Loads YAML and reconstructs migration.Config using caller-provided adapters
  • ToMigrationConfig(factory) - Converts MigrationConfigYAML to migration.Config
  • NewMigrationConfigYAML(cfg, status) - Creates YAML config from migration.Config

Migration Lifecycle

1. Fresh Migration

cfg := migration.Config{
    Database: migration.DatabaseConfig{
        Path: "migration.db",
    },
    Source:      sourceService,
    Destination: destinationService,
    SeedRoots:   true,
    // ... other options
}

result, err := migration.LetsMigrate(cfg)

2. Resume migration

When the database file already exists and/or YAML config exists, LetsMigrate inspects state and resumes if there is pending work:

// Same config, but database already exists with state
cfg := migration.Config{
    Database: migration.DatabaseConfig{
        Path: "migration.db", // Existing database
    },
    // ... same config
}

// Automatically resumes from last checkpoint
result, err := migration.LetsMigrate(cfg)

3. Resume from YAML

Load a saved migration session and resume:

// Load config from YAML; srcAdapter and dstAdapter are created and owned by the caller
cfg, err := migration.LoadMigrationConfigFromYAML("migration.yaml", srcAdapter, dstAdapter)
if err != nil {
    return err
}

// Resume migration
result, err := migration.LetsMigrate(cfg)

State Management

MigrationStatus

Tracks the current state of a migration:

type MigrationStatus struct {
    SrcTotal           int
    DstTotal           int
    SrcPending         int
    DstPending         int
    SrcFailed          int
    DstFailed          int
    MinPendingDepthSrc *int
    MinPendingDepthDst *int
}

InspectMigrationStatus

Query the current migration state from the database (node counts and stats tables in pkg/db):

status, err := migration.InspectMigrationStatus(database)
if err != nil {
    return err
}
if status.HasPending() {
    fmt.Println("Migration has pending work")
}
if status.IsComplete() {
    fmt.Println("Migration is complete")
}

Implementation uses db.CountNodes, database.GetStatsCount, and database.GetStatsBreakdown for SRC and DST.


Verification

After migration completes, verification checks the results:

verifyOpts := migration.VerifyOptions{
    AllowPending:  false,
    AllowNotOnSrc: true,
}

report, err := migration.VerifyMigration(database, verifyOpts)
if err != nil {
    return err
}
if report.Success(verifyOpts) {
    fmt.Println("Migration verified successfully")
}

Verification reads from the same database: node counts and stats tables to report totals, pending, failed, successful, and (for DST) not-on-src.


Database management

Relationship with pkg/db

The migration package never opens or closes the database by itself unless you use it in “standalone” mode (no DatabaseInstance provided). It expects a DuckDB instance from pkg/db: either you pass Config.DatabaseInstance (already opened) or you set Config.Database.Path and the engine calls db.Open(db.Options{Path: cfg.Database.Path}). All traversal, copy, status, and verification use that single *db.DB (same node tables, staging, stats, and logs as described in pkg/db).

SetupDatabase

Opens the database at the given path (creates if missing). Caller is responsible for closing it when done.

database, wasFresh, err := migration.SetupDatabase(migration.DatabaseConfig{
    Path:           "migration.duckdb",
    RemoveExisting: false,
})

DatabaseConfig

type DatabaseConfig struct {
    Path           string // DuckDB file path (e.g. migration.duckdb)
    RemoveExisting bool   // If true, delete existing file before creating
    ConfigPath     string // Optional: custom YAML config path; default is {Path}.yaml
    RequireOpen    bool   // If true (API mode), DB instance must already be provided
}

Error Handling

The migration package uses structured error handling:

  • Database errors are wrapped with context
  • Adapter creation errors include service type information
  • State inspection errors indicate what operation failed
  • Verification errors provide detailed failure reports

Always check errors and handle them appropriately:

result, err := migration.LetsMigrate(cfg)
if err != nil {
    // Handle error - migration may be partially complete
    fmt.Printf("Migration failed: %v\n", err)
    
    // Check verification report for details
    if result.Verification.SrcFailed > 0 {
        fmt.Printf("Source failures: %d\n", result.Verification.SrcFailed)
    }
}
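Wrapping errors with context, as listed above, is commonly done in Go with fmt.Errorf and the %w verb so that callers can still match the underlying cause. The sketch below illustrates the idiom only; errPathRequired, openStore, and isPathRequired are hypothetical names and do not correspond to errors or functions exported by this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errPathRequired is a hypothetical sentinel error for a missing database path.
var errPathRequired = errors.New("path is required")

// openStore stands in for a database-opening step. On failure it wraps the
// cause with the operation and the offending path.
func openStore(path string) error {
	if path == "" {
		return fmt.Errorf("open database %q: %w", path, errPathRequired)
	}
	return nil
}

// isPathRequired matches the sentinel through any number of wrapping layers.
func isPathRequired(err error) bool {
	return errors.Is(err, errPathRequired)
}
```

A caller can log the full wrapped message for operators while branching on isPathRequired(err) to decide whether the failure is a configuration mistake or something worth retrying.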

Best Practices

  1. Always specify ConfigPath - Makes it easier to locate and manage config files
  2. Check migration status - Before resuming, inspect status to understand current state
  3. Handle errors gracefully - Migrations can be partially complete
  4. Use verification - Always verify migration results before considering it complete
  5. Save configs explicitly - For important migrations, save configs at key points
  6. Provide adapters explicitly - The engine never creates adapters; all adapters must be provided by the caller
  7. Manage adapter lifecycle - The engine never closes adapters; the caller is responsible for cleanup

Retry sweeps

Retry sweeps allow re-processing of failed or pending nodes to discover new or changed content. This is useful for:

  • Recovering from transient failures
  • Re-scanning subtrees marked as pending for investigation
  • Testing migration behavior with partial tree reprocessing

How Retry Sweeps Work

  1. Mark Nodes for Retry: Change node status from successful to pending (or leave as failed)
  2. Delete subtree data: Remove all descendant nodes and their metadata from the database (via pkg/db Writer)
  3. Run Retry Sweep: Execute migration in retry mode (QueueModeRetry)
  4. Re-discover Content: Queues re-traverse marked subtrees as if doing fresh traversal
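The first two steps (mark a node, then delete its subtree) can be modelled on a small in-memory tree. This is a sketch under stated assumptions: node, store, and markForRetry are hypothetical, and the real engine performs these writes against DuckDB through pkg/db rather than a map.

```go
package main

// node is a simplified stand-in for a row in the node tables.
type node struct {
	ID       string
	ParentID string
	Status   string
}

// store maps node IDs to nodes; it plays the role of the database here.
type store map[string]*node

// markForRetry sets the node to pending and deletes every descendant,
// returning how many descendants were removed. A later traversal of the
// pending node would rediscover its children from scratch.
func (s store) markForRetry(id string) int {
	n, ok := s[id]
	if !ok {
		return 0
	}
	n.Status = "pending"

	removed := 0
	frontier := []string{id} // breadth-first walk over the subtree
	for len(frontier) > 0 {
		parent := frontier[0]
		frontier = frontier[1:]
		for childID, child := range s {
			if child.ParentID == parent {
				frontier = append(frontier, childID)
				delete(s, childID) // deleting during range is allowed in Go
				removed++
			}
		}
	}
	return removed
}
```

Deleting the descendants before the sweep is what lets the queues treat the marked node like a fresh traversal root without producing duplicate child rows.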

Retry Sweep Flow

SRC Queue:

  • Scans all known levels for pending/failed tasks
  • Re-processes marked nodes (lists children, applies filters)
  • On successful completion of SRC folder tasks:
    • Triggers DST cleanup for corresponding DST nodes
    • Marks DST parent as pending
    • Deletes DST children

DST Queue:

  • Waits for SRC to complete (coordinator gating)
  • Processes pending DST tasks
  • Loads expected children from SRC via join-lookup tables
  • Performs comparison and discovers new/changed content

DST Cleanup During SRC Completion

To prevent node duplication and maintain consistency, DST cleanup happens during SRC task completion, not during pull:

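// In completeTask() for SRC folder tasks in retry mode (database is *db.DB).
// Error handling is sketched here; that completeTask() returns an error is an assumption.
if q.name == "src" && q.getMode() == QueueModeRetry && task.IsFolder() {
    // Find the DST node that mirrors this SRC folder
    dstID, err := db.GetDstIDFromSrcID(database, srcNodeID)
    if err != nil {
        return err
    }
    // Buffer the DST parent reset and the deletion of its existing children
    outputBuffer.AddStatusUpdate("DST", dstDepth, oldStatus, db.StatusPending, dstID)
    childIDs, err := db.GetChildrenIDsByParentID(database, "DST", dstID)
    if err != nil {
        return err
    }
    for _, childID := range childIDs {
        outputBuffer.AddNodeDeletion("DST", childID, childDepth, childStatus)
    }
}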

This ensures:

  • DST cleanup is buffered along with other task completion writes
  • Cleanup only occurs when SRC task successfully completes
  • All operations are atomic within the output buffer
  • No race conditions between cleanup and other queue operations
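One way to picture the buffering guarantees above is a buffer that only collects operations and applies them together under a single lock. The sketch below is an illustration, not the engine's output buffer: table, outputBuffer, and their methods are hypothetical, and the signatures are deliberately simpler than the AddStatusUpdate and AddNodeDeletion calls shown earlier.

```go
package main

import "sync"

// statusUpdate is one pending status change for a node.
type statusUpdate struct{ NodeID, Status string }

// table is a toy stand-in for the DST node table.
type table struct {
	mu     sync.Mutex
	status map[string]string // node ID -> status
}

// outputBuffer collects writes produced while completing a task.
type outputBuffer struct {
	updates   []statusUpdate
	deletions []string
}

// addStatusUpdate queues a status change without touching the table.
func (b *outputBuffer) addStatusUpdate(nodeID, status string) {
	b.updates = append(b.updates, statusUpdate{nodeID, status})
}

// addNodeDeletion queues a node deletion without touching the table.
func (b *outputBuffer) addNodeDeletion(nodeID string) {
	b.deletions = append(b.deletions, nodeID)
}

// flush applies all queued writes while holding the table lock, so other
// goroutines that take the same lock never observe a half-applied cleanup.
func (b *outputBuffer) flush(tbl *table) {
	tbl.mu.Lock()
	defer tbl.mu.Unlock()
	for _, u := range b.updates {
		tbl.status[u.NodeID] = u.Status
	}
	for _, id := range b.deletions {
		delete(tbl.status, id)
	}
	b.updates, b.deletions = nil, nil
}
```

Because nothing is written until flush, a task that fails before completion simply discards its buffer and leaves the DST side untouched.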

Testing Retry Sweeps

The pkg/tests/retry_sweep package provides a comprehensive test:

  1. Setup: Runs normal migration to completion (all nodes successful)
  2. Prepare Retry:
    • Randomly selects a top-level folder
    • Marks it as pending
    • Deletes its entire subtree (children, status, join tables, stats)
  3. Execute Retry: Runs retry sweep migration
  4. Verify:
    • Confirms node counts match expected (subtree size)
    • Validates no duplicates were created
    • Checks both SRC and DST queues completed successfully

Run the test:

powershell -File pkg/tests/retry_sweep/run.ps1

Examples

See the main package (main.go) and test packages for complete examples of:

  • Setting up migrations
  • Creating service adapters
  • Handling migration results
  • Resuming from saved state

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleShutdownSignals

func HandleShutdownSignals(cancel context.CancelFunc)

HandleShutdownSignals sets up signal handlers for SIGINT (Ctrl+C) and SIGTERM. When either signal is received, it cancels the provided context. If shutdown doesn't complete within 10 seconds, it hard-kills the process. This function should be called in a goroutine at the start of the migration. On Windows, SIGINT is supported but SIGTERM may not be available.

func RunCopyPhase

func RunCopyPhase(cfg CopyPhaseConfig) (queue.QueueStats, error)

RunCopyPhase executes the copy phase (two-pass: folders then files).

func SetupDatabase

func SetupDatabase(cfg DatabaseConfig) (*db.DB, bool, error)

SetupDatabase opens a DuckDB database at cfg.Path. Returns the DB and whether it was fresh (true if new or removed). The caller is responsible for closing the database when done.

Types

type Config

type Config struct {
	// Database config (path, etc.). MigrationManager opens and owns this connection lifecycle.
	Database DatabaseConfig

	Source      Service
	Destination Service

	SeedRoots       bool
	WorkerCount     int
	MaxRetries      int
	CoordinatorLead int

	LogAddress   string
	LogLevel     string
	SkipListener bool
	StartupDelay time.Duration
	ProgressTick time.Duration

	Verification VerifyOptions

	// ShutdownContext is an optional context for force shutdown control.
	// If not provided, LetsMigrate will create one internally.
	// Set this when using StartMigration for programmatic shutdown control.
	ShutdownContext context.Context
}

Config aggregates all of the knobs required to run the migration engine once.

func (*Config) SetRootFolders

func (c *Config) SetRootFolders(src, dst types.Folder) error

SetRootFolders assigns the source and destination root folders that will seed the migration queues. It normalizes required defaults (location path, type, display name) and validates identifiers.

type CopyPhaseConfig

type CopyPhaseConfig struct {
	DuckDB          *db.DB
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	WorkerCount     int
	MaxRetries      int
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ShutdownContext context.Context
}

CopyPhaseConfig configures the copy phase execution.

type CreateMigrationConfig

type CreateMigrationConfig struct {
	Name            string
	ServiceMetadata any
	RootConfig      any
}

CreateMigrationConfig defines metadata persisted for a migration record.

type DatabaseConfig

type DatabaseConfig struct {
	// Path is the DuckDB file path to create/open (e.g. migration.duckdb).
	Path string
	// RemoveExisting deletes the database file if it already exists before creating a new database.
	RemoveExisting bool
	// RequireOpen determines whether the DB instance must already be open (true) or can be auto-opened (false).
	// When true (API mode): DB instance must be provided and already open, error if nil/closed.
	// When false (standalone mode): Can auto-open DB if instance is nil or not open.
	RequireOpen bool
}

DatabaseConfig defines how the migration engine should prepare its backing store.

type DiffItem

type DiffItem struct {
	Path               string
	Name               string
	Depth              int
	Type               string
	SrcNodeID          string
	DstNodeID          string
	SrcTraversalStatus string
	DstTraversalStatus string
	CopyStatus         string
	Excluded           bool
	MissingOnSource    bool
	MissingOnDest      bool
	Size               int64
}

DiffItem is a path review row comparing source and destination state.

type DiffsStats

type DiffsStats struct {
	Total           int
	Folders         int
	Files           int
	MissingOnSource int
	MissingOnDest   int
	Excluded        int
}

type ListChildrenDiffsRequest

type ListChildrenDiffsRequest struct {
	Path          string
	Limit         int
	Offset        int
	SortBy        string
	SortDirection string
	FoldersOnly   bool
	Status        string
}

type ListChildrenDiffsResult

type ListChildrenDiffsResult struct {
	Items  []DiffItem
	Total  int
	Limit  int
	Offset int
}

type LogEntry

type LogEntry struct {
	Timestamp time.Time
	Level     string
	Message   string
}

LogEntry is a user-facing runtime log projection.

type LogsProjection

type LogsProjection struct {
	Entries []LogEntry
	ByLevel map[string][]LogEntry
}

type Migration

type Migration struct {
	ID   string
	Name string
	// contains filtered or unexported fields
}

Migration is the first-class domain object for a single migration lifecycle.

func (*Migration) AddRoots

func (m *Migration) AddRoots(srcRoot, dstRoot types.Folder) (RootSeedSummary, error)

AddRoots seeds source and destination root tasks into the migration DB. This is the explicit root insert step before starting traversal.

func (*Migration) BulkExclude

func (m *Migration) BulkExclude(filter NodeQueryFilter, excluded bool) (int, error)

BulkExclude applies exclusion over a query slice.

func (*Migration) BulkExcludeWithPropagation

func (m *Migration) BulkExcludeWithPropagation(filter NodeQueryFilter, excluded bool) (int, error)

func (*Migration) GetChildrenDiffsStats

func (m *Migration) GetChildrenDiffsStats(path string, foldersOnly bool) (DiffsStats, error)

func (*Migration) GetLogs

func (m *Migration) GetLogs(limit int, groupByLevel bool) (LogsProjection, error)

func (*Migration) GetQueueMetrics

func (m *Migration) GetQueueMetrics() (QueueMetricsSnapshot, error)

func (*Migration) GetRecentLogs

func (m *Migration) GetRecentLogs(limit int) ([]LogEntry, error)

func (*Migration) GetRuntimeStatus

func (m *Migration) GetRuntimeStatus() RuntimeState

func (*Migration) GetTraversalSummary

func (m *Migration) GetTraversalSummary() (TraversalSummary, error)

func (*Migration) ListChildrenDiffs

func (m *Migration) ListChildrenDiffs(req ListChildrenDiffsRequest) (ListChildrenDiffsResult, error)

func (*Migration) MarkNodeForRetryCopy

func (m *Migration) MarkNodeForRetryCopy(nodeID string) error

func (*Migration) MarkNodeForRetryDiscovery

func (m *Migration) MarkNodeForRetryDiscovery(nodeID string) error

func (*Migration) Phase

func (m *Migration) Phase() Phase

func (*Migration) QueryNodes

func (m *Migration) QueryNodes(filter NodeQueryFilter) ([]db.NodeState, error)

QueryNodes provides review-phase node search/filter without exposing SQL to API.

func (*Migration) RetryAllFailed

func (m *Migration) RetryAllFailed() error

func (*Migration) RunRetrySweep

func (m *Migration) RunRetrySweep(opts RetrySweepOptions) (RuntimeStats, error)

func (*Migration) SearchPathReviewItems

func (m *Migration) SearchPathReviewItems(req SearchRequest) (SearchResult, error)

func (*Migration) SetNodeExcluded

func (m *Migration) SetNodeExcluded(queueType, nodeID string, excluded bool) error

SetNodeExcluded mutates review exclusions in engine-owned store.

func (*Migration) SetNodeExcludedWithPropagation

func (m *Migration) SetNodeExcludedWithPropagation(queueType, nodeID string, excluded bool) error

func (*Migration) StartCopy

func (m *Migration) StartCopy() (queue.QueueStats, error)

StartCopy transitions review->copying and runs copy phase.

func (*Migration) StartTraversal

func (m *Migration) StartTraversal(cfg Config) (RuntimeStats, error)

StartTraversal begins traversal lifecycle and transitions to review on success.

func (*Migration) Stop

func (m *Migration) Stop() (StopResult, error)

func (*Migration) UnmarkNodeForRetryCopy

func (m *Migration) UnmarkNodeForRetryCopy(nodeID string) error

func (*Migration) UnmarkNodeForRetryDiscovery

func (m *Migration) UnmarkNodeForRetryDiscovery(nodeID string) error

type MigrationConfig

type MigrationConfig struct {
	DB              *db.DB // DuckDB instance (required; manager-owned)
	DBPath          string // Reserved for compatibility; ignored when DB is set
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	SrcRoot         types.Folder
	DstRoot         types.Folder
	SrcServiceName  string
	WorkerCount     int
	MaxRetries      int
	CoordinatorLead int
	MaxSrcAhead     int // Max rounds SRC may run ahead of DST (default 3); 0 uses default
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ResumeStatus    *MigrationStatus
	ShutdownContext context.Context
}

MigrationConfig is the configuration passed to RunMigration.
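The MaxSrcAhead field bounds how far the SRC queue may run ahead of DST, with 0 meaning the default of 3. A minimal sketch of such a gate follows. The helper names are hypothetical, and two details are assumptions rather than documented behaviour: negative values are treated like 0, and "ahead" is measured as the difference between round numbers.

```go
package main

// defaultMaxSrcAhead matches the documented default of 3 rounds.
const defaultMaxSrcAhead = 3

// effectiveMaxSrcAhead applies the "0 uses default" rule.
// Treating negative values as unset is an assumption of this sketch.
func effectiveMaxSrcAhead(configured int) int {
	if configured <= 0 {
		return defaultMaxSrcAhead
	}
	return configured
}

// srcMayStartRound reports whether SRC may begin srcRound while DST is
// still working on dstRound, given the configured lead.
func srcMayStartRound(srcRound, dstRound, maxSrcAhead int) bool {
	return srcRound-dstRound <= effectiveMaxSrcAhead(maxSrcAhead)
}
```

Capping the lead keeps the amount of SRC work that DST has not yet compared bounded, at the cost of SRC idling when DST is slow.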

type MigrationController

type MigrationController struct {
	// contains filtered or unexported fields
}

MigrationController provides programmatic control over a running migration. It allows you to trigger force shutdown and check migration status.

func StartMigration

func StartMigration(cfg Config) *MigrationController

StartMigration starts a migration asynchronously and returns a MigrationController that allows programmatic shutdown. Use this when you need to control the migration lifecycle or run migrations in the background.

Example:

controller := migration.StartMigration(cfg)
defer controller.Shutdown()

// Later, trigger shutdown programmatically:
controller.Shutdown()

// Wait for completion:
result, err := controller.Wait()

func (*MigrationController) Done

func (mc *MigrationController) Done() <-chan struct{}

Done returns a channel that is closed when the migration completes or is shut down.

func (*MigrationController) Shutdown

func (mc *MigrationController) Shutdown()

Shutdown triggers a force shutdown of the migration. This is safe to call multiple times or after the migration has completed.

func (*MigrationController) Wait

func (mc *MigrationController) Wait() (Result, error)

Wait blocks until the migration completes or is shut down, then returns the result and error.

type MigrationDetails

type MigrationDetails struct {
	ID                  string
	Name                string
	Phase               Phase
	CreatedAt           time.Time
	UpdatedAt           time.Time
	ServiceMetadataJSON string
	RootConfigJSON      string
}

MigrationDetails is the full migration record from the DB, for API detail views. The engine owns all DB access; use this type via GetMigrationDetails so the API never touches the DB.

type MigrationManager

type MigrationManager struct {
	// contains filtered or unexported fields
}

MigrationManager owns migration lifecycle authority and persistence access.

func NewMigrationManager

func NewMigrationManager(cfg DatabaseConfig) (*MigrationManager, error)

NewMigrationManager opens the migration DB and becomes the lifecycle owner.

func (*MigrationManager) Close

func (m *MigrationManager) Close() error

Close releases manager-owned resources.

func (*MigrationManager) CreateMigration

func (m *MigrationManager) CreateMigration(cfg CreateMigrationConfig) (*Migration, error)

CreateMigration registers a migration record and returns a domain object handle.

func (*MigrationManager) DeleteMigration

func (m *MigrationManager) DeleteMigration(id string) error

DeleteMigration removes persisted metadata and in-memory runtime state.

func (*MigrationManager) GetMigration

func (m *MigrationManager) GetMigration(id string) (*Migration, error)

GetMigration loads or returns a cached migration domain object.

func (*MigrationManager) GetMigrationDetails

func (m *MigrationManager) GetMigrationDetails(id string) (*MigrationDetails, error)

GetMigrationDetails returns the full migration record from the DB. Use this for "GET migration by ID" detail views; the API should not query the DB directly.

func (*MigrationManager) ListMigrations

func (m *MigrationManager) ListMigrations() ([]MigrationSummary, error)

ListMigrations returns persisted migration summaries.

type MigrationStatus

type MigrationStatus struct {
	SrcTotal int
	DstTotal int

	SrcPending int
	DstPending int

	SrcFailed int
	DstFailed int

	MinPendingDepthSrc *int
	MinPendingDepthDst *int
}

MigrationStatus summarizes the current state of a migration in the database.

func InspectMigrationStatus

func InspectMigrationStatus(database *db.DB) (MigrationStatus, error)

InspectMigrationStatus inspects the DuckDB node data and stats tables and returns a MigrationStatus.

func (MigrationStatus) HasFailures

func (s MigrationStatus) HasFailures() bool

HasFailures returns true if any src or dst nodes failed traversal.

func (MigrationStatus) HasPending

func (s MigrationStatus) HasPending() bool

HasPending returns true if any src or dst nodes are still pending.

func (MigrationStatus) IsComplete

func (s MigrationStatus) IsComplete() bool

IsComplete returns true when there are nodes and no pending or failed work.

func (MigrationStatus) IsEmpty

func (s MigrationStatus) IsEmpty() bool

IsEmpty returns true if no nodes have been discovered yet.
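The four predicates documented above can be read as simple checks over the counters. The sketch below restates those one-line descriptions in code on a local stand-in type; it is an interpretation of the doc comments, not the package source, and migrationStatus omits the MinPendingDepth fields.

```go
package main

// migrationStatus mirrors the counter fields of MigrationStatus.
type migrationStatus struct {
	SrcTotal, DstTotal     int
	SrcPending, DstPending int
	SrcFailed, DstFailed   int
}

// isEmpty: no nodes have been discovered yet.
func (s migrationStatus) isEmpty() bool {
	return s.SrcTotal == 0 && s.DstTotal == 0
}

// hasPending: any src or dst nodes are still pending.
func (s migrationStatus) hasPending() bool {
	return s.SrcPending > 0 || s.DstPending > 0
}

// hasFailures: any src or dst nodes failed traversal.
func (s migrationStatus) hasFailures() bool {
	return s.SrcFailed > 0 || s.DstFailed > 0
}

// isComplete: there are nodes, and no pending or failed work remains.
func (s migrationStatus) isComplete() bool {
	return !s.isEmpty() && !s.hasPending() && !s.hasFailures()
}
```

Note that under this reading an empty database is neither pending nor complete, which is what lets a caller tell a fresh run apart from a finished one.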

type MigrationSummary

type MigrationSummary struct {
	ID        string
	Name      string
	Phase     Phase
	CreatedAt time.Time
	UpdatedAt time.Time
}

MigrationSummary is a compact projection returned by ListMigrations.

type NodeQueryFilter

type NodeQueryFilter struct {
	Queue       string
	Depth       *int
	Status      string
	Excluded    *bool
	PathLike    string
	Limit       int
	Offset      int
	OrderByPath bool
}

NodeQueryFilter controls review-phase node query behavior.

type Phase

type Phase string

Phase is the migration lifecycle state, stored as the canonical status string.

const (
	PhaseCreated    Phase = "Roots-Set"             // User has set root folders and services (initial state)
	PhaseFiltersSet Phase = "Filters-Set"           // Filters configured, ready for traversal (also used for retry)
	PhaseTraversing Phase = "Traversal-In-Progress" // Traversal is currently running
	PhaseReview     Phase = "Awaiting-Path-Review"  // Traversal finished, user can review results and see copy plan
	PhaseCopying    Phase = "Copy-In-Progress"      // Copy phase is currently running
	PhaseCompleted  Phase = "Complete"              // Migration completed successfully
	PhaseSuspended  Phase = "Suspended"             // Migration suspended (can be resumed)
)

func ParsePhase

func ParsePhase(v string) (Phase, error)

func (Phase) String

func (p Phase) String() string
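ParsePhase and String are listed without descriptions. A plausible reading is that parsing validates a stored status string against the known constants. The sketch below shows that idea on a local phase type using the constant values listed above; the exact-match, case-sensitive behaviour and the error text are assumptions, not documented behaviour.

```go
package main

import "fmt"

// phase is a local stand-in for the package's Phase type.
type phase string

const (
	phaseCreated    phase = "Roots-Set"
	phaseFiltersSet phase = "Filters-Set"
	phaseTraversing phase = "Traversal-In-Progress"
	phaseReview     phase = "Awaiting-Path-Review"
	phaseCopying    phase = "Copy-In-Progress"
	phaseCompleted  phase = "Complete"
	phaseSuspended  phase = "Suspended"
)

// knownPhases is the set of values parsePhase accepts.
var knownPhases = map[phase]bool{
	phaseCreated: true, phaseFiltersSet: true, phaseTraversing: true,
	phaseReview: true, phaseCopying: true, phaseCompleted: true,
	phaseSuspended: true,
}

// parsePhase converts a stored status string into a phase, rejecting
// anything that is not one of the canonical strings.
func parsePhase(v string) (phase, error) {
	p := phase(v)
	if !knownPhases[p] {
		return "", fmt.Errorf("unknown migration phase %q", v)
	}
	return p, nil
}

// String returns the canonical status string for the phase.
func (p phase) String() string { return string(p) }
```

Validating at the boundary means the rest of the code can switch on phase values without a default branch for corrupted or outdated status strings.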

type QueueMetricsSnapshot

type QueueMetricsSnapshot struct {
	Queues map[string]map[string]any
}

type Result

type Result struct {
	RootsSeeded  bool
	RootSummary  RootSeedSummary
	Runtime      RuntimeStats
	Verification VerificationReport
}

Result captures the outcome of a migration run.

func LetsMigrate

func LetsMigrate(cfg Config) (Result, error)

LetsMigrate executes setup, traversal, and verification using the supplied configuration. This is the synchronous version: it blocks until the migration completes or is shut down. For programmatic shutdown control, use StartMigration instead.

type RetrySweepOptions

type RetrySweepOptions struct {
	WorkerCount   int
	MaxRetries    int
	LogAddress    string
	LogLevel      string
	SkipListener  bool
	MaxKnownDepth int
}

RetrySweepOptions are manager-level knobs for retry sweep runs.

type RootSeedSummary

type RootSeedSummary struct {
	SrcRoots int
	DstRoots int
}

RootSeedSummary captures verification details after root task seeding.

func SeedRootTasks

func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) (RootSeedSummary, error)

SeedRootTasks inserts the supplied source and destination root folders into the database. The folders should already contain root-relative metadata (LocationPath="/", DepthLevel=0).

type RuntimeState

type RuntimeState struct {
	NodesDiscovered int64
	TasksPending    int64
	TasksCompleted  int64
	BytesCopied     int64
	Errors          int64
}

RuntimeState is the in-memory live status projection for a migration.

type RuntimeStats

type RuntimeStats struct {
	Duration time.Duration
	Src      queue.QueueStats
	Dst      queue.QueueStats
}

RuntimeStats captures execution statistics at the end of a migration run.

func RunMigration

func RunMigration(cfg MigrationConfig) (RuntimeStats, error)

RunMigration executes the migration traversal using the provided configuration.

func RunRetrySweep

func RunRetrySweep(cfg SweepConfig) (RuntimeStats, error)

RunRetrySweep runs a retry sweep to re-process failed or pending tasks from a previous traversal. This allows re-traversing paths that previously failed (e.g., due to permissions) and discovering new content in those paths.

The sweep checks all known levels up to maxKnownDepth (or auto-detects from DB if -1), then uses normal traversal logic for deeper levels discovered during retry.

Example:

config := migration.SweepConfig{
    DuckDB:        dbInstance,
    SrcAdapter:    srcAdapter,
    DstAdapter:    dstAdapter,
    WorkerCount:   10,
    MaxRetries:    3,
    MaxKnownDepth: 5, // Or -1 to auto-detect
}
stats, err := migration.RunRetrySweep(config)
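The -1 convention for MaxKnownDepth means "derive the depth from what the previous traversal recorded". A minimal sketch of that resolution step follows. resolveMaxKnownDepth is a hypothetical helper, the slice of depths stands in for a database query, and treating every negative value as auto-detect is an assumption.

```go
package main

// resolveMaxKnownDepth returns the configured depth when it is set, or the
// deepest level present in nodeDepths when the caller asked for auto-detect.
func resolveMaxKnownDepth(configured int, nodeDepths []int) int {
	if configured >= 0 {
		return configured
	}
	maxDepth := 0 // an empty tree resolves to the root level
	for _, d := range nodeDepths {
		if d > maxDepth {
			maxDepth = d
		}
	}
	return maxDepth
}
```

Levels beyond the resolved depth are not scanned for leftovers; per the description above, they are handled by normal traversal logic as they are discovered during the retry.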

type SearchRequest

type SearchRequest struct {
	Query         string
	Path          string
	Limit         int
	Offset        int
	SortBy        string
	SortDirection string
}

type SearchResult

type SearchResult struct {
	Items  []DiffItem
	Total  int
	Limit  int
	Offset int
}

type Service

type Service struct {
	Name    string
	Adapter types.FSAdapter
	Root    types.Folder
}

Service defines a single filesystem service participating in a migration.

type StopResult

type StopResult struct {
	MigrationID   string
	Phase         Phase
	RuntimeStatus RuntimeState
	Stopped       bool
}

StopResult reports stop/suspend state after a stop request.

type SweepConfig

type SweepConfig struct {
	DuckDB          *db.DB
	SrcAdapter      types.FSAdapter
	DstAdapter      types.FSAdapter
	WorkerCount     int
	MaxRetries      int
	LogAddress      string
	LogLevel        string
	SkipListener    bool
	StartupDelay    time.Duration
	ProgressTick    time.Duration
	ShutdownContext context.Context
	// For retry sweeps only
	MaxKnownDepth          int    // Maximum known depth from previous traversal (-1 to auto-detect)
	SkipAutoETLBeforeRetry bool   // If true, skip automatic ETL from DuckDB to DuckDB before retry sweep
	DuckDBPath             string // Optional: Path to DuckDB file (auto-derived from DuckDB path if empty and ETL is enabled)
}

Gotta, sweep sweep sweep!!! 🧹🧹🧹 SweepConfig is the configuration for running retry sweeps.

type TraversalSummary

type TraversalSummary struct {
	SrcTotal    int
	DstTotal    int
	SrcPending  int
	DstPending  int
	SrcFailed   int
	DstFailed   int
	SrcExcluded int
	DstExcluded int
}

TraversalSummary is the review projection returned by GetTraversalSummary.

type VerificationReport

type VerificationReport struct {
	SrcTotal      int
	DstTotal      int
	SrcPending    int
	DstPending    int
	SrcFailed     int
	DstFailed     int
	DstNotOnSrc   int
	SrcSuccessful int // Count of successful SRC nodes
	DstSuccessful int // Count of successful DST nodes
}

VerificationReport captures aggregate statistics from the verification pass.

func VerifyMigration

func VerifyMigration(database *db.DB, opts VerifyOptions) (VerificationReport, error)

VerifyMigration inspects the DuckDB node tables and stats for pending, failed, or missing nodes and returns a report.

func (VerificationReport) Success

func (r VerificationReport) Success(opts VerifyOptions) bool

Success returns true when the report satisfies the supplied VerifyOptions. Migration is not considered successful unless at least one node was actually moved/traversed.
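To make the interaction between the report and the options concrete, here is one possible reading of Success as code. It is a sketch on local stand-in types, not the package source. Two rules in it are assumptions: that any failed node fails verification, and that "at least one node" is checked against the combined SRC and DST totals.

```go
package main

// verifyOptions mirrors the two VerifyOptions flags.
type verifyOptions struct {
	AllowPending  bool
	AllowNotOnSrc bool
}

// verificationReport mirrors the counters used by this sketch.
type verificationReport struct {
	SrcTotal, DstTotal     int
	SrcPending, DstPending int
	SrcFailed, DstFailed   int
	DstNotOnSrc            int
}

// success reports whether the counters satisfy the options.
func (r verificationReport) success(opts verifyOptions) bool {
	if r.SrcTotal+r.DstTotal == 0 {
		return false // nothing was traversed, so there is nothing to accept
	}
	if r.SrcFailed > 0 || r.DstFailed > 0 {
		return false // assumption: failures are never tolerated
	}
	if !opts.AllowPending && (r.SrcPending > 0 || r.DstPending > 0) {
		return false
	}
	if !opts.AllowNotOnSrc && r.DstNotOnSrc > 0 {
		return false
	}
	return true
}
```

The empty-run check guards against a misconfigured root silently passing verification because there were no nodes to fail.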

type VerifyOptions

type VerifyOptions struct {
	AllowPending  bool
	AllowNotOnSrc bool
}

VerifyOptions define the expectations for post-migration validation.
