Documentation
¶
Index ¶
- func HandleShutdownSignals(cancel context.CancelFunc)
- func RunCopyPhase(cfg CopyPhaseConfig) (queue.QueueStats, error)
- func SetupDatabase(cfg DatabaseConfig) (*db.DB, bool, error)
- type Config
- type CopyPhaseConfig
- type CreateMigrationConfig
- type DatabaseConfig
- type DiffItem
- type DiffsStats
- type ListChildrenDiffsRequest
- type ListChildrenDiffsResult
- type LogEntry
- type LogsProjection
- type Migration
- func (m *Migration) AddRoots(srcRoot, dstRoot types.Folder) (RootSeedSummary, error)
- func (m *Migration) BulkExclude(filter NodeQueryFilter, excluded bool) (int, error)
- func (m *Migration) BulkExcludeWithPropagation(filter NodeQueryFilter, excluded bool) (int, error)
- func (m *Migration) GetChildrenDiffsStats(path string, foldersOnly bool) (DiffsStats, error)
- func (m *Migration) GetLogs(limit int, groupByLevel bool) (LogsProjection, error)
- func (m *Migration) GetQueueMetrics() (QueueMetricsSnapshot, error)
- func (m *Migration) GetRecentLogs(limit int) ([]LogEntry, error)
- func (m *Migration) GetRuntimeStatus() RuntimeState
- func (m *Migration) GetTraversalSummary() (TraversalSummary, error)
- func (m *Migration) ListChildrenDiffs(req ListChildrenDiffsRequest) (ListChildrenDiffsResult, error)
- func (m *Migration) MarkNodeForRetryCopy(nodeID string) error
- func (m *Migration) MarkNodeForRetryDiscovery(nodeID string) error
- func (m *Migration) Phase() Phase
- func (m *Migration) QueryNodes(filter NodeQueryFilter) ([]db.NodeState, error)
- func (m *Migration) RetryAllFailed() error
- func (m *Migration) RunRetrySweep(opts RetrySweepOptions) (RuntimeStats, error)
- func (m *Migration) SearchPathReviewItems(req SearchRequest) (SearchResult, error)
- func (m *Migration) SetNodeExcluded(queueType, nodeID string, excluded bool) error
- func (m *Migration) SetNodeExcludedWithPropagation(queueType, nodeID string, excluded bool) error
- func (m *Migration) StartCopy() (queue.QueueStats, error)
- func (m *Migration) StartTraversal(cfg Config) (RuntimeStats, error)
- func (m *Migration) Stop() (StopResult, error)
- func (m *Migration) UnmarkNodeForRetryCopy(nodeID string) error
- func (m *Migration) UnmarkNodeForRetryDiscovery(nodeID string) error
- type MigrationConfig
- type MigrationController
- type MigrationDetails
- type MigrationManager
- func (m *MigrationManager) Close() error
- func (m *MigrationManager) CreateMigration(cfg CreateMigrationConfig) (*Migration, error)
- func (m *MigrationManager) DeleteMigration(id string) error
- func (m *MigrationManager) GetMigration(id string) (*Migration, error)
- func (m *MigrationManager) GetMigrationDetails(id string) (*MigrationDetails, error)
- func (m *MigrationManager) ListMigrations() ([]MigrationSummary, error)
- type MigrationStatus
- type MigrationSummary
- type NodeQueryFilter
- type Phase
- type QueueMetricsSnapshot
- type Result
- type RetrySweepOptions
- type RootSeedSummary
- type RuntimeState
- type RuntimeStats
- type SearchRequest
- type SearchResult
- type Service
- type StopResult
- type SweepConfig
- type TraversalSummary
- type VerificationReport
- type VerifyOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleShutdownSignals ¶
func HandleShutdownSignals(cancel context.CancelFunc)
HandleShutdownSignals sets up signal handlers for SIGINT (Ctrl+C) and SIGTERM. When either signal is received, it cancels the provided context. If shutdown doesn't complete within 10 seconds, it hard-kills the process. This function should be called in a goroutine at the start of the migration. On Windows, SIGINT is supported but SIGTERM may not be available.
func RunCopyPhase ¶
func RunCopyPhase(cfg CopyPhaseConfig) (queue.QueueStats, error)
RunCopyPhase executes the copy phase (two-pass: folders then files).
func SetupDatabase ¶
func SetupDatabase(cfg DatabaseConfig) (*db.DB, bool, error)
SetupDatabase opens a DuckDB database at cfg.Path. It returns the DB and a flag reporting whether the database is fresh (true if the file was newly created, or an existing file was removed first). The caller is responsible for closing the database when done.
Types ¶
type Config ¶
type Config struct {
// Database config (path, etc.). MigrationManager opens and owns this connection lifecycle.
Database DatabaseConfig
Source Service
Destination Service
SeedRoots bool
WorkerCount int
MaxRetries int
CoordinatorLead int
LogAddress string
LogLevel string
SkipListener bool
StartupDelay time.Duration
ProgressTick time.Duration
Verification VerifyOptions
// ShutdownContext is an optional context for force shutdown control.
// If not provided, LetsMigrate will create one internally.
// Set this when using StartMigration for programmatic shutdown control.
ShutdownContext context.Context
}
Config aggregates all of the knobs required to run the migration engine once.
type CopyPhaseConfig ¶
type CopyPhaseConfig struct {
DuckDB *db.DB
SrcAdapter types.FSAdapter
DstAdapter types.FSAdapter
WorkerCount int
MaxRetries int
LogAddress string
LogLevel string
SkipListener bool
StartupDelay time.Duration
ProgressTick time.Duration
ShutdownContext context.Context
}
CopyPhaseConfig configures the copy phase execution.
type CreateMigrationConfig ¶
CreateMigrationConfig defines metadata persisted for a migration record.
type DatabaseConfig ¶
type DatabaseConfig struct {
// Path is the DuckDB file path to create/open (e.g. migration.duckdb).
Path string
// RemoveExisting deletes the database file if it already exists before creating a new database.
RemoveExisting bool
// RequireOpen determines whether the DB instance must already be open (true) or can be auto-opened (false).
// When true (API mode): DB instance must be provided and already open, error if nil/closed.
// When false (standalone mode): Can auto-open DB if instance is nil or not open.
RequireOpen bool
}
DatabaseConfig defines how the migration engine should prepare its backing store.
type DiffItem ¶
type DiffItem struct {
Path string
Name string
Depth int
Type string
SrcNodeID string
DstNodeID string
SrcTraversalStatus string
DstTraversalStatus string
CopyStatus string
Excluded bool
MissingOnSource bool
MissingOnDest bool
Size int64
}
DiffItem is a path review row comparing source and destination state.
type DiffsStats ¶
type ListChildrenDiffsResult ¶
type LogsProjection ¶
type Migration ¶
Migration is the first-class domain object for a single migration lifecycle.
func (*Migration) AddRoots ¶
func (m *Migration) AddRoots(srcRoot, dstRoot types.Folder) (RootSeedSummary, error)
AddRoots seeds source and destination root tasks into the migration DB. This is the explicit root insert step before starting traversal.
func (*Migration) BulkExclude ¶
func (m *Migration) BulkExclude(filter NodeQueryFilter, excluded bool) (int, error)
BulkExclude applies exclusion over a query slice.
func (*Migration) BulkExcludeWithPropagation ¶
func (m *Migration) BulkExcludeWithPropagation(filter NodeQueryFilter, excluded bool) (int, error)
func (*Migration) GetChildrenDiffsStats ¶
func (m *Migration) GetChildrenDiffsStats(path string, foldersOnly bool) (DiffsStats, error)
func (*Migration) GetLogs ¶
func (m *Migration) GetLogs(limit int, groupByLevel bool) (LogsProjection, error)
func (*Migration) GetQueueMetrics ¶
func (m *Migration) GetQueueMetrics() (QueueMetricsSnapshot, error)
func (*Migration) GetRuntimeStatus ¶
func (m *Migration) GetRuntimeStatus() RuntimeState
func (*Migration) GetTraversalSummary ¶
func (m *Migration) GetTraversalSummary() (TraversalSummary, error)
func (*Migration) ListChildrenDiffs ¶
func (m *Migration) ListChildrenDiffs(req ListChildrenDiffsRequest) (ListChildrenDiffsResult, error)
func (*Migration) MarkNodeForRetryCopy ¶
func (*Migration) MarkNodeForRetryDiscovery ¶
func (*Migration) QueryNodes ¶
func (m *Migration) QueryNodes(filter NodeQueryFilter) ([]db.NodeState, error)
QueryNodes provides review-phase node search/filter without exposing SQL to API.
func (*Migration) RetryAllFailed ¶
func (*Migration) RunRetrySweep ¶
func (m *Migration) RunRetrySweep(opts RetrySweepOptions) (RuntimeStats, error)
func (*Migration) SearchPathReviewItems ¶
func (m *Migration) SearchPathReviewItems(req SearchRequest) (SearchResult, error)
func (*Migration) SetNodeExcluded ¶
SetNodeExcluded mutates review exclusions in engine-owned store.
func (*Migration) SetNodeExcludedWithPropagation ¶
func (*Migration) StartCopy ¶
func (m *Migration) StartCopy() (queue.QueueStats, error)
StartCopy transitions review->copying and runs copy phase.
func (*Migration) StartTraversal ¶
func (m *Migration) StartTraversal(cfg Config) (RuntimeStats, error)
StartTraversal begins traversal lifecycle and transitions to review on success.
func (*Migration) Stop ¶
func (m *Migration) Stop() (StopResult, error)
func (*Migration) UnmarkNodeForRetryCopy ¶
func (*Migration) UnmarkNodeForRetryDiscovery ¶
type MigrationConfig ¶
type MigrationConfig struct {
DB *db.DB // DuckDB instance (required; manager-owned)
DBPath string // Reserved for compatibility; ignored when DB is set
SrcAdapter types.FSAdapter
DstAdapter types.FSAdapter
SrcRoot types.Folder
DstRoot types.Folder
SrcServiceName string
WorkerCount int
MaxRetries int
CoordinatorLead int
MaxSrcAhead int // Max rounds SRC may run ahead of DST (default 3); 0 uses default
LogAddress string
LogLevel string
SkipListener bool
StartupDelay time.Duration
ProgressTick time.Duration
ResumeStatus *MigrationStatus
ShutdownContext context.Context
}
MigrationConfig is the configuration passed to RunMigration.
type MigrationController ¶
type MigrationController struct {
// contains filtered or unexported fields
}
MigrationController provides programmatic control over a running migration. It allows you to trigger force shutdown and check migration status.
func StartMigration ¶
func StartMigration(cfg Config) *MigrationController
StartMigration starts a migration asynchronously and returns a MigrationController that allows programmatic shutdown. Use this when you need to control the migration lifecycle or run migrations in the background.
Example:
    controller := migration.StartMigration(cfg)
    defer controller.Shutdown()

    // Later, trigger shutdown programmatically:
    controller.Shutdown()

    // Wait for completion:
    result, err := controller.Wait()
func (*MigrationController) Done ¶
func (mc *MigrationController) Done() <-chan struct{}
Done returns a channel that is closed when the migration completes or is shut down.
func (*MigrationController) Shutdown ¶
func (mc *MigrationController) Shutdown()
Shutdown triggers a force shutdown of the migration. This is safe to call multiple times or after the migration has completed.
func (*MigrationController) Wait ¶
func (mc *MigrationController) Wait() (Result, error)
Wait blocks until the migration completes or is shut down, then returns the result and error.
type MigrationDetails ¶
type MigrationDetails struct {
ID string
Name string
Phase Phase
CreatedAt time.Time
UpdatedAt time.Time
ServiceMetadataJSON string
RootConfigJSON string
}
MigrationDetails is the full migration record from the DB, for API detail views. The engine owns all DB access; use this type via GetMigrationDetails so the API never touches the DB.
type MigrationManager ¶
type MigrationManager struct {
// contains filtered or unexported fields
}
MigrationManager owns migration lifecycle authority and persistence access.
func NewMigrationManager ¶
func NewMigrationManager(cfg DatabaseConfig) (*MigrationManager, error)
NewMigrationManager opens the migration DB and becomes the lifecycle owner.
func (*MigrationManager) Close ¶
func (m *MigrationManager) Close() error
Close releases manager-owned resources.
func (*MigrationManager) CreateMigration ¶
func (m *MigrationManager) CreateMigration(cfg CreateMigrationConfig) (*Migration, error)
CreateMigration registers a migration record and returns a domain object handle.
func (*MigrationManager) DeleteMigration ¶
func (m *MigrationManager) DeleteMigration(id string) error
DeleteMigration removes persisted metadata and in-memory runtime state.
func (*MigrationManager) GetMigration ¶
func (m *MigrationManager) GetMigration(id string) (*Migration, error)
GetMigration loads or returns a cached migration domain object.
func (*MigrationManager) GetMigrationDetails ¶
func (m *MigrationManager) GetMigrationDetails(id string) (*MigrationDetails, error)
GetMigrationDetails returns the full migration record from the DB. Use this for "GET migration by ID" detail views; the API should not query the DB directly.
func (*MigrationManager) ListMigrations ¶
func (m *MigrationManager) ListMigrations() ([]MigrationSummary, error)
ListMigrations returns persisted migration summaries.
type MigrationStatus ¶
type MigrationStatus struct {
SrcTotal int
DstTotal int
SrcPending int
DstPending int
SrcFailed int
DstFailed int
MinPendingDepthSrc *int
MinPendingDepthDst *int
}
MigrationStatus summarizes the current state of a migration in the database.
func InspectMigrationStatus ¶
func InspectMigrationStatus(database *db.DB) (MigrationStatus, error)
InspectMigrationStatus inspects the DuckDB node data and stats tables and returns a MigrationStatus.
func (MigrationStatus) HasFailures ¶
func (s MigrationStatus) HasFailures() bool
HasFailures returns true if any src or dst nodes failed traversal.
func (MigrationStatus) HasPending ¶
func (s MigrationStatus) HasPending() bool
HasPending returns true if any src or dst nodes are still pending.
func (MigrationStatus) IsComplete ¶
func (s MigrationStatus) IsComplete() bool
IsComplete returns true when there are nodes and no pending or failed work.
func (MigrationStatus) IsEmpty ¶
func (s MigrationStatus) IsEmpty() bool
IsEmpty returns true if no nodes have been discovered yet.
type MigrationSummary ¶
type MigrationSummary struct {
ID string
Name string
Phase Phase
CreatedAt time.Time
UpdatedAt time.Time
}
MigrationSummary is a compact projection returned by ListMigrations.
type NodeQueryFilter ¶
type NodeQueryFilter struct {
Queue string
Depth *int
Status string
Excluded *bool
PathLike string
Limit int
Offset int
OrderByPath bool
}
NodeQueryFilter controls review-phase node query behavior.
type Phase ¶
type Phase string
Phase is the migration lifecycle state, stored as the canonical status string.
const (
	PhaseCreated    Phase = "Roots-Set"             // User has set root folders and services (initial state)
	PhaseFiltersSet Phase = "Filters-Set"           // Filters configured, ready for traversal (also used for retry)
	PhaseTraversing Phase = "Traversal-In-Progress" // Traversal is currently running
	PhaseReview     Phase = "Awaiting-Path-Review"  // Traversal finished, user can review results and see copy plan
	PhaseCopying    Phase = "Copy-In-Progress"      // Copy phase is currently running
	PhaseCompleted  Phase = "Complete"              // Migration completed successfully
	PhaseSuspended  Phase = "Suspended"             // Migration suspended (can be resumed)
)
func ParsePhase ¶
type QueueMetricsSnapshot ¶
type Result ¶
type Result struct {
RootsSeeded bool
RootSummary RootSeedSummary
Runtime RuntimeStats
Verification VerificationReport
}
Result captures the outcome of a migration run.
func LetsMigrate ¶
LetsMigrate executes setup, traversal, and verification using the supplied configuration. This is the synchronous version: it blocks until the migration completes or is shut down. For programmatic shutdown control, use StartMigration instead.
type RetrySweepOptions ¶
type RetrySweepOptions struct {
WorkerCount int
MaxRetries int
LogAddress string
LogLevel string
SkipListener bool
MaxKnownDepth int
}
RetrySweepOptions are manager-level knobs for retry sweep runs.
type RootSeedSummary ¶
RootSeedSummary captures verification details after root task seeding.
func SeedRootTasks ¶
func SeedRootTasks(srcRoot types.Folder, dstRoot types.Folder, database *db.DB) (RootSeedSummary, error)
SeedRootTasks inserts the supplied source and destination root folders into the database. The folders should already contain root-relative metadata (LocationPath="/", DepthLevel=0).
type RuntimeState ¶
type RuntimeState struct {
NodesDiscovered int64
TasksPending int64
TasksCompleted int64
BytesCopied int64
Errors int64
}
RuntimeState is the in-memory live status projection for a migration.
type RuntimeStats ¶
type RuntimeStats struct {
Duration time.Duration
Src queue.QueueStats
Dst queue.QueueStats
}
RuntimeStats captures execution statistics at the end of a migration run.
func RunMigration ¶
func RunMigration(cfg MigrationConfig) (RuntimeStats, error)
RunMigration executes the migration traversal using the provided configuration.
func RunRetrySweep ¶
func RunRetrySweep(cfg SweepConfig) (RuntimeStats, error)
RunRetrySweep runs a retry sweep to re-process failed or pending tasks from a previous traversal. This allows re-traversing paths that previously failed (e.g., due to permissions) and discovering new content in those paths.
The sweep checks all known levels up to SweepConfig.MaxKnownDepth (or auto-detects the depth from the DB when MaxKnownDepth is -1), then uses normal traversal logic for deeper levels discovered during retry.
Example:
    config := migration.SweepConfig{
        DuckDB:        dbInstance,
        SrcAdapter:    srcAdapter,
        DstAdapter:    dstAdapter,
        WorkerCount:   10,
        MaxRetries:    3,
        MaxKnownDepth: 5, // Or -1 to auto-detect
    }
    stats, err := migration.RunRetrySweep(config)
type SearchRequest ¶
type StopResult ¶
type StopResult struct {
MigrationID string
Phase Phase
RuntimeStatus RuntimeState
Stopped bool
}
StopResult reports stop/suspend state after a stop request.
type SweepConfig ¶
type SweepConfig struct {
DuckDB *db.DB
SrcAdapter types.FSAdapter
DstAdapter types.FSAdapter
WorkerCount int
MaxRetries int
LogAddress string
LogLevel string
SkipListener bool
StartupDelay time.Duration
ProgressTick time.Duration
ShutdownContext context.Context
// For retry sweeps only
MaxKnownDepth int // Maximum known depth from previous traversal (-1 to auto-detect)
SkipAutoETLBeforeRetry bool // If true, skip automatic ETL from DuckDB to DuckDB before retry sweep
DuckDBPath string // Optional: Path to DuckDB file (auto-derived from DuckDB path if empty and ETL is enabled)
}
SweepConfig is the configuration for running retry sweeps.
type TraversalSummary ¶
type TraversalSummary struct {
SrcTotal int
DstTotal int
SrcPending int
DstPending int
SrcFailed int
DstFailed int
SrcExcluded int
DstExcluded int
}
TraversalSummary is the review projection returned by GetTraversalSummary.
type VerificationReport ¶
type VerificationReport struct {
SrcTotal int
DstTotal int
SrcPending int
DstPending int
SrcFailed int
DstFailed int
DstNotOnSrc int
SrcSuccessful int // Count of successful SRC nodes
DstSuccessful int // Count of successful DST nodes
}
VerificationReport captures aggregate statistics from the verification pass.
func VerifyMigration ¶
func VerifyMigration(database *db.DB, opts VerifyOptions) (VerificationReport, error)
VerifyMigration inspects the DuckDB node tables and stats for pending, failed, or missing nodes and returns a report.
func (VerificationReport) Success ¶
func (r VerificationReport) Success(opts VerifyOptions) bool
Success returns true when the report satisfies the supplied VerifyOptions. Migration is not considered successful unless at least one node was actually moved/traversed.
type VerifyOptions ¶
VerifyOptions define the expectations for post-migration validation.