Documentation
¶
Index ¶
- Constants
- func AcknowledgeLSN(ctx context.Context, db *sqlx.DB, socket *Socket, fakeAck bool) error
- func AdvanceLSN(ctx context.Context, db *sqlx.DB, slot, currentWalPos string) error
- func Cleanup(ctx context.Context, socket *Socket)
- type ChangeFilter
- type Config
- type ReplicationSlot
- type Replicator
- type Socket
- type WALMessage
- type WALState
Constants ¶
View Source
const AdvanceLSNTemplate = "SELECT * FROM pg_replication_slot_advance('%s', '%s')"
View Source
const (
ReplicationSlotTempl = "" /* 131-byte string literal not displayed */
)
Variables ¶
This section is empty.
Functions ¶
func AcknowledgeLSN ¶ added in v0.2.6
AcknowledgeLSN confirms that the logs have been recorded. In a fake ack (fakeAck is true), the previously confirmed flush LSN is sent.
func AdvanceLSN ¶ added in v0.2.6
AdvanceLSN advances the logical replication position to the current WAL position.
Types ¶
type ChangeFilter ¶
type ChangeFilter struct {
// contains filtered or unexported fields
}
func NewChangeFilter ¶
func NewChangeFilter(typeConverter func(value interface{}, columnType string) (interface{}, error), streams ...types.StreamInterface) ChangeFilter
type ReplicationSlot ¶
type Replicator ¶ added in v0.2.6
type Replicator interface {
// Socket returns information about the replication socket.
Socket() *Socket
// StreamChanges processes messages until it emits changes via insertFn or exits according to the implementation's logic.
StreamChanges(ctx context.Context, db *sqlx.DB, insertFn abstract.CDCMsgFn) error
}
Replicator defines an abstraction over different logical decoding plugins.
type Socket ¶
type Socket struct {
// ClientXLogPos tracks the current position (while reading logs) in the Write-Ahead Log (WAL)
ClientXLogPos pglogrepl.LSN
// ConfirmedFlushLSN is the position from which replication should start (the previously marked LSN)
ConfirmedFlushLSN pglogrepl.LSN
// CurrentWalPosition is the WAL position at a point in time
CurrentWalPosition pglogrepl.LSN
// ReplicationSlot is the name of the PostgreSQL replication slot being used
ReplicationSlot string
// contains filtered or unexported fields
}
Socket represents a connection to PostgreSQL's logical replication stream
type WALMessage ¶
type WALMessage struct {
NextLSN string `json:"nextlsn"`
Timestamp typeutils.Time `json:"timestamp"`
Change []struct {
Kind string `json:"kind"`
Schema string `json:"schema"`
Table string `json:"table"`
Columnnames []string `json:"columnnames"`
Columntypes []string `json:"columntypes"`
Columnvalues []interface{} `json:"columnvalues"`
Oldkeys struct {
Keynames []string `json:"keynames"`
Keytypes []string `json:"keytypes"`
Keyvalues []interface{} `json:"keyvalues"`
} `json:"oldkeys"`
} `json:"change"`
}
Click to show internal directories.
Click to hide internal directories.