Documentation ¶
Overview ¶
Package state provides React-like hooks for state management.
This package implements a centralized state management system for pgcopy operations, following patterns similar to React hooks. It provides:
- Thread-safe state mutations via mutex-protected hook methods
- State transition validation to catch programming errors
- Event emission for reactive UI updates
- Snapshot generation for consistent state reads
Thread Safety ¶
All exported methods are safe for concurrent use. The CopyState struct uses a read-write mutex (sync.RWMutex) to protect all shared state. Writers acquire an exclusive lock, while readers can operate concurrently.
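The locking discipline described above can be sketched as follows. This is a minimal illustration, not the package's code: `counterState`, `AddRows`, and `SyncedRows` are hypothetical names standing in for CopyState and its methods.

```go
package main

import (
	"fmt"
	"sync"
)

// counterState is a hypothetical stand-in for CopyState:
// a single sync.RWMutex guards every shared field.
type counterState struct {
	mu         sync.RWMutex
	syncedRows int64
}

// AddRows is a writer, so it takes the exclusive lock.
func (s *counterState) AddRows(n int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.syncedRows += n
}

// SyncedRows is a reader, so it takes the shared lock;
// any number of readers may hold it at the same time.
func (s *counterState) SyncedRows() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.syncedRows
}

func main() {
	s := &counterState{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.AddRows(10)
			_ = s.SyncedRows()
		}()
	}
	wg.Wait()
	fmt.Println(s.SyncedRows()) // 1000
}
```

Running a program like this with `go run -race` is a quick way to check that no field is touched outside the lock.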
State Transitions ¶
Both operation status and table status follow defined state machines:
Operation States:
    Initializing -> Preparing -> Copying -> Completed
                 -> Confirming -> Preparing -> ...
                 -> Failed | Cancelled (from any state)
Table States:
    Pending -> Queued -> Copying -> Completed | Failed | Retrying
            -> Skipped | Cancelled (from any non-terminal state)
Invalid transitions are logged but still applied, for robustness: this surfaces bugs without breaking functionality in production.
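A "log but allow" check for the operation state machine might look like the sketch below. The status constants match the ones this package exports; the `allowed` table, `isValidTransition`, and `setStatus` are hypothetical helpers derived from the diagram above, not the package's actual implementation.

```go
package main

import (
	"fmt"
	"log"
)

type OperationStatus string

const (
	StatusInitializing OperationStatus = "initializing"
	StatusConfirming   OperationStatus = "confirming"
	StatusPreparing    OperationStatus = "preparing"
	StatusCopying      OperationStatus = "copying"
	StatusCompleted    OperationStatus = "completed"
	StatusFailed       OperationStatus = "failed"
	StatusCancelled    OperationStatus = "cancelled"
)

// allowed lists the forward transitions from the diagram (assumed table).
// Failed and Cancelled are reachable from any state and handled separately.
var allowed = map[OperationStatus][]OperationStatus{
	StatusInitializing: {StatusPreparing, StatusConfirming},
	StatusConfirming:   {StatusPreparing},
	StatusPreparing:    {StatusCopying},
	StatusCopying:      {StatusCompleted},
}

// isValidTransition reports whether from -> to appears in the diagram.
func isValidTransition(from, to OperationStatus) bool {
	if to == StatusFailed || to == StatusCancelled {
		return true
	}
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

// setStatus logs an invalid transition but applies it anyway.
// It returns whether the transition was valid.
func setStatus(current *OperationStatus, next OperationStatus) bool {
	ok := isValidTransition(*current, next)
	if !ok {
		log.Printf("invalid transition %s -> %s (applied anyway)", *current, next)
	}
	*current = next
	return ok
}

func main() {
	st := StatusInitializing
	fmt.Println(setStatus(&st, StatusPreparing), st) // true preparing
	fmt.Println(setStatus(&st, StatusCompleted), st) // false completed
}
```

The second call skips Copying, so it is logged, yet the status still changes; that is the trade-off the paragraph above describes.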
Package state provides a centralized state management system for pgcopy operations, following a React-like hooks pattern for state updates and subscriptions.
Index ¶
- type ConnectionDetails
- type ConnectionInfo
- type ConnectionStatus
- type CopyState
- func NewCopyState(id string, config OperationConfig) *CopyState
- func (s *CopyState) AddError(errorType, message, component string, isFatal bool, context map[string]any)
- func (s *CopyState) AddForeignKey(fk ForeignKey)
- func (s *CopyState) AddLog(level LogLevel, message string, component string, table string, ...)
- func (s *CopyState) AddTable(schema, name string, totalRows int64)
- func (s *CopyState) AddTableError(schema, name string, errorMsg string, context map[string]any)
- func (s *CopyState) GetAllTables() []TableState
- func (s *CopyState) GetSnapshot() CopyStateSnapshot
- func (s *CopyState) GetTablesByStatus(status TableStatus) []string
- func (s *CopyState) SetStatus(status OperationStatus)
- func (s *CopyState) Subscribe(listener Listener)
- func (s *CopyState) Unsubscribe(listener Listener)
- func (s *CopyState) UpdateConnectionDetails(connType, display string, status ConnectionStatus)
- func (s *CopyState) UpdateForeignKeys(mode string, isUsingReplica bool, totalFKs int)
- func (s *CopyState) UpdateMetrics(metrics Metrics)
- func (s *CopyState) UpdateTableProgress(schema, name string, syncedRows int64)
- func (s *CopyState) UpdateTableStatus(schema, name string, status TableStatus)
- type CopyStateSnapshot
- type ErrorEntry
- type Event
- type EventType
- type ForeignKey
- type ForeignKeyState
- type Listener
- type LogEntry
- type LogLevel
- type MetricPoint
- type Metrics
- type OperationConfig
- type OperationStatus
- type Summary
- type TableError
- type TableState
- type TableStatus
- type Warning
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectionDetails ¶
type ConnectionDetails struct {
Source ConnectionInfo `json:"source"`
Target ConnectionInfo `json:"target"`
}
ConnectionDetails holds information about source and target connections
type ConnectionInfo ¶
type ConnectionInfo struct {
Display string `json:"display"`
Host string `json:"host"`
Port string `json:"port"`
Database string `json:"database"`
IsFile bool `json:"isFile"`
FilePath string `json:"filePath,omitempty"`
ConnectionID string `json:"connectionId"`
Status ConnectionStatus `json:"status"`
LastPing *time.Time `json:"lastPing,omitempty"`
Metadata map[string]string `json:"metadata"`
}
ConnectionInfo represents connection information
type ConnectionStatus ¶
type ConnectionStatus string
ConnectionStatus represents the status of a database connection
const (
	// ConnectionStatusUnknown indicates the connection status is unknown
	ConnectionStatusUnknown ConnectionStatus = "unknown"
	// ConnectionStatusConnecting indicates the connection is being established
	ConnectionStatusConnecting ConnectionStatus = "connecting"
	// ConnectionStatusConnected indicates the connection is active
	ConnectionStatusConnected ConnectionStatus = "connected"
	// ConnectionStatusDisconnected indicates the connection has been disconnected
	ConnectionStatusDisconnected ConnectionStatus = "disconnected"
	// ConnectionStatusError indicates the connection has an error
	ConnectionStatusError ConnectionStatus = "error"
)
type CopyState ¶
type CopyState struct {
// Operation metadata
ID string `json:"id"`
Status OperationStatus `json:"status"`
StartTime time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Config OperationConfig `json:"config"`
Connections ConnectionDetails `json:"connections"`
// Progress tracking
Tables []TableState `json:"tables"`
Summary Summary `json:"summary"`
Logs []LogEntry `json:"logs"`
Errors []ErrorEntry `json:"errors"`
Warnings []Warning `json:"warnings"`
// Foreign key management
ForeignKeys ForeignKeyState `json:"foreignKeys"`
// Real-time metrics
Metrics Metrics `json:"metrics"`
// contains filtered or unexported fields
}
CopyState represents the complete state of a pgcopy operation
func NewCopyState ¶
func NewCopyState(id string, config OperationConfig) *CopyState
NewCopyState creates a new copy state instance
func (*CopyState) AddError ¶
func (s *CopyState) AddError(errorType, message, component string, isFatal bool, context map[string]any)
AddError adds a global error to the state
func (*CopyState) AddForeignKey ¶
func (s *CopyState) AddForeignKey(fk ForeignKey)
AddForeignKey adds a foreign key to the state
func (*CopyState) AddLog ¶
func (s *CopyState) AddLog(level LogLevel, message string, component string, table string, context map[string]any)
AddLog adds a log entry to the state
func (*CopyState) AddTableError ¶
func (s *CopyState) AddTableError(schema, name string, errorMsg string, context map[string]any)
AddTableError adds an error to a specific table
func (*CopyState) GetAllTables ¶
func (s *CopyState) GetAllTables() []TableState
GetAllTables returns a copy of all tables in the state
func (*CopyState) GetSnapshot ¶
func (s *CopyState) GetSnapshot() CopyStateSnapshot
GetSnapshot returns a read-only snapshot of the current state
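One way to produce a snapshot that stays consistent after later mutations is to copy the slices while holding the read lock. The sketch below assumes that approach; `state`, `stateSnapshot`, and `tableState` are simplified hypothetical types, and the real GetSnapshot may copy its fields differently.

```go
package main

import (
	"fmt"
	"sync"
)

// tableState and state are trimmed, hypothetical stand-ins for
// TableState and CopyState.
type tableState struct {
	Name       string
	SyncedRows int64
}

type state struct {
	mu     sync.RWMutex
	tables []tableState
}

// stateSnapshot carries no mutex, so it can be copied and passed around freely.
type stateSnapshot struct {
	Tables []tableState
}

// Snapshot copies the table slice under the read lock so later writes
// to the live state cannot change what the caller sees.
func (s *state) Snapshot() stateSnapshot {
	s.mu.RLock()
	defer s.mu.RUnlock()
	tables := make([]tableState, len(s.tables))
	copy(tables, s.tables)
	return stateSnapshot{Tables: tables}
}

// SetSynced mutates the live state under the exclusive lock.
func (s *state) SetSynced(i int, rows int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tables[i].SyncedRows = rows
}

func main() {
	s := &state{tables: []tableState{{Name: "public.users"}}}
	snap := s.Snapshot()
	s.SetSynced(0, 500)
	fmt.Println(snap.Tables[0].SyncedRows, s.Snapshot().Tables[0].SyncedRows) // 0 500
}
```

Note that `copy` is shallow: element fields that are themselves slices or maps would still share memory with the live state unless they are copied too.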
func (*CopyState) GetTablesByStatus ¶
func (s *CopyState) GetTablesByStatus(status TableStatus) []string
GetTablesByStatus returns a slice of table names that have the specified status
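As a rough illustration of this kind of filter, the sketch below selects names by status from a slice. It assumes the returned names are the `FullName` values; the source does not say whether the real method returns `FullName` or `Name`, and `tablesByStatus` is a hypothetical free function.

```go
package main

import "fmt"

type TableStatus string

const (
	TableStatusPending TableStatus = "pending"
	TableStatusCopying TableStatus = "copying"
)

// tableState is a trimmed, hypothetical stand-in for TableState.
type tableState struct {
	FullName string
	Status   TableStatus
}

// tablesByStatus returns the names of all tables whose status matches.
// It always returns a non-nil slice, which encodes to [] rather than null in JSON.
func tablesByStatus(tables []tableState, status TableStatus) []string {
	names := make([]string, 0, len(tables))
	for _, t := range tables {
		if t.Status == status {
			names = append(names, t.FullName)
		}
	}
	return names
}

func main() {
	tables := []tableState{
		{FullName: "public.users", Status: TableStatusCopying},
		{FullName: "public.orders", Status: TableStatusPending},
		{FullName: "public.items", Status: TableStatusCopying},
	}
	fmt.Println(tablesByStatus(tables, TableStatusCopying)) // [public.users public.items]
}
```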
func (*CopyState) SetStatus ¶
func (s *CopyState) SetStatus(status OperationStatus)
SetStatus updates the operation status with state transition validation.
Thread Safety: Acquires an exclusive lock during state modification.
Events: Emits EventOperationStarted, EventOperationCompleted, or EventOperationFailed.
Invalid transitions are logged but allowed for robustness.
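The event side of a status change can be sketched as below. Several details are assumptions: the source does not show the definition of Listener (it is modeled here as an interface with a hypothetical `OnEvent` method), nor which status triggers which event (`eventFor` guesses that Copying maps to EventOperationStarted). `hub` and `recorder` are illustrative types only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type OperationStatus string
type EventType string

const (
	StatusCopying   OperationStatus = "copying"
	StatusCompleted OperationStatus = "completed"
	StatusFailed    OperationStatus = "failed"

	EventOperationStarted   EventType = "operation_started"
	EventOperationCompleted EventType = "operation_completed"
	EventOperationFailed    EventType = "operation_failed"
)

type Event struct {
	Type      EventType
	Timestamp time.Time
}

// Listener is ASSUMED to be an interface; OnEvent is a hypothetical method.
type Listener interface{ OnEvent(Event) }

// hub is a hypothetical stand-in for the subscription part of CopyState.
type hub struct {
	mu        sync.RWMutex
	status    OperationStatus
	listeners []Listener
}

func (h *hub) Subscribe(l Listener) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.listeners = append(h.listeners, l)
}

// Unsubscribe removes the first listener equal to l.
// Listeners must be comparable values (pointers, in this sketch).
func (h *hub) Unsubscribe(l Listener) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for i, cur := range h.listeners {
		if cur == l {
			h.listeners = append(h.listeners[:i], h.listeners[i+1:]...)
			return
		}
	}
}

// eventFor maps a status to an event type (assumed mapping).
func eventFor(status OperationStatus) (EventType, bool) {
	switch status {
	case StatusCopying:
		return EventOperationStarted, true
	case StatusCompleted:
		return EventOperationCompleted, true
	case StatusFailed:
		return EventOperationFailed, true
	}
	return "", false
}

// SetStatus mutates under the exclusive lock, then notifies a copy of the
// listener list after unlocking so listeners may call back into the hub.
func (h *hub) SetStatus(next OperationStatus) {
	h.mu.Lock()
	h.status = next
	targets := append([]Listener(nil), h.listeners...)
	h.mu.Unlock()

	evType, ok := eventFor(next)
	if !ok {
		return
	}
	ev := Event{Type: evType, Timestamp: time.Now()}
	for _, l := range targets {
		l.OnEvent(ev)
	}
}

// recorder collects the event types it receives.
type recorder struct{ seen []EventType }

func (r *recorder) OnEvent(e Event) { r.seen = append(r.seen, e.Type) }

func main() {
	h := &hub{}
	r := &recorder{}
	h.Subscribe(r)
	h.SetStatus(StatusCopying)
	h.SetStatus(StatusCompleted)
	h.Unsubscribe(r)
	h.SetStatus(StatusFailed) // not delivered: r was removed
	fmt.Println(r.seen)       // [operation_started operation_completed]
}
```

Dispatching after the lock is released is a design choice in this sketch: it avoids a deadlock if a listener reads state from inside its callback.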
func (*CopyState) Unsubscribe ¶
func (s *CopyState) Unsubscribe(listener Listener)
Unsubscribe removes a state listener
func (*CopyState) UpdateConnectionDetails ¶
func (s *CopyState) UpdateConnectionDetails(connType, display string, status ConnectionStatus)
UpdateConnectionDetails updates connection information
func (*CopyState) UpdateForeignKeys ¶
func (s *CopyState) UpdateForeignKeys(mode string, isUsingReplica bool, totalFKs int)
UpdateForeignKeys updates the foreign key state
func (*CopyState) UpdateMetrics ¶
func (s *CopyState) UpdateMetrics(metrics Metrics)
UpdateMetrics updates the real-time metrics
func (*CopyState) UpdateTableProgress ¶
func (s *CopyState) UpdateTableProgress(schema, name string, syncedRows int64)
UpdateTableProgress updates the progress of a specific table
func (*CopyState) UpdateTableStatus ¶
func (s *CopyState) UpdateTableStatus(schema, name string, status TableStatus)
UpdateTableStatus updates the status of a specific table
type CopyStateSnapshot ¶
type CopyStateSnapshot struct {
// Operation metadata
ID string `json:"id"`
Status OperationStatus `json:"status"`
StartTime time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Config OperationConfig `json:"config"`
Connections ConnectionDetails `json:"connections"`
// Progress tracking
Tables []TableState `json:"tables"`
Summary Summary `json:"summary"`
Logs []LogEntry `json:"logs"`
Errors []ErrorEntry `json:"errors"`
Warnings []Warning `json:"warnings"`
// Foreign key management
ForeignKeys ForeignKeyState `json:"foreignKeys"`
// Real-time metrics
Metrics Metrics `json:"metrics"`
}
CopyStateSnapshot represents a read-only snapshot of CopyState without mutex
type ErrorEntry ¶
type ErrorEntry struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"`
Message string `json:"message"`
Table string `json:"table,omitempty"`
Component string `json:"component"`
Context map[string]any `json:"context,omitempty"`
Stack string `json:"stack,omitempty"`
IsFatal bool `json:"isFatal"`
IsRetried bool `json:"isRetried"`
}
ErrorEntry represents an error that occurred during the operation
type Event ¶
type Event struct {
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data map[string]any `json:"data,omitempty"`
TableName string `json:"tableName,omitempty"`
}
Event represents different types of state changes
type EventType ¶
type EventType string
EventType represents the type of state change event
const (
	// EventOperationStarted indicates the copy operation has started
	EventOperationStarted EventType = "operation_started"
	// EventOperationCompleted indicates the copy operation has completed successfully
	EventOperationCompleted EventType = "operation_completed"
	// EventOperationFailed indicates the copy operation has failed
	EventOperationFailed EventType = "operation_failed"
	// EventTableStarted indicates a table copy has started
	EventTableStarted EventType = "table_started"
	// EventTableCompleted indicates a table copy has completed
	EventTableCompleted EventType = "table_completed"
	// EventTableFailed indicates a table copy has failed
	EventTableFailed EventType = "table_failed"
	// EventTableProgress indicates progress update for a table
	EventTableProgress EventType = "table_progress"
	// EventFKDetected indicates a foreign key was detected
	EventFKDetected EventType = "fk_detected"
	// EventFKDropped indicates a foreign key was dropped
	EventFKDropped EventType = "fk_dropped"
	// EventFKRestored indicates a foreign key was restored
	EventFKRestored EventType = "fk_restored"
	// EventLogAdded indicates a new log entry was added
	EventLogAdded EventType = "log_added"
	// EventErrorAdded indicates a new error was added
	EventErrorAdded EventType = "error_added"
	// EventWarningAdded indicates a new warning was added
	EventWarningAdded EventType = "warning_added"
	// EventMetricsUpdated indicates metrics were updated
	EventMetricsUpdated EventType = "metrics_updated"
	// EventConfigChanged indicates configuration was changed
	EventConfigChanged EventType = "config_changed"
)
type ForeignKey ¶
type ForeignKey struct {
ConstraintName string `json:"constraintName"`
SourceTable string `json:"sourceTable"`
SourceColumns []string `json:"sourceColumns"`
TargetTable string `json:"targetTable"`
TargetColumns []string `json:"targetColumns"`
OnDelete string `json:"onDelete"`
OnUpdate string `json:"onUpdate"`
IsDeferred bool `json:"isDeferred"`
Definition string `json:"definition"`
}
ForeignKey represents a foreign key constraint
type ForeignKeyState ¶
type ForeignKeyState struct {
Mode string `json:"mode"` // "replica" or "drop_restore"
IsUsingReplicaMode bool `json:"isUsingReplicaMode"`
TotalFKs int `json:"totalFks"`
DroppedFKs int `json:"droppedFks"`
RestoredFKs int `json:"restoredFks"`
FailedRestores int `json:"failedRestores"`
ForeignKeys []ForeignKey `json:"foreignKeys"`
BackupFile string `json:"backupFile"`
HasBackupFile bool `json:"hasBackupFile"`
BackupCreated *time.Time `json:"backupCreated,omitempty"`
Status string `json:"status"`
}
ForeignKeyState tracks the state of foreign key management
type LogEntry ¶
type LogEntry struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Level LogLevel `json:"level"`
Message string `json:"message"`
Table string `json:"table,omitempty"`
Component string `json:"component"` // "copier", "fk_manager", "validator", etc.
Context map[string]any `json:"context,omitempty"`
}
LogEntry represents a log message
type LogLevel ¶
type LogLevel string
LogLevel represents log severity
const (
	// LogLevelDebug indicates debug-level log messages
	LogLevelDebug LogLevel = "debug"
	// LogLevelInfo indicates informational log messages
	LogLevelInfo LogLevel = "info"
	// LogLevelWarn indicates warning log messages
	LogLevelWarn LogLevel = "warn"
	// LogLevelError indicates error log messages
	LogLevelError LogLevel = "error"
)
type MetricPoint ¶
MetricPoint represents a single data point for historical metrics
type Metrics ¶
type Metrics struct {
// Performance metrics
RowsPerSecond float64 `json:"rowsPerSecond"`
TablesPerMinute float64 `json:"tablesPerMinute"`
AverageTableTime float64 `json:"averageTableTime"` // seconds
PeakRowsPerSecond float64 `json:"peakRowsPerSecond"`
// Resource usage
ActiveWorkers int `json:"activeWorkers"`
QueuedTables int `json:"queuedTables"`
MemoryUsageMB float64 `json:"memoryUsageMb"`
CPUUsagePercent float64 `json:"cpuUsagePercent"`
// Database metrics
SourceConnections int `json:"sourceConnections"`
TargetConnections int `json:"targetConnections"`
ActiveQueries int `json:"activeQueries"`
// Network metrics
NetworkLatencyMS float64 `json:"networkLatencyMs"`
DataTransferredMB float64 `json:"dataTransferredMb"`
// Historical data for charts (last 60 data points)
RowsPerSecondHistory []MetricPoint `json:"rowsPerSecondHistory"`
MemoryHistory []MetricPoint `json:"memoryHistory"`
CPUHistory []MetricPoint `json:"cpuHistory"`
}
Metrics holds real-time performance metrics
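The history fields are documented as holding the last 60 data points. A bounded append along those lines could look like the sketch below. The source does not show the fields of MetricPoint, so `metricPoint` here is a hypothetical shape, and `appendPoint` and `maxHistory` are illustrative names.

```go
package main

import (
	"fmt"
	"time"
)

// metricPoint is an ASSUMED shape; the real MetricPoint fields are not shown.
type metricPoint struct {
	Timestamp time.Time
	Value     float64
}

// maxHistory matches the "last 60 data points" comment on Metrics.
const maxHistory = 60

// appendPoint adds p and keeps only the newest maxHistory points.
func appendPoint(history []metricPoint, p metricPoint) []metricPoint {
	history = append(history, p)
	if len(history) > maxHistory {
		// Copy into a fresh slice so dropped points can be garbage collected.
		trimmed := make([]metricPoint, maxHistory)
		copy(trimmed, history[len(history)-maxHistory:])
		return trimmed
	}
	return history
}

func main() {
	var history []metricPoint
	start := time.Now()
	for i := 0; i < 75; i++ {
		history = appendPoint(history, metricPoint{
			Timestamp: start.Add(time.Duration(i) * time.Second),
			Value:     float64(i),
		})
	}
	fmt.Println(len(history), history[0].Value, history[len(history)-1].Value) // 60 15 74
}
```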
type OperationConfig ¶
type OperationConfig struct {
// Connection settings
SourceConn string `json:"sourceConn"`
TargetConn string `json:"targetConn"`
// Operation settings
Parallel int `json:"parallel"`
BatchSize int `json:"batchSize"`
IncludeTables []string `json:"includeTables"`
ExcludeTables []string `json:"excludeTables"`
DryRun bool `json:"dryRun"`
SkipBackup bool `json:"skipBackup"`
OutputMode string `json:"outputMode"`
UseCopyPipe bool `json:"useCopyPipe"` // Stream source->dest with COPY ... TO STDOUT / FROM STDIN
CompressPipe bool `json:"compressPipe"` // Gzip compress COPY stream in-flight (local pipe)
// Row count behavior
// When true, pgcopy will compute exact row counts using COUNT(*) per table during discovery.
// This is slower but avoids inaccurate estimates (e.g., after TRUNCATE).
ExactRows bool `json:"exactRows"`
// Unlimited timeouts: when true, pgcopy won't set deadlines on internal DB operations
// (best-effort; still subject to external server timeouts)
NoTimeouts bool `json:"noTimeouts"`
// Snapshot: when true, each table is read inside a REPEATABLE READ transaction
// ensuring a stable snapshot for pagination (new rows added after the table
// copy started won't appear or shift ordering). This helps avoid pagination
// anomalies when using keyset iteration.
Snapshot bool `json:"snapshot"`
}
OperationConfig holds the configuration for the operation
type OperationStatus ¶
type OperationStatus string
OperationStatus represents the current status of the operation
const (
	// StatusInitializing indicates the operation is being initialized
	StatusInitializing OperationStatus = "initializing"
	// StatusConfirming indicates the operation is waiting for user confirmation
	StatusConfirming OperationStatus = "confirming"
	// StatusPreparing indicates the operation is being prepared
	StatusPreparing OperationStatus = "preparing"
	// StatusCopying indicates the operation is actively copying data
	StatusCopying OperationStatus = "copying"
	// StatusCompleted indicates the operation has completed successfully
	StatusCompleted OperationStatus = "completed"
	// StatusFailed indicates the operation has failed
	StatusFailed OperationStatus = "failed"
	// StatusCancelled indicates the operation was cancelled by user
	StatusCancelled OperationStatus = "cancelled"
)
type Summary ¶
type Summary struct {
TotalTables int `json:"totalTables"`
CompletedTables int `json:"completedTables"`
FailedTables int `json:"failedTables"`
SkippedTables int `json:"skippedTables"`
TotalRows int64 `json:"totalRows"`
SyncedRows int64 `json:"syncedRows"`
ErrorRows int64 `json:"errorRows"`
OverallProgress float64 `json:"overallProgress"` // 0-100
OverallSpeed float64 `json:"overallSpeed"` // rows per second
EstimatedTimeLeft *int64 `json:"estimatedTimeLeft,omitempty"` // seconds
ElapsedTime int64 `json:"elapsedTime"` // seconds
}
Summary provides overall statistics
type TableError ¶
type TableError struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"`
Message string `json:"message"`
RowNumber *int64 `json:"rowNumber,omitempty"`
Context map[string]any `json:"context,omitempty"`
IsRetried bool `json:"isRetried"`
}
TableError represents an error specific to a table operation
type TableState ¶
type TableState struct {
// Identity
Schema string `json:"schema"`
Name string `json:"name"`
FullName string `json:"fullName"`
// Metadata
Columns []string `json:"columns"`
PKColumns []string `json:"pkColumns"`
HasForeignKey bool `json:"hasForeignKey"`
ForeignKeys []ForeignKey `json:"foreignKeys"`
Dependencies []string `json:"dependencies"`
Dependents []string `json:"dependents"`
// Row counts
TotalRows int64 `json:"totalRows"`
SyncedRows int64 `json:"syncedRows"`
ErrorRows int64 `json:"errorRows"`
SkippedRows int64 `json:"skippedRows"`
// Status and timing
Status TableStatus `json:"status"`
StartTime *time.Time `json:"startTime,omitempty"`
EndTime *time.Time `json:"endTime,omitempty"`
Duration *int64 `json:"duration,omitempty"` // milliseconds
// Progress and performance
Progress float64 `json:"progress"` // 0-100
Speed float64 `json:"speed"` // rows per second
ETA *int64 `json:"eta,omitempty"` // seconds remaining
BatchesDone int `json:"batchesDone"`
BatchesTotal int `json:"batchesTotal"`
// Error tracking
Errors []TableError `json:"errors"`
Warnings []Warning `json:"warnings"`
// Worker assignment
WorkerID int `json:"workerId"`
IsParallel bool `json:"isParallel"`
// Retry logic
RetryCount int `json:"retryCount"`
MaxRetries int `json:"maxRetries"`
LastRetry *time.Time `json:"lastRetry,omitempty"`
RetryStrategy string `json:"retryStrategy"`
}
TableState represents the state of a single table during copying
type TableStatus ¶
type TableStatus string
TableStatus represents the current status of a table
const (
	// TableStatusPending indicates the table is pending processing
	TableStatusPending TableStatus = "pending"
	// TableStatusQueued indicates the table is queued for processing
	TableStatusQueued TableStatus = "queued"
	// TableStatusCopying indicates the table is currently being copied
	TableStatusCopying TableStatus = "copying"
	// TableStatusCompleted indicates the table has been copied successfully
	TableStatusCompleted TableStatus = "completed"
	// TableStatusFailed indicates the table copy failed
	TableStatusFailed TableStatus = "failed"
	// TableStatusSkipped indicates the table was skipped
	TableStatusSkipped TableStatus = "skipped"
	// TableStatusRetrying indicates the table is being retried after failure
	TableStatusRetrying TableStatus = "retrying"
	// TableStatusCancelled indicates the table copy was cancelled
	TableStatusCancelled TableStatus = "cancelled"
)