Documentation
¶
Overview ¶
Package replicator provides PostgreSQL replication functionality for pg_flo.
Index ¶
- Constants
- Variables
- func GeneratePublicationName(group string) string
- func InitializeOIDMap(ctx context.Context, conn StandardConnection) error
- type BaseFactory
- type BaseReplicator
- func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)
- func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)
- func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error
- func (r *BaseReplicator) CreatePublication() error
- func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error
- func (r *BaseReplicator) CurrentTxBuffer() []utils.CDCMessage
- func (r *BaseReplicator) GetConfiguredTables(ctx context.Context) ([]string, error)
- func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)
- func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error
- func (r *BaseReplicator) HandleBeginMessage(msg *pglogrepl.BeginMessage) error
- func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error
- func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error
- func (r *BaseReplicator) InitializePrimaryKeyInfo() error
- func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, ...) error
- func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error
- func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error
- func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error
- func (r *BaseReplicator) SetCurrentTxBuffer(messages []utils.CDCMessage)
- func (r *BaseReplicator) Start(ctx context.Context) error
- func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error
- func (r *BaseReplicator) Stop(ctx context.Context) error
- func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error
- type Buffer
- type Config
- type CopyAndStreamReplicator
- func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error
- func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, ...) (int64, error)
- func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, ...)
- func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error
- func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error
- func (r *CopyAndStreamReplicator) Start(ctx context.Context) error
- func (r *CopyAndStreamReplicator) Stop(ctx context.Context) error
- type CopyAndStreamReplicatorFactory
- type DDLReplicator
- func (d *DDLReplicator) Close(ctx context.Context) error
- func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)
- func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error
- func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error
- func (d *DDLReplicator) Shutdown(ctx context.Context) error
- func (d *DDLReplicator) StartDDLReplication(ctx context.Context)
- type Factory
- type NATSClient
- type PgxPoolConn
- type PgxPoolConnWrapper
- type PostgresReplicationConnection
- func (rc *PostgresReplicationConnection) Close(ctx context.Context) error
- func (rc *PostgresReplicationConnection) Connect(ctx context.Context) error
- func (rc *PostgresReplicationConnection) CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
- func (rc *PostgresReplicationConnection) IsHealthy(ctx context.Context) bool
- func (rc *PostgresReplicationConnection) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
- func (rc *PostgresReplicationConnection) Reconnect(ctx context.Context) error
- func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
- func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, ...) error
- type ReplicationConnection
- type ReplicationError
- type Replicator
- type StandardConnection
- type StandardConnectionImpl
- func (s *StandardConnectionImpl) Acquire(ctx context.Context) (PgxPoolConn, error)
- func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
- func (s *StandardConnectionImpl) Close(_ context.Context) error
- func (s *StandardConnectionImpl) Connect(ctx context.Context) error
- func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
- func (s *StandardConnectionImpl) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (s *StandardConnectionImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- type StreamReplicator
- type StreamReplicatorFactory
Constants ¶
const (
// DefaultSchema is the default PostgreSQL schema name
DefaultSchema = "public"
)
Variables ¶
var (
// ErrReplicatorAlreadyStarted is returned when attempting to start an already running replicator
ErrReplicatorAlreadyStarted = errors.New("replicator already started")
// ErrReplicatorNotStarted is returned when attempting to stop a non-running replicator
ErrReplicatorNotStarted = errors.New("replicator not started")
// ErrReplicatorAlreadyStopped is returned when attempting to stop an already stopped replicator
ErrReplicatorAlreadyStopped = errors.New("replicator already stopped")
)
Functions ¶
func GeneratePublicationName ¶
func GeneratePublicationName(group string) string
GeneratePublicationName generates a deterministic publication name based on the group name
func InitializeOIDMap ¶
func InitializeOIDMap(ctx context.Context, conn StandardConnection) error
InitializeOIDMap initializes the OID to type name map with custom types from the database
Types ¶
type BaseFactory ¶ added in v0.0.12
type BaseFactory struct{}
BaseFactory provides common functionality for factories
func (*BaseFactory) CreateConnections ¶ added in v0.0.12
func (f *BaseFactory) CreateConnections(config Config) (ReplicationConnection, StandardConnection, error)
CreateConnections creates replication and standard connections
type BaseReplicator ¶
type BaseReplicator struct {
Config Config
ReplicationConn ReplicationConnection
StandardConn StandardConnection
DDLReplicator *DDLReplicator
Relations map[uint32]*pglogrepl.RelationMessage
Logger utils.Logger
TableDetails map[string][]string
LastLSN pglogrepl.LSN
NATSClient NATSClient
TableReplicationKeys map[string]utils.ReplicationKey
// contains filtered or unexported fields
}
BaseReplicator provides core functionality for PostgreSQL logical replication
func NewBaseReplicator ¶
func NewBaseReplicator(config Config, replicationConn ReplicationConnection, standardConn StandardConnection, natsClient NATSClient) *BaseReplicator
NewBaseReplicator creates a new BaseReplicator instance
func (*BaseReplicator) AddPrimaryKeyInfo ¶
func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)
AddPrimaryKeyInfo adds replication key information to the CDCMessage
func (*BaseReplicator) CheckReplicationSlotExists ¶
func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)
CheckReplicationSlotExists checks if a slot with the given name already exists
func (*BaseReplicator) CheckReplicationSlotStatus ¶
func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error
CheckReplicationSlotStatus checks the status of the replication slot and validates it for reconnection
func (*BaseReplicator) CreatePublication ¶
func (r *BaseReplicator) CreatePublication() error
CreatePublication creates a new publication if it doesn't exist
func (*BaseReplicator) CreateReplicationSlot ¶
func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error
CreateReplicationSlot ensures that a replication slot exists, creating one if necessary
func (*BaseReplicator) CurrentTxBuffer ¶ added in v0.0.12
func (r *BaseReplicator) CurrentTxBuffer() []utils.CDCMessage
CurrentTxBuffer returns the current transaction buffer (for testing)
func (*BaseReplicator) GetConfiguredTables ¶
func (r *BaseReplicator) GetConfiguredTables(ctx context.Context) ([]string, error)
GetConfiguredTables returns all tables based on configuration. If no specific tables are configured, it returns all tables from the configured schema.
func (*BaseReplicator) GetLastState ¶
func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)
GetLastState retrieves the last saved replication state
func (*BaseReplicator) GracefulShutdown ¶
func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error
GracefulShutdown performs a graceful shutdown of the replicator
func (*BaseReplicator) HandleBeginMessage ¶
func (r *BaseReplicator) HandleBeginMessage(msg *pglogrepl.BeginMessage) error
HandleBeginMessage handles BeginMessage messages
func (*BaseReplicator) HandleCommitMessage ¶
func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error
HandleCommitMessage processes a commit message and publishes it to NATS
func (*BaseReplicator) HandleDeleteMessage ¶
func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error
HandleDeleteMessage handles DeleteMessage messages
func (*BaseReplicator) HandleInsertMessage ¶
func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error
HandleInsertMessage handles InsertMessage messages
func (*BaseReplicator) HandleUpdateMessage ¶
func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error
HandleUpdateMessage handles UpdateMessage messages
func (*BaseReplicator) InitializePrimaryKeyInfo ¶
func (r *BaseReplicator) InitializePrimaryKeyInfo() error
InitializePrimaryKeyInfo initializes primary key information for all tables
func (*BaseReplicator) ProcessNextMessage ¶
func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, standbyMessageTimeout time.Duration) error
ProcessNextMessage handles the next replication message with retry logic for connection errors
func (*BaseReplicator) PublishToNATS ¶
func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error
PublishToNATS publishes a message to NATS
func (*BaseReplicator) SaveState ¶
func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error
SaveState saves the current replication state
func (*BaseReplicator) SendStandbyStatusUpdate ¶
func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error
SendStandbyStatusUpdate sends a status update to the primary server
func (*BaseReplicator) SetCurrentTxBuffer ¶ added in v0.0.12
func (r *BaseReplicator) SetCurrentTxBuffer(messages []utils.CDCMessage)
SetCurrentTxBuffer sets the current transaction buffer (for testing)
func (*BaseReplicator) Start ¶ added in v0.0.12
func (r *BaseReplicator) Start(ctx context.Context) error
Start creates the publication, the replication slot, and DDL tracking (if any), and then starts the replication
func (*BaseReplicator) StartReplicationFromLSN ¶
func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error
StartReplicationFromLSN initiates the replication process from a given LSN
func (*BaseReplicator) Stop ¶ added in v0.0.12
func (r *BaseReplicator) Stop(ctx context.Context) error
Stop triggers a graceful shutdown of the replicator
func (*BaseReplicator) StreamChanges ¶
func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error
StreamChanges continuously processes replication messages
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is a structure that holds data to be flushed periodically or when certain conditions are met
type Config ¶
type Config struct {
Host string
Port uint16
Database string
User string
Password string
Group string
Schema string
Tables []string
TrackDDL bool
RetryConfig utils.RetryConfig
}
Config holds the configuration for the replicator
func (Config) ConnectionString ¶
ConnectionString generates and returns a PostgreSQL connection string
func (Config) GetRetryConfig ¶ added in v0.0.15
func (c Config) GetRetryConfig() utils.RetryConfig
GetRetryConfig returns the retry configuration with defaults if not set
type CopyAndStreamReplicator ¶
type CopyAndStreamReplicator struct {
*BaseReplicator
MaxCopyWorkersPerTable int
CopyOnly bool
}
CopyAndStreamReplicator implements a replication strategy that first copies existing data and then streams changes.
func NewCopyAndStreamReplicator ¶ added in v0.0.12
func NewCopyAndStreamReplicator(base *BaseReplicator, maxWorkers int, copyOnly bool) *CopyAndStreamReplicator
NewCopyAndStreamReplicator creates a new copy and stream replicator instance
func (*CopyAndStreamReplicator) CopyTable ¶
func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error
CopyTable copies a single table using multiple workers.
func (*CopyAndStreamReplicator) CopyTableRange ¶
func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, snapshotID string, workerID int) (int64, error)
CopyTableRange copies a range of pages from a table.
func (*CopyAndStreamReplicator) CopyTableWorker ¶
func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, rangesChan <-chan [2]uint32, tableName, snapshotID string, workerID int)
CopyTableWorker is a worker function that copies ranges of pages from a table.
func (*CopyAndStreamReplicator) CopyTables ¶
func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error
CopyTables copies all specified tables in parallel.
func (*CopyAndStreamReplicator) ParallelCopy ¶
func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error
ParallelCopy performs a parallel copy of all specified tables.
type CopyAndStreamReplicatorFactory ¶ added in v0.0.12
type CopyAndStreamReplicatorFactory struct {
BaseFactory
MaxCopyWorkersPerTable int
CopyOnly bool
}
CopyAndStreamReplicatorFactory creates `CopyAndStreamReplicator` instances
func (*CopyAndStreamReplicatorFactory) CreateReplicator ¶ added in v0.0.12
func (f *CopyAndStreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)
CreateReplicator creates a new `CopyAndStreamReplicator`
type DDLReplicator ¶
type DDLReplicator struct {
DDLConn StandardConnection
BaseRepl *BaseReplicator
Config Config
}
DDLReplicator handles Data Definition Language (DDL) statement replication
func NewDDLReplicator ¶
func NewDDLReplicator(config Config, BaseRepl *BaseReplicator, ddlConn StandardConnection) (*DDLReplicator, error)
NewDDLReplicator creates a new DDLReplicator instance
func (*DDLReplicator) Close ¶
func (d *DDLReplicator) Close(ctx context.Context) error
Close closes the DDL connection
func (*DDLReplicator) HasPendingDDLEvents ¶
func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)
HasPendingDDLEvents checks if there are pending DDL events in the log
func (*DDLReplicator) ProcessDDLEvents ¶
func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error
ProcessDDLEvents processes DDL events from the log table
func (*DDLReplicator) SetupDDLTracking ¶
func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error
SetupDDLTracking sets up the necessary schema, table, and triggers for DDL tracking
func (*DDLReplicator) Shutdown ¶
func (d *DDLReplicator) Shutdown(ctx context.Context) error
Shutdown performs a graceful shutdown of the DDL replicator
func (*DDLReplicator) StartDDLReplication ¶
func (d *DDLReplicator) StartDDLReplication(ctx context.Context)
StartDDLReplication starts the DDL replication process
type Factory ¶ added in v0.0.12
type Factory interface {
CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)
}
Factory defines the interface for creating replicators
type NATSClient ¶
type NATSClient interface {
PublishMessage(subject string, data []byte) error
Close() error
SaveState(state pgflonats.State) error
GetState() (pgflonats.State, error)
JetStream() nats.JetStreamContext
}
NATSClient defines the interface for NATS messaging operations
type PgxPoolConn ¶
type PgxPoolConn interface {
BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
Release()
}
PgxPoolConn defines the interface for PostgreSQL connection pool connections
type PgxPoolConnWrapper ¶
PgxPoolConnWrapper wraps a pgxpool connection to implement the PgxPoolConn interface
type PostgresReplicationConnection ¶
PostgresReplicationConnection implements the ReplicationConnection interface for PostgreSQL databases.
func (*PostgresReplicationConnection) Close ¶
func (rc *PostgresReplicationConnection) Close(ctx context.Context) error
Close terminates the connection to the PostgreSQL database.
func (*PostgresReplicationConnection) Connect ¶
func (rc *PostgresReplicationConnection) Connect(ctx context.Context) error
Connect establishes a connection to the PostgreSQL database for replication.
func (*PostgresReplicationConnection) CreateReplicationSlot ¶
func (rc *PostgresReplicationConnection) CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
CreateReplicationSlot creates a new replication slot in the PostgreSQL database.
func (*PostgresReplicationConnection) IsHealthy ¶ added in v0.0.15
func (rc *PostgresReplicationConnection) IsHealthy(ctx context.Context) bool
IsHealthy checks if the connection is healthy
func (*PostgresReplicationConnection) ReceiveMessage ¶
func (rc *PostgresReplicationConnection) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
ReceiveMessage receives a message from the PostgreSQL replication stream.
func (*PostgresReplicationConnection) Reconnect ¶ added in v0.0.15
func (rc *PostgresReplicationConnection) Reconnect(ctx context.Context) error
Reconnect closes the existing connection and establishes a new one
func (*PostgresReplicationConnection) SendStandbyStatusUpdate ¶
func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
SendStandbyStatusUpdate sends a status update to the PostgreSQL server during replication.
func (*PostgresReplicationConnection) StartReplication ¶
func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error
StartReplication initiates the replication process from the specified LSN.
type ReplicationConnection ¶
type ReplicationConnection interface {
Connect(ctx context.Context) error
Close(ctx context.Context) error
Reconnect(ctx context.Context) error
CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error
ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
IsHealthy(ctx context.Context) bool
}
ReplicationConnection defines the interface for PostgreSQL logical replication connections
func NewReplicationConnection ¶
func NewReplicationConnection(config Config) ReplicationConnection
NewReplicationConnection creates a new PostgresReplicationConnection instance.
type ReplicationError ¶
type ReplicationError struct {
Op string // The operation that caused the error
Err error // The underlying error
}
ReplicationError represents an error that occurred during replication.
func (*ReplicationError) Error ¶
func (e *ReplicationError) Error() string
Error returns a formatted error message.
type Replicator ¶
Replicator defines the interface for PostgreSQL replicators
type StandardConnection ¶
type StandardConnection interface {
Connect(ctx context.Context) error
Close(ctx context.Context) error
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
Acquire(ctx context.Context) (PgxPoolConn, error)
}
StandardConnection defines the interface for standard PostgreSQL connections
type StandardConnectionImpl ¶
type StandardConnectionImpl struct {
// contains filtered or unexported fields
}
StandardConnectionImpl implements the StandardConnection interface for PostgreSQL databases.
func NewStandardConnection ¶
func NewStandardConnection(config Config) (*StandardConnectionImpl, error)
NewStandardConnection creates a new StandardConnectionImpl instance and establishes a connection.
func (*StandardConnectionImpl) Acquire ¶
func (s *StandardConnectionImpl) Acquire(ctx context.Context) (PgxPoolConn, error)
Acquire acquires a connection from the pool.
func (*StandardConnectionImpl) BeginTx ¶
func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
BeginTx starts a new transaction with the specified options.
func (*StandardConnectionImpl) Close ¶
func (s *StandardConnectionImpl) Close(_ context.Context) error
Close terminates the connection to the PostgreSQL database.
func (*StandardConnectionImpl) Connect ¶
func (s *StandardConnectionImpl) Connect(ctx context.Context) error
Connect establishes a connection to the PostgreSQL database.
func (*StandardConnectionImpl) Exec ¶
func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Exec executes a SQL query without returning any rows.
type StreamReplicator ¶
type StreamReplicator struct {
*BaseReplicator
}
StreamReplicator handles PostgreSQL logical replication streaming
func NewStreamReplicator ¶ added in v0.0.12
func NewStreamReplicator(base *BaseReplicator) *StreamReplicator
NewStreamReplicator creates a new stream replicator instance
type StreamReplicatorFactory ¶ added in v0.0.12
type StreamReplicatorFactory struct {
BaseFactory
}
StreamReplicatorFactory creates `StreamReplicator` instances
func (*StreamReplicatorFactory) CreateReplicator ¶ added in v0.0.12
func (f *StreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)
CreateReplicator creates a new `StreamReplicator`