replicator

package
v0.0.15 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package replicator provides PostgreSQL replication functionality for pg_flo.

Index

Constants

View Source
const (
	// DefaultSchema is the default PostgreSQL schema name
	DefaultSchema = "public"
)

Variables

View Source
var (
	// ErrReplicatorAlreadyStarted is returned when attempting to start an already running replicator
	ErrReplicatorAlreadyStarted = errors.New("replicator already started")
	// ErrReplicatorNotStarted is returned when attempting to stop a non-running replicator
	ErrReplicatorNotStarted = errors.New("replicator not started")
	// ErrReplicatorAlreadyStopped is returned when attempting to stop an already stopped replicator
	ErrReplicatorAlreadyStopped = errors.New("replicator already stopped")
)

Functions

func GeneratePublicationName

func GeneratePublicationName(group string) string

GeneratePublicationName generates a deterministic publication name based on the group name

func InitializeOIDMap

func InitializeOIDMap(ctx context.Context, conn StandardConnection) error

InitializeOIDMap initializes the OID to type name map with custom types from the database

Types

type BaseFactory added in v0.0.12

type BaseFactory struct{}

BaseFactory provides common functionality for factories

func (*BaseFactory) CreateConnections added in v0.0.12

func (f *BaseFactory) CreateConnections(config Config) (ReplicationConnection, StandardConnection, error)

CreateConnections creates replication and standard connections

type BaseReplicator

type BaseReplicator struct {
	Config               Config
	ReplicationConn      ReplicationConnection
	StandardConn         StandardConnection
	DDLReplicator        *DDLReplicator
	Relations            map[uint32]*pglogrepl.RelationMessage
	Logger               utils.Logger
	TableDetails         map[string][]string
	LastLSN              pglogrepl.LSN
	NATSClient           NATSClient
	TableReplicationKeys map[string]utils.ReplicationKey
	// contains filtered or unexported fields
}

BaseReplicator provides core functionality for PostgreSQL logical replication

func NewBaseReplicator

func NewBaseReplicator(config Config, replicationConn ReplicationConnection, standardConn StandardConnection, natsClient NATSClient) *BaseReplicator

NewBaseReplicator creates a new BaseReplicator instance

func (*BaseReplicator) AddPrimaryKeyInfo

func (r *BaseReplicator) AddPrimaryKeyInfo(message *utils.CDCMessage, table string)

AddPrimaryKeyInfo adds replication key information to the CDCMessage

func (*BaseReplicator) CheckReplicationSlotExists

func (r *BaseReplicator) CheckReplicationSlotExists(slotName string) (bool, error)

CheckReplicationSlotExists checks if a slot with the given name already exists

func (*BaseReplicator) CheckReplicationSlotStatus

func (r *BaseReplicator) CheckReplicationSlotStatus(ctx context.Context) error

CheckReplicationSlotStatus checks the status of the replication slot and validates it for reconnection

func (*BaseReplicator) CreatePublication

func (r *BaseReplicator) CreatePublication() error

CreatePublication creates a new publication if it doesn't exist

func (*BaseReplicator) CreateReplicationSlot

func (r *BaseReplicator) CreateReplicationSlot(ctx context.Context) error

CreateReplicationSlot ensures that a replication slot exists, creating one if necessary

func (*BaseReplicator) CurrentTxBuffer added in v0.0.12

func (r *BaseReplicator) CurrentTxBuffer() []utils.CDCMessage

CurrentTxBuffer returns the current transaction buffer (for testing)

func (*BaseReplicator) GetConfiguredTables

func (r *BaseReplicator) GetConfiguredTables(ctx context.Context) ([]string, error)

GetConfiguredTables returns all tables based on configuration. If no specific tables are configured, it returns all tables from the configured schema.

func (*BaseReplicator) GetLastState

func (r *BaseReplicator) GetLastState() (pglogrepl.LSN, error)

GetLastState retrieves the last saved replication state

func (*BaseReplicator) GracefulShutdown

func (r *BaseReplicator) GracefulShutdown(ctx context.Context) error

GracefulShutdown performs a graceful shutdown of the replicator

func (*BaseReplicator) HandleBeginMessage

func (r *BaseReplicator) HandleBeginMessage(msg *pglogrepl.BeginMessage) error

HandleBeginMessage handles BeginMessage messages

func (*BaseReplicator) HandleCommitMessage

func (r *BaseReplicator) HandleCommitMessage(msg *pglogrepl.CommitMessage) error

HandleCommitMessage processes a commit message and publishes it to NATS

func (*BaseReplicator) HandleDeleteMessage

func (r *BaseReplicator) HandleDeleteMessage(msg *pglogrepl.DeleteMessage, lsn pglogrepl.LSN) error

HandleDeleteMessage handles DeleteMessage messages

func (*BaseReplicator) HandleInsertMessage

func (r *BaseReplicator) HandleInsertMessage(msg *pglogrepl.InsertMessage, lsn pglogrepl.LSN) error

HandleInsertMessage handles InsertMessage messages

func (*BaseReplicator) HandleUpdateMessage

func (r *BaseReplicator) HandleUpdateMessage(msg *pglogrepl.UpdateMessage, lsn pglogrepl.LSN) error

HandleUpdateMessage handles UpdateMessage messages

func (*BaseReplicator) InitializePrimaryKeyInfo

func (r *BaseReplicator) InitializePrimaryKeyInfo() error

InitializePrimaryKeyInfo initializes primary key information for all tables

func (*BaseReplicator) ProcessNextMessage

func (r *BaseReplicator) ProcessNextMessage(ctx context.Context, lastStatusUpdate *time.Time, standbyMessageTimeout time.Duration) error

ProcessNextMessage handles the next replication message with retry logic for connection errors

func (*BaseReplicator) PublishToNATS

func (r *BaseReplicator) PublishToNATS(data utils.CDCMessage) error

PublishToNATS publishes a message to NATS

func (*BaseReplicator) SaveState

func (r *BaseReplicator) SaveState(lsn pglogrepl.LSN) error

SaveState saves the current replication state

func (*BaseReplicator) SendStandbyStatusUpdate

func (r *BaseReplicator) SendStandbyStatusUpdate(ctx context.Context) error

SendStandbyStatusUpdate sends a status update to the primary server

func (*BaseReplicator) SetCurrentTxBuffer added in v0.0.12

func (r *BaseReplicator) SetCurrentTxBuffer(messages []utils.CDCMessage)

SetCurrentTxBuffer sets the current transaction buffer (for testing)

func (*BaseReplicator) Start added in v0.0.12

func (r *BaseReplicator) Start(ctx context.Context) error

Start creates the publication, the replication slot, and DDL tracking (if any), and then starts the replication

func (*BaseReplicator) StartReplicationFromLSN

func (r *BaseReplicator) StartReplicationFromLSN(ctx context.Context, startLSN pglogrepl.LSN, stopChan <-chan struct{}) error

StartReplicationFromLSN initiates the replication process from a given LSN

func (*BaseReplicator) Stop added in v0.0.12

func (r *BaseReplicator) Stop(ctx context.Context) error

Stop triggers a graceful shutdown of the replicator

func (*BaseReplicator) StreamChanges

func (r *BaseReplicator) StreamChanges(ctx context.Context, stopChan <-chan struct{}) error

StreamChanges continuously processes replication messages

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a structure that holds data to be flushed periodically or when certain conditions are met

func NewBuffer

func NewBuffer(maxRows int, flushTimeout time.Duration) *Buffer

NewBuffer creates a new Buffer instance

func (*Buffer) Add

func (b *Buffer) Add(item interface{}) bool

Add adds an item to the buffer and returns true if the buffer should be flushed

func (*Buffer) Flush

func (b *Buffer) Flush() []interface{}

Flush flushes the buffer and returns the data

type Config

type Config struct {
	Host        string
	Port        uint16
	Database    string
	User        string
	Password    string
	Group       string
	Schema      string
	Tables      []string
	TrackDDL    bool
	RetryConfig utils.RetryConfig
}

Config holds the configuration for the replicator

func (Config) ConnectionString

func (c Config) ConnectionString() string

ConnectionString generates and returns a PostgreSQL connection string

func (Config) GetRetryConfig added in v0.0.15

func (c Config) GetRetryConfig() utils.RetryConfig

GetRetryConfig returns the retry configuration with defaults if not set

type CopyAndStreamReplicator

type CopyAndStreamReplicator struct {
	*BaseReplicator
	MaxCopyWorkersPerTable int
	CopyOnly               bool
}

CopyAndStreamReplicator implements a replication strategy that first copies existing data and then streams changes.

func NewCopyAndStreamReplicator added in v0.0.12

func NewCopyAndStreamReplicator(base *BaseReplicator, maxWorkers int, copyOnly bool) *CopyAndStreamReplicator

NewCopyAndStreamReplicator creates a new copy and stream replicator instance

func (*CopyAndStreamReplicator) CopyTable

func (r *CopyAndStreamReplicator) CopyTable(ctx context.Context, tableName, snapshotID string) error

CopyTable copies a single table using multiple workers.

func (*CopyAndStreamReplicator) CopyTableRange

func (r *CopyAndStreamReplicator) CopyTableRange(ctx context.Context, tableName string, startPage, endPage uint32, snapshotID string, workerID int) (int64, error)

CopyTableRange copies a range of pages from a table.

func (*CopyAndStreamReplicator) CopyTableWorker

func (r *CopyAndStreamReplicator) CopyTableWorker(ctx context.Context, wg *sync.WaitGroup, errChan chan<- error, rangesChan <-chan [2]uint32, tableName, snapshotID string, workerID int)

CopyTableWorker is a worker function that copies ranges of pages from a table.

func (*CopyAndStreamReplicator) CopyTables

func (r *CopyAndStreamReplicator) CopyTables(ctx context.Context, tables []string, snapshotID string) error

CopyTables copies all specified tables in parallel.

func (*CopyAndStreamReplicator) ParallelCopy

func (r *CopyAndStreamReplicator) ParallelCopy(ctx context.Context) error

ParallelCopy performs a parallel copy of all specified tables.

func (*CopyAndStreamReplicator) Start added in v0.0.12

Start begins the replication process.

func (*CopyAndStreamReplicator) Stop added in v0.0.12

Stop stops the copy and stream replicator

type CopyAndStreamReplicatorFactory added in v0.0.12

type CopyAndStreamReplicatorFactory struct {
	BaseFactory
	MaxCopyWorkersPerTable int
	CopyOnly               bool
}

CopyAndStreamReplicatorFactory creates `CopyAndStreamReplicator` instances

func (*CopyAndStreamReplicatorFactory) CreateReplicator added in v0.0.12

func (f *CopyAndStreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)

CreateReplicator creates a new `CopyAndStreamReplicator`

type DDLReplicator

type DDLReplicator struct {
	DDLConn  StandardConnection
	BaseRepl *BaseReplicator
	Config   Config
}

DDLReplicator handles Data Definition Language (DDL) statement replication

func NewDDLReplicator

func NewDDLReplicator(config Config, BaseRepl *BaseReplicator, ddlConn StandardConnection) (*DDLReplicator, error)

NewDDLReplicator creates a new DDLReplicator instance

func (*DDLReplicator) Close

func (d *DDLReplicator) Close(ctx context.Context) error

Close closes the DDL connection

func (*DDLReplicator) HasPendingDDLEvents

func (d *DDLReplicator) HasPendingDDLEvents(ctx context.Context) (bool, error)

HasPendingDDLEvents checks if there are pending DDL events in the log

func (*DDLReplicator) ProcessDDLEvents

func (d *DDLReplicator) ProcessDDLEvents(ctx context.Context) error

ProcessDDLEvents processes DDL events from the log table

func (*DDLReplicator) SetupDDLTracking

func (d *DDLReplicator) SetupDDLTracking(ctx context.Context) error

SetupDDLTracking sets up the necessary schema, table, and triggers for DDL tracking

func (*DDLReplicator) Shutdown

func (d *DDLReplicator) Shutdown(ctx context.Context) error

Shutdown performs a graceful shutdown of the DDL replicator

func (*DDLReplicator) StartDDLReplication

func (d *DDLReplicator) StartDDLReplication(ctx context.Context)

StartDDLReplication starts the DDL replication process

type Factory added in v0.0.12

type Factory interface {
	CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)
}

Factory defines the interface for creating replicators

type NATSClient

type NATSClient interface {
	PublishMessage(subject string, data []byte) error
	Close() error
	SaveState(state pgflonats.State) error
	GetState() (pgflonats.State, error)
	JetStream() nats.JetStreamContext
}

NATSClient defines the interface for NATS messaging operations

type PgxPoolConn

type PgxPoolConn interface {
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	Release()
}

PgxPoolConn defines the interface for PostgreSQL connection pool connections

type PgxPoolConnWrapper

type PgxPoolConnWrapper struct {
	*pgxpool.Conn
}

PgxPoolConnWrapper wraps a pgxpool connection to implement the PgxPoolConn interface

type PostgresReplicationConnection

type PostgresReplicationConnection struct {
	Config Config
	Conn   *pgconn.PgConn
}

PostgresReplicationConnection implements the ReplicationConnection interface for PostgreSQL databases.

func (*PostgresReplicationConnection) Close

Close terminates the connection to the PostgreSQL database.

func (*PostgresReplicationConnection) Connect

Connect establishes a connection to the PostgreSQL database for replication.

func (*PostgresReplicationConnection) CreateReplicationSlot

CreateReplicationSlot creates a new replication slot in the PostgreSQL database.

func (*PostgresReplicationConnection) IsHealthy added in v0.0.15

IsHealthy checks if the connection is healthy

func (*PostgresReplicationConnection) ReceiveMessage

ReceiveMessage receives a message from the PostgreSQL replication stream.

func (*PostgresReplicationConnection) Reconnect added in v0.0.15

Reconnect closes the existing connection and establishes a new one

func (*PostgresReplicationConnection) SendStandbyStatusUpdate

func (rc *PostgresReplicationConnection) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error

SendStandbyStatusUpdate sends a status update to the PostgreSQL server during replication.

func (*PostgresReplicationConnection) StartReplication

func (rc *PostgresReplicationConnection) StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error

StartReplication initiates the replication process from the specified LSN.

type ReplicationConnection

type ReplicationConnection interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	Reconnect(ctx context.Context) error
	CreateReplicationSlot(ctx context.Context, slotName string) (pglogrepl.CreateReplicationSlotResult, error)
	StartReplication(ctx context.Context, slotName string, startLSN pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error
	ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error)
	SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error
	IsHealthy(ctx context.Context) bool
}

ReplicationConnection defines the interface for PostgreSQL logical replication connections

func NewReplicationConnection

func NewReplicationConnection(config Config) ReplicationConnection

NewReplicationConnection creates a new PostgresReplicationConnection instance.

type ReplicationError

type ReplicationError struct {
	Op  string // The operation that caused the error
	Err error  // The underlying error
}

ReplicationError represents an error that occurred during replication.

func (*ReplicationError) Error

func (e *ReplicationError) Error() string

Error returns a formatted error message.

type Replicator

type Replicator interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

Replicator defines the interface for PostgreSQL replicators

type StandardConnection

type StandardConnection interface {
	Connect(ctx context.Context) error
	Close(ctx context.Context) error
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
	Acquire(ctx context.Context) (PgxPoolConn, error)
}

StandardConnection defines the interface for standard PostgreSQL connections

type StandardConnectionImpl

type StandardConnectionImpl struct {
	// contains filtered or unexported fields
}

StandardConnectionImpl implements the StandardConnection interface for PostgreSQL databases.

func NewStandardConnection

func NewStandardConnection(config Config) (*StandardConnectionImpl, error)

NewStandardConnection creates a new StandardConnectionImpl instance and establishes a connection.

func (*StandardConnectionImpl) Acquire

Acquire acquires a connection from the pool.

func (*StandardConnectionImpl) BeginTx

func (s *StandardConnectionImpl) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)

BeginTx starts a new transaction with the specified options.

func (*StandardConnectionImpl) Close

Close terminates the connection to the PostgreSQL database.

func (*StandardConnectionImpl) Connect

func (s *StandardConnectionImpl) Connect(ctx context.Context) error

Connect establishes a connection to the PostgreSQL database.

func (*StandardConnectionImpl) Exec

func (s *StandardConnectionImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)

Exec executes a SQL query without returning any rows.

func (*StandardConnectionImpl) Query

func (s *StandardConnectionImpl) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query that returns rows, typically a SELECT.

func (*StandardConnectionImpl) QueryRow

func (s *StandardConnectionImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that is expected to return at most one row.

type StreamReplicator

type StreamReplicator struct {
	*BaseReplicator
}

StreamReplicator handles PostgreSQL logical replication streaming

func NewStreamReplicator added in v0.0.12

func NewStreamReplicator(base *BaseReplicator) *StreamReplicator

NewStreamReplicator creates a new stream replicator instance

func (*StreamReplicator) Start added in v0.0.12

func (r *StreamReplicator) Start(ctx context.Context) error

Start begins the stream replication process

func (*StreamReplicator) Stop added in v0.0.12

func (r *StreamReplicator) Stop(ctx context.Context) error

Stop stops the stream replicator

type StreamReplicatorFactory added in v0.0.12

type StreamReplicatorFactory struct {
	BaseFactory
}

StreamReplicatorFactory creates `StreamReplicator` instances

func (*StreamReplicatorFactory) CreateReplicator added in v0.0.12

func (f *StreamReplicatorFactory) CreateReplicator(config Config, natsClient NATSClient) (Replicator, error)

CreateReplicator creates a new `StreamReplicator`

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL