state

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2025 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package state provides React-like hooks for state management.

This package implements a centralized state management system for pgcopy operations, following patterns similar to React hooks. It provides:

  • Thread-safe state mutations via mutex-protected hook methods
  • State transition validation to catch programming errors
  • Event emission for reactive UI updates
  • Snapshot generation for consistent state reads

Thread Safety

All exported methods are safe for concurrent use. The CopyState struct uses a read-write mutex (sync.RWMutex) to protect all shared state. Writers acquire an exclusive lock, while readers can operate concurrently.

State Transitions

Both operation status and table status follow defined state machines:

Operation States:

Initializing -> Preparing -> Copying -> Completed
             -> Confirming -> Preparing -> ...
             -> Failed | Cancelled (from any state)

Table States:

Pending -> Queued -> Copying -> Completed | Failed | Retrying
        -> Skipped | Cancelled (from any non-terminal state)

Invalid transitions are logged but allowed for robustness - this helps identify bugs without breaking functionality in production.

Package state provides a centralized state management system for pgcopy operations Following a React-like hooks pattern for state updates and subscriptions

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectionDetails

type ConnectionDetails struct {
	Source ConnectionInfo `json:"source"`
	Target ConnectionInfo `json:"target"`
}

ConnectionDetails holds information about source and target connections

type ConnectionInfo

type ConnectionInfo struct {
	Display      string            `json:"display"`
	Host         string            `json:"host"`
	Port         string            `json:"port"`
	Database     string            `json:"database"`
	IsFile       bool              `json:"isFile"`
	FilePath     string            `json:"filePath,omitempty"`
	ConnectionID string            `json:"connectionId"`
	Status       ConnectionStatus  `json:"status"`
	LastPing     *time.Time        `json:"lastPing,omitempty"`
	Metadata     map[string]string `json:"metadata"`
}

ConnectionInfo represents connection information

type ConnectionStatus

type ConnectionStatus string

ConnectionStatus represents the status of a database connection

const (
	// ConnectionStatusUnknown indicates the connection status is unknown
	ConnectionStatusUnknown ConnectionStatus = "unknown"
	// ConnectionStatusConnecting indicates the connection is being established
	ConnectionStatusConnecting ConnectionStatus = "connecting"
	// ConnectionStatusConnected indicates the connection is active
	ConnectionStatusConnected ConnectionStatus = "connected"
	// ConnectionStatusDisconnected indicates the connection has been disconnected
	ConnectionStatusDisconnected ConnectionStatus = "disconnected"
	// ConnectionStatusError indicates the connection has an error
	ConnectionStatusError ConnectionStatus = "error"
)

type CopyState

type CopyState struct {

	// Operation metadata
	ID          string            `json:"id"`
	Status      OperationStatus   `json:"status"`
	StartTime   time.Time         `json:"startTime"`
	EndTime     *time.Time        `json:"endTime,omitempty"`
	Config      OperationConfig   `json:"config"`
	Connections ConnectionDetails `json:"connections"`

	// Progress tracking
	Tables   []TableState `json:"tables"`
	Summary  Summary      `json:"summary"`
	Logs     []LogEntry   `json:"logs"`
	Errors   []ErrorEntry `json:"errors"`
	Warnings []Warning    `json:"warnings"`

	// Foreign key management
	ForeignKeys ForeignKeyState `json:"foreignKeys"`

	// Real-time metrics
	Metrics Metrics `json:"metrics"`
	// contains filtered or unexported fields
}

CopyState represents the complete state of a pgcopy operation

func NewCopyState

func NewCopyState(id string, config OperationConfig) *CopyState

NewCopyState creates a new copy state instance

func (*CopyState) AddError

func (s *CopyState) AddError(errorType, message, component string, isFatal bool, context map[string]any)

AddError adds a global error to the state

func (*CopyState) AddForeignKey

func (s *CopyState) AddForeignKey(fk ForeignKey)

AddForeignKey adds a foreign key to the state

func (*CopyState) AddLog

func (s *CopyState) AddLog(level LogLevel, message string, component string, table string, context map[string]any)

AddLog adds a log entry to the state

func (*CopyState) AddTable

func (s *CopyState) AddTable(schema, name string, totalRows int64)

AddTable adds a new table to the state

func (*CopyState) AddTableError

func (s *CopyState) AddTableError(schema, name string, errorMsg string, context map[string]any)

AddTableError adds an error to a specific table

func (*CopyState) GetAllTables

func (s *CopyState) GetAllTables() []TableState

GetAllTables returns a copy of all tables in the state

func (*CopyState) GetSnapshot

func (s *CopyState) GetSnapshot() CopyStateSnapshot

GetSnapshot returns a read-only snapshot of the current state

func (*CopyState) GetTablesByStatus

func (s *CopyState) GetTablesByStatus(status TableStatus) []string

GetTablesByStatus returns a slice of table names that have the specified status

func (*CopyState) SetStatus

func (s *CopyState) SetStatus(status OperationStatus)

SetStatus updates the operation status with state transition validation.

Thread Safety: Acquires exclusive lock during state modification. Events: Emits EventOperationStarted, EventOperationCompleted, or EventOperationFailed.

Invalid transitions are logged but allowed for robustness.

func (*CopyState) Subscribe

func (s *CopyState) Subscribe(listener Listener)

Subscribe adds a state listener

func (*CopyState) Unsubscribe

func (s *CopyState) Unsubscribe(listener Listener)

Unsubscribe removes a state listener

func (*CopyState) UpdateConnectionDetails

func (s *CopyState) UpdateConnectionDetails(connType, display string, status ConnectionStatus)

UpdateConnectionDetails updates connection information

func (*CopyState) UpdateForeignKeys

func (s *CopyState) UpdateForeignKeys(mode string, isUsingReplica bool, totalFKs int)

UpdateForeignKeys updates the foreign key state

func (*CopyState) UpdateMetrics

func (s *CopyState) UpdateMetrics(metrics Metrics)

UpdateMetrics updates the real-time metrics

func (*CopyState) UpdateTableProgress

func (s *CopyState) UpdateTableProgress(schema, name string, syncedRows int64)

UpdateTableProgress updates the progress of a specific table

func (*CopyState) UpdateTableStatus

func (s *CopyState) UpdateTableStatus(schema, name string, status TableStatus)

UpdateTableStatus updates the status of a specific table

type CopyStateSnapshot

type CopyStateSnapshot struct {
	// Operation metadata
	ID          string            `json:"id"`
	Status      OperationStatus   `json:"status"`
	StartTime   time.Time         `json:"startTime"`
	EndTime     *time.Time        `json:"endTime,omitempty"`
	Config      OperationConfig   `json:"config"`
	Connections ConnectionDetails `json:"connections"`

	// Progress tracking
	Tables   []TableState `json:"tables"`
	Summary  Summary      `json:"summary"`
	Logs     []LogEntry   `json:"logs"`
	Errors   []ErrorEntry `json:"errors"`
	Warnings []Warning    `json:"warnings"`

	// Foreign key management
	ForeignKeys ForeignKeyState `json:"foreignKeys"`

	// Real-time metrics
	Metrics Metrics `json:"metrics"`
}

CopyStateSnapshot represents a read-only snapshot of CopyState without mutex

type ErrorEntry

type ErrorEntry struct {
	ID        string         `json:"id"`
	Timestamp time.Time      `json:"timestamp"`
	Type      string         `json:"type"`
	Message   string         `json:"message"`
	Table     string         `json:"table,omitempty"`
	Component string         `json:"component"`
	Context   map[string]any `json:"context,omitempty"`
	Stack     string         `json:"stack,omitempty"`
	IsFatal   bool           `json:"isFatal"`
	IsRetried bool           `json:"isRetried"`
}

ErrorEntry represents an error that occurred during the operation

type Event

type Event struct {
	Type      EventType      `json:"type"`
	Timestamp time.Time      `json:"timestamp"`
	Data      map[string]any `json:"data,omitempty"`
	TableName string         `json:"tableName,omitempty"`
}

Event represents different types of state changes

type EventType

type EventType string

EventType represents the type of state change event

const (
	// EventOperationStarted indicates the copy operation has started
	EventOperationStarted EventType = "operation_started"
	// EventOperationCompleted indicates the copy operation has completed successfully
	EventOperationCompleted EventType = "operation_completed"
	// EventOperationFailed indicates the copy operation has failed
	EventOperationFailed EventType = "operation_failed"
	// EventTableStarted indicates a table copy has started
	EventTableStarted EventType = "table_started"
	// EventTableCompleted indicates a table copy has completed
	EventTableCompleted EventType = "table_completed"
	// EventTableFailed indicates a table copy has failed
	EventTableFailed EventType = "table_failed"
	// EventTableProgress indicates progress update for a table
	EventTableProgress EventType = "table_progress"
	// EventFKDetected indicates a foreign key was detected
	EventFKDetected EventType = "fk_detected"
	// EventFKDropped indicates a foreign key was dropped
	EventFKDropped EventType = "fk_dropped"
	// EventFKRestored indicates a foreign key was restored
	EventFKRestored EventType = "fk_restored"
	// EventLogAdded indicates a new log entry was added
	EventLogAdded EventType = "log_added"
	// EventErrorAdded indicates a new error was added
	EventErrorAdded EventType = "error_added"
	// EventWarningAdded indicates a new warning was added
	EventWarningAdded EventType = "warning_added"
	// EventMetricsUpdated indicates metrics were updated
	EventMetricsUpdated EventType = "metrics_updated"
	// EventConfigChanged indicates configuration was changed
	EventConfigChanged EventType = "config_changed"
)

type ForeignKey

type ForeignKey struct {
	ConstraintName string   `json:"constraintName"`
	SourceTable    string   `json:"sourceTable"`
	SourceColumns  []string `json:"sourceColumns"`
	TargetTable    string   `json:"targetTable"`
	TargetColumns  []string `json:"targetColumns"`
	OnDelete       string   `json:"onDelete"`
	OnUpdate       string   `json:"onUpdate"`
	IsDeferred     bool     `json:"isDeferred"`
	Definition     string   `json:"definition"`
}

ForeignKey represents a foreign key constraint

type ForeignKeyState

type ForeignKeyState struct {
	Mode               string       `json:"mode"` // "replica" or "drop_restore"
	IsUsingReplicaMode bool         `json:"isUsingReplicaMode"`
	TotalFKs           int          `json:"totalFks"`
	DroppedFKs         int          `json:"droppedFks"`
	RestoredFKs        int          `json:"restoredFks"`
	FailedRestores     int          `json:"failedRestores"`
	ForeignKeys        []ForeignKey `json:"foreignKeys"`
	BackupFile         string       `json:"backupFile"`
	HasBackupFile      bool         `json:"hasBackupFile"`
	BackupCreated      *time.Time   `json:"backupCreated,omitempty"`
	Status             string       `json:"status"`
}

ForeignKeyState tracks the state of foreign key management

type Listener

type Listener interface {
	OnStateChange(state *CopyState, event Event)
}

Listener defines the interface for state change listeners

type LogEntry

type LogEntry struct {
	ID        string         `json:"id"`
	Timestamp time.Time      `json:"timestamp"`
	Level     LogLevel       `json:"level"`
	Message   string         `json:"message"`
	Table     string         `json:"table,omitempty"`
	Component string         `json:"component"` // "copier", "fk_manager", "validator", etc.
	Context   map[string]any `json:"context,omitempty"`
}

LogEntry represents a log message

type LogLevel

type LogLevel string

LogLevel represents log severity

const (
	// LogLevelDebug indicates debug-level log messages
	LogLevelDebug LogLevel = "debug"
	// LogLevelInfo indicates informational log messages
	LogLevelInfo LogLevel = "info"
	// LogLevelWarn indicates warning log messages
	LogLevelWarn LogLevel = "warn"
	// LogLevelError indicates error log messages
	LogLevelError LogLevel = "error"
)

type MetricPoint

type MetricPoint struct {
	Timestamp time.Time `json:"timestamp"`
	Value     float64   `json:"value"`
}

MetricPoint represents a single data point for historical metrics

type Metrics

type Metrics struct {
	// Performance metrics
	RowsPerSecond     float64 `json:"rowsPerSecond"`
	TablesPerMinute   float64 `json:"tablesPerMinute"`
	AverageTableTime  float64 `json:"averageTableTime"` // seconds
	PeakRowsPerSecond float64 `json:"peakRowsPerSecond"`

	// Resource usage
	ActiveWorkers   int     `json:"activeWorkers"`
	QueuedTables    int     `json:"queuedTables"`
	MemoryUsageMB   float64 `json:"memoryUsageMb"`
	CPUUsagePercent float64 `json:"cpuUsagePercent"`

	// Database metrics
	SourceConnections int `json:"sourceConnections"`
	TargetConnections int `json:"targetConnections"`
	ActiveQueries     int `json:"activeQueries"`

	// Network metrics
	NetworkLatencyMS  float64 `json:"networkLatencyMs"`
	DataTransferredMB float64 `json:"dataTransferredMb"`

	// Historical data for charts (last 60 data points)
	RowsPerSecondHistory []MetricPoint `json:"rowsPerSecondHistory"`
	MemoryHistory        []MetricPoint `json:"memoryHistory"`
	CPUHistory           []MetricPoint `json:"cpuHistory"`
}

Metrics holds real-time performance metrics

type OperationConfig

type OperationConfig struct {
	// Connection settings
	SourceConn string `json:"sourceConn"`
	TargetConn string `json:"targetConn"`

	// Operation settings
	Parallel      int      `json:"parallel"`
	BatchSize     int      `json:"batchSize"`
	IncludeTables []string `json:"includeTables"`
	ExcludeTables []string `json:"excludeTables"`
	DryRun        bool     `json:"dryRun"`
	SkipBackup    bool     `json:"skipBackup"`
	OutputMode    string   `json:"outputMode"`
	UseCopyPipe   bool     `json:"useCopyPipe"`  // Stream source->dest with COPY ... TO STDOUT / FROM STDIN
	CompressPipe  bool     `json:"compressPipe"` // Gzip compress COPY stream in-flight (local pipe)

	// Row count behavior
	// When true, pgcopy will compute exact row counts using COUNT(*) per table during discovery.
	// This is slower but avoids inaccurate estimates (e.g., after TRUNCATE).
	ExactRows bool `json:"exactRows"`

	// Unlimited timeouts: when true, pgcopy won't set deadlines on internal DB operations
	// (best-effort; still subject to external server timeouts)
	NoTimeouts bool `json:"noTimeouts"`

	// Snapshot: when true, each table is read inside a REPEATABLE READ transaction
	// ensuring a stable snapshot for pagination (new rows added after the table
	// copy started won't appear or shift ordering). This helps avoid pagination
	// anomalies when using keyset iteration.
	Snapshot bool `json:"snapshot"`
}

OperationConfig holds the configuration for the operation

type OperationStatus

type OperationStatus string

OperationStatus represents the current status of the operation

const (
	// StatusInitializing indicates the operation is being initialized
	StatusInitializing OperationStatus = "initializing"
	// StatusConfirming indicates the operation is waiting for user confirmation
	StatusConfirming OperationStatus = "confirming"
	// StatusPreparing indicates the operation is being prepared
	StatusPreparing OperationStatus = "preparing"
	// StatusCopying indicates the operation is actively copying data
	StatusCopying OperationStatus = "copying"
	// StatusCompleted indicates the operation has completed successfully
	StatusCompleted OperationStatus = "completed"
	// StatusFailed indicates the operation has failed
	StatusFailed OperationStatus = "failed"
	// StatusCancelled indicates the operation was cancelled by user
	StatusCancelled OperationStatus = "cancelled"
)

type Summary

type Summary struct {
	TotalTables       int     `json:"totalTables"`
	CompletedTables   int     `json:"completedTables"`
	FailedTables      int     `json:"failedTables"`
	SkippedTables     int     `json:"skippedTables"`
	TotalRows         int64   `json:"totalRows"`
	SyncedRows        int64   `json:"syncedRows"`
	ErrorRows         int64   `json:"errorRows"`
	OverallProgress   float64 `json:"overallProgress"`             // 0-100
	OverallSpeed      float64 `json:"overallSpeed"`                // rows per second
	EstimatedTimeLeft *int64  `json:"estimatedTimeLeft,omitempty"` // seconds
	ElapsedTime       int64   `json:"elapsedTime"`                 // seconds
}

Summary provides overall statistics

type TableError

type TableError struct {
	ID        string         `json:"id"`
	Timestamp time.Time      `json:"timestamp"`
	Type      string         `json:"type"`
	Message   string         `json:"message"`
	RowNumber *int64         `json:"rowNumber,omitempty"`
	Context   map[string]any `json:"context,omitempty"`
	IsRetried bool           `json:"isRetried"`
}

TableError represents an error specific to a table operation

type TableState

type TableState struct {
	// Identity
	Schema   string `json:"schema"`
	Name     string `json:"name"`
	FullName string `json:"fullName"`

	// Metadata
	Columns       []string     `json:"columns"`
	PKColumns     []string     `json:"pkColumns"`
	HasForeignKey bool         `json:"hasForeignKey"`
	ForeignKeys   []ForeignKey `json:"foreignKeys"`
	Dependencies  []string     `json:"dependencies"`
	Dependents    []string     `json:"dependents"`

	// Row counts
	TotalRows   int64 `json:"totalRows"`
	SyncedRows  int64 `json:"syncedRows"`
	ErrorRows   int64 `json:"errorRows"`
	SkippedRows int64 `json:"skippedRows"`

	// Status and timing
	Status    TableStatus `json:"status"`
	StartTime *time.Time  `json:"startTime,omitempty"`
	EndTime   *time.Time  `json:"endTime,omitempty"`
	Duration  *int64      `json:"duration,omitempty"` // milliseconds

	// Progress and performance
	Progress     float64 `json:"progress"`      // 0-100
	Speed        float64 `json:"speed"`         // rows per second
	ETA          *int64  `json:"eta,omitempty"` // seconds remaining
	BatchesDone  int     `json:"batchesDone"`
	BatchesTotal int     `json:"batchesTotal"`

	// Error tracking
	Errors   []TableError `json:"errors"`
	Warnings []Warning    `json:"warnings"`

	// Worker assignment
	WorkerID   int  `json:"workerId"`
	IsParallel bool `json:"isParallel"`

	// Retry logic
	RetryCount    int        `json:"retryCount"`
	MaxRetries    int        `json:"maxRetries"`
	LastRetry     *time.Time `json:"lastRetry,omitempty"`
	RetryStrategy string     `json:"retryStrategy"`
}

TableState represents the state of a single table during copying

type TableStatus

type TableStatus string

TableStatus represents the current status of a table

const (
	// TableStatusPending indicates the table is pending processing
	TableStatusPending TableStatus = "pending"
	// TableStatusQueued indicates the table is queued for processing
	TableStatusQueued TableStatus = "queued"
	// TableStatusCopying indicates the table is currently being copied
	TableStatusCopying TableStatus = "copying"
	// TableStatusCompleted indicates the table has been copied successfully
	TableStatusCompleted TableStatus = "completed"
	// TableStatusFailed indicates the table copy failed
	TableStatusFailed TableStatus = "failed"
	// TableStatusSkipped indicates the table was skipped
	TableStatusSkipped TableStatus = "skipped"
	// TableStatusRetrying indicates the table is being retried after failure
	TableStatusRetrying TableStatus = "retrying"
	// TableStatusCancelled indicates the table copy was cancelled
	TableStatusCancelled TableStatus = "cancelled"
)

type Warning

type Warning struct {
	ID        string         `json:"id"`
	Timestamp time.Time      `json:"timestamp"`
	Type      string         `json:"type"`
	Message   string         `json:"message"`
	Table     string         `json:"table,omitempty"`
	Context   map[string]any `json:"context,omitempty"`
}

Warning represents a warning message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL