waljs

package
v0.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const AdvanceLSNTemplate = "SELECT * FROM pg_replication_slot_advance('%s', '%s')"
View Source
const (
	ReplicationSlotTempl = "" /* 131-byte string literal not displayed */
)

Variables

This section is empty.

Functions

func AcknowledgeLSN added in v0.2.6

func AcknowledgeLSN(ctx context.Context, db *sqlx.DB, socket *Socket, fakeAck bool) error

Confirm that Logs has been recorded in fake ack prev confirmed flush lsn is sent

func AdvanceLSN added in v0.2.6

func AdvanceLSN(ctx context.Context, db *sqlx.DB, slot, currentWalPos string) error

advanceLSN advances the logical replication position to the current WAL position.

func Cleanup added in v0.2.6

func Cleanup(ctx context.Context, socket *Socket)

cleanup replicator

Types

type ChangeFilter

type ChangeFilter struct {
	// contains filtered or unexported fields
}

func NewChangeFilter

func NewChangeFilter(typeConverter func(value interface{}, columnType string) (interface{}, error), streams ...types.StreamInterface) ChangeFilter

func (ChangeFilter) FilterWalJsChange added in v0.2.6

func (c ChangeFilter) FilterWalJsChange(ctx context.Context, change []byte, OnFiltered abstract.CDCMsgFn) (*pglogrepl.LSN, int, error)

type Config

type Config struct {
	Tables              *types.Set[types.StreamInterface]
	Connection          url.URL
	SSHClient           *ssh.Client
	ReplicationSlotName string
	InitialWaitTime     time.Duration
	TLSConfig           *tls.Config
	BatchSize           int
	// Publications is used with pgoutput
	Publication string
}

type ReplicationSlot

type ReplicationSlot struct {
	SlotType   string        `db:"slot_type"`
	Plugin     string        `db:"plugin"`
	LSN        pglogrepl.LSN `db:"confirmed_flush_lsn"`
	CurrentLSN pglogrepl.LSN `db:"current_lsn"`
}

type Replicator added in v0.2.6

type Replicator interface {
	// info about socket
	Socket() *Socket
	// StreamChanges processes messages until it emits changes via insertFn or exits per logic.
	StreamChanges(ctx context.Context, db *sqlx.DB, insertFn abstract.CDCMsgFn) error
}

Replicator defines an abstraction over different logical decoding plugins.

func NewReplicator added in v0.2.6

func NewReplicator(ctx context.Context, db *sqlx.DB, config *Config, typeConverter func(value interface{}, columnType string) (interface{}, error)) (Replicator, error)

type Socket

type Socket struct {

	// clientXLogPos tracks the current position (while reading logs) in the Write-Ahead Log (WAL)
	ClientXLogPos pglogrepl.LSN

	// confirmedLSN is the position from which replication should start (Prev marked lsn)
	ConfirmedFlushLSN pglogrepl.LSN
	// wal position at a point of time
	CurrentWalPosition pglogrepl.LSN
	// replicationSlot is the name of the PostgreSQL replication slot being used
	ReplicationSlot string
	// contains filtered or unexported fields
}

Socket represents a connection to PostgreSQL's logical replication stream

type WALMessage

type WALMessage struct {
	NextLSN   string         `json:"nextlsn"`
	Timestamp typeutils.Time `json:"timestamp"`
	Change    []struct {
		Kind         string        `json:"kind"`
		Schema       string        `json:"schema"`
		Table        string        `json:"table"`
		Columnnames  []string      `json:"columnnames"`
		Columntypes  []string      `json:"columntypes"`
		Columnvalues []interface{} `json:"columnvalues"`
		Oldkeys      struct {
			Keynames  []string      `json:"keynames"`
			Keytypes  []string      `json:"keytypes"`
			Keyvalues []interface{} `json:"keyvalues"`
		} `json:"oldkeys"`
	} `json:"change"`
}

type WALState

type WALState struct {
	LSN string `json:"lsn"`
}

func (*WALState) IsEmpty

func (s *WALState) IsEmpty() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL