waljs

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReplicationSlotTempl = "" /* 131-byte string literal not displayed */

)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChangeFilter

type ChangeFilter struct {
	// contains filtered or unexported fields
}

func NewChangeFilter

func NewChangeFilter(typeConverter func(value interface{}, columnType string) (interface{}, error), streams ...types.StreamInterface) ChangeFilter

func (ChangeFilter) FilterChange

func (c ChangeFilter) FilterChange(change []byte, OnFiltered abstract.CDCMsgFn) (*pglogrepl.LSN, int, error)

type Config

type Config struct {
	Tables              *types.Set[types.StreamInterface]
	Connection          url.URL
	ReplicationSlotName string
	InitialWaitTime     time.Duration
	TLSConfig           *tls.Config
	BatchSize           int
}

type ReplicationSlot

type ReplicationSlot struct {
	SlotType   string        `db:"slot_type"`
	Plugin     string        `db:"plugin"`
	LSN        pglogrepl.LSN `db:"confirmed_flush_lsn"`
	CurrentLSN pglogrepl.LSN `db:"current_lsn"`
}

type Socket

type Socket struct {

	// clientXLogPos tracks the current position in the Write-Ahead Log (WAL)
	ClientXLogPos pglogrepl.LSN

	// confirmedLSN is the position from which replication should start (Prev marked lsn)
	ConfirmedFlushLSN pglogrepl.LSN
	// contains filtered or unexported fields
}

Socket represents a connection to PostgreSQL's logical replication stream

func NewConnection

func NewConnection(ctx context.Context, db *sqlx.DB, config *Config, typeConverter func(value interface{}, columnType string) (interface{}, error)) (*Socket, error)

func (*Socket) AcknowledgeLSN

func (s *Socket) AcknowledgeLSN(ctx context.Context, fakeAck bool) error

Confirm that Logs has been recorded

func (*Socket) Cleanup

func (s *Socket) Cleanup(ctx context.Context)

cleanUpOnFailure drops replication slot and publication if database snapshotting was failed for any reason

func (*Socket) StreamMessages

func (s *Socket) StreamMessages(ctx context.Context, callback abstract.CDCMsgFn) error

type WALMessage

type WALMessage struct {
	NextLSN   string         `json:"nextlsn"`
	Timestamp typeutils.Time `json:"timestamp"`
	Change    []struct {
		Kind         string        `json:"kind"`
		Schema       string        `json:"schema"`
		Table        string        `json:"table"`
		Columnnames  []string      `json:"columnnames"`
		Columntypes  []string      `json:"columntypes"`
		Columnvalues []interface{} `json:"columnvalues"`
		Oldkeys      struct {
			Keynames  []string      `json:"keynames"`
			Keytypes  []string      `json:"keytypes"`
			Keyvalues []interface{} `json:"keyvalues"`
		} `json:"oldkeys"`
	} `json:"change"`
}

type WALState

type WALState struct {
	LSN string `json:"lsn"`
}

func (*WALState) IsEmpty

func (s *WALState) IsEmpty() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL