copier

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package copier provides functionality for copying data between PostgreSQL databases.

It implements a robust, parallel data migration system that handles:

  • Automatic table discovery and dependency ordering
  • Foreign key constraint management (drop/restore or replica mode)
  • Parallel worker pool with configurable concurrency
  • Progress tracking with multiple display modes
  • Graceful shutdown and error recovery

Thread Safety

The Copier struct uses a centralized state management system (state.CopyState) that provides thread-safe operations. Worker goroutines communicate through channels and the shared state system.

Error Handling

Errors are categorized and handled appropriately:

  • Fatal errors: Connection failures, invalid configuration
  • Recoverable errors: Individual table copy failures (continue with other tables)
  • Transient errors: Network timeouts (retry with exponential backoff)

Resource Management

The Copier properly manages database connections, file handles, and goroutines. Always call Close() when done to release resources:

copier, err := New(config)
if err != nil {
    log.Fatal(err)
}
defer copier.Close()
err = copier.Copy(ctx)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ValidateConfig

func ValidateConfig(config *Config) error

ValidateConfig validates the configuration

Types

type Config

type Config = state.OperationConfig

Config is a type alias for state.OperationConfig for backward compatibility

type Copier

type Copier struct {
	// contains filtered or unexported fields
}

Copier handles the data copying operation using centralized state management.

Copier orchestrates the entire data migration process:

  • Discovering tables and their metadata from the source database
  • Managing foreign key constraints (drop/restore or replica mode)
  • Coordinating parallel worker goroutines for data copying
  • Tracking progress and emitting updates to display components

Lifecycle

Create a Copier with New() or NewWithState(), then call Copy() to execute. Always call Close() when finished to release database connections and files.

Configuration

The Copier behavior is controlled by state.OperationConfig:

  • Parallel: Number of concurrent worker goroutines (default: CPU cores)
  • BatchSize: Rows per INSERT batch (affects memory and performance)
  • IncludeTables/ExcludeTables: Table filtering (supports wildcards)
  • DryRun: Validate without modifying destination

Error Recovery

Individual table failures don't stop the entire operation. Failed tables are tracked in state and reported at completion. Foreign keys are restored even on partial failure.

Thread Safety: The Copier itself is not safe for concurrent Copy() calls. Use separate Copier instances for concurrent operations.

func New

func New(config *Config) (*Copier, error)

New creates a new Copier instance

func NewWithState added in v0.4.0

func NewWithState(config *Config, copyState *state.CopyState) (*Copier, error)

NewWithState creates a new Copier instance with an existing state

func NewWithWebPort added in v0.4.0

func NewWithWebPort(config *Config, webPort ...int) (*Copier, error)

NewWithWebPort creates a new Copier instance with web server support

func (*Copier) Close

func (c *Copier) Close()

Close closes database connections and web server

func (*Copier) Copy

func (c *Copier) Copy(ctx context.Context) error

Copy performs the data copying operation

func (*Copier) HasVitalOperations added in v0.7.0

func (c *Copier) HasVitalOperations() bool

HasVitalOperations returns true if there are operations in progress that require graceful shutdown (e.g., FK restoration after copy has started). This is used by the CLI to determine shutdown behavior.

func (*Copier) OnStateChange added in v0.7.0

func (c *Copier) OnStateChange(_ *state.CopyState, event state.Event)

OnStateChange implements state.Listener to capture error events for errors.log

func (*Copier) State added in v0.6.0

func (c *Copier) State() *state.CopyState

State returns the underlying copy state for read-only observation (e.g., status polling). Callers should use the snapshot methods on the state for thread-safe reads.

type CopyStats

type CopyStats = state.Summary

CopyStats is deprecated - use state.Summary instead

type Discovery added in v0.4.0

type Discovery interface {
	DiscoverTables() ([]*TableInfo, error)
	DetectForeignKeys(tables []*TableInfo) error
}

Discovery is responsible for table / FK discovery and basic stats.

type DisplayMode added in v0.2.0

type DisplayMode string

DisplayMode represents different output modes for progress tracking

const (
	// DisplayModeRaw shows minimal output, suitable for headless/CI environments (default)
	// Note: Internal name remains "raw" for backwards compatibility in code
	DisplayModeRaw DisplayMode = "raw"
	// DisplayModeProgress shows a progress bar
	DisplayModeProgress DisplayMode = "progress"
	// DisplayModeInteractive shows an interactive live display with table details
	DisplayModeInteractive DisplayMode = "interactive"
	// DisplayModeWeb launches a web interface for monitoring the copy process
	DisplayModeWeb DisplayMode = "web"
)

type Executor added in v0.4.0

type Executor interface {
	Execute(ctx context.Context, tables []*TableInfo) error
}

Executor runs the data movement for a planned set.

type ForeignKey

type ForeignKey struct {
	ConstraintName    string
	Schema            string
	Table             string
	Columns           []string
	ReferencedSchema  string
	ReferencedTable   string
	ReferencedColumns []string
	OnDelete          string
	OnUpdate          string
	Definition        string // Full CREATE CONSTRAINT statement
}

ForeignKey represents a foreign key constraint

type ForeignKeyManager

type ForeignKeyManager struct {
	// contains filtered or unexported fields
}

ForeignKeyManager handles foreign key operations

func NewForeignKeyManager

func NewForeignKeyManager(db *sql.DB, logger *utils.SimpleLogger, noTimeouts bool) *ForeignKeyManager

NewForeignKeyManager creates a new foreign key manager

func (*ForeignKeyManager) Cleanup added in v0.4.0

func (fkm *ForeignKeyManager) Cleanup() error

Cleanup removes residual FK backup artifacts after successful run.

func (*ForeignKeyManager) CleanupBackupFile

func (fkm *ForeignKeyManager) CleanupBackupFile() error

CleanupBackupFile removes the backup file on successful completion

func (*ForeignKeyManager) Detect added in v0.4.0

func (fkm *ForeignKeyManager) Detect(tables []*TableInfo) error

Detect discovers foreign key constraints for provided tables (ForeignKeyStrategy implementation).

func (*ForeignKeyManager) DetectForeignKeys

func (fkm *ForeignKeyManager) DetectForeignKeys(tables []*TableInfo) error

DetectForeignKeys discovers all foreign keys in the database

func (*ForeignKeyManager) DisableReplicaMode

func (fkm *ForeignKeyManager) DisableReplicaMode() error

DisableReplicaMode disables replica mode

func (*ForeignKeyManager) DropAllForeignKeys added in v0.6.0

func (fkm *ForeignKeyManager) DropAllForeignKeys(tables []*TableInfo) error

DropAllForeignKeys drops all incoming foreign keys referencing any of the planned tables. This prepares the database for TRUNCATE without CASCADE across the whole plan.

func (*ForeignKeyManager) DropForeignKeysForTable

func (fkm *ForeignKeyManager) DropForeignKeysForTable(table *TableInfo) error

DropForeignKeysForTable drops all foreign keys that reference or are referenced by a table

func (*ForeignKeyManager) GetForeignKeyStats

func (fkm *ForeignKeyManager) GetForeignKeyStats() (total, dropped int)

GetForeignKeyStats returns statistics about foreign keys

func (*ForeignKeyManager) IsUsingReplicaMode

func (fkm *ForeignKeyManager) IsUsingReplicaMode() bool

IsUsingReplicaMode returns whether replica mode is enabled

func (*ForeignKeyManager) RecoverFromBackupFile

func (fkm *ForeignKeyManager) RecoverFromBackupFile() error

RecoverFromBackupFile attempts to restore FKs from backup file if they exist but weren't tracked

func (*ForeignKeyManager) RestoreAllForeignKeys added in v0.6.0

func (fkm *ForeignKeyManager) RestoreAllForeignKeys() error

RestoreAllForeignKeys attempts to restore all previously dropped FKs.

func (*ForeignKeyManager) SetLogger added in v0.2.0

func (fkm *ForeignKeyManager) SetLogger(logger *utils.SimpleLogger)

SetLogger updates the logger for the foreign key manager

func (*ForeignKeyManager) TryUseReplicaMode

func (fkm *ForeignKeyManager) TryUseReplicaMode() error

TryUseReplicaMode attempts to use replica mode for FK handling

func (*ForeignKeyManager) ValidateRestoredForeignKeys added in v0.7.0

func (fkm *ForeignKeyManager) ValidateRestoredForeignKeys() error

ValidateRestoredForeignKeys validates constraints created as NOT VALID during restoration.

type ForeignKeyStrategy added in v0.4.0

type ForeignKeyStrategy interface {
	Detect(tables []*TableInfo) error
	Cleanup() error // after all tables
}

ForeignKeyStrategy abstracts FK handling modes.

type InteractiveDisplay added in v0.2.0

type InteractiveDisplay struct {
	// contains filtered or unexported fields
}

InteractiveDisplay manages the interactive CLI display

func NewInteractiveDisplay added in v0.2.0

func NewInteractiveDisplay(state *state.CopyState) *InteractiveDisplay

NewInteractiveDisplay creates a new interactive display

func (*InteractiveDisplay) Start added in v0.2.0

func (d *InteractiveDisplay) Start()

Start begins the interactive display loop

func (*InteractiveDisplay) Stop added in v0.2.0

func (d *InteractiveDisplay) Stop()

Stop stops the interactive display

type Persistence added in v0.4.0

type Persistence interface {
}

Persistence handles durable logging / summaries.

type Planner added in v0.4.0

type Planner interface {
	PlanTables(tables []*TableInfo) ([]*TableInfo, error)
	PlanLayers(tables []*TableInfo) ([][]*TableInfo, error)
}

Planner orders tables & resolves dependencies (currently passthrough).

type ProgressSink added in v0.4.0

type ProgressSink interface {
	UpdateTable(schema, table string, rowsCopied int64)
	Log(level, msg, scope, table string)
	Done()
}

ProgressSink receives progress events decoupled from execution.

type Reporter added in v0.4.0

type Reporter interface {
}

Reporter handles state broadcasting / console rendering (future use).

type TableInfo

type TableInfo = state.TableState

TableInfo is a type alias for state.TableState for backward compatibility

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL