wal

package
v0.1.2 Latest
Published: Mar 15, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BucketAssigner

type BucketAssigner interface {
	AssignBuckets(ctx context.Context, table string, recordID string, operation synchro.Operation, data map[string]any) ([]string, error)
}

BucketAssigner resolves which buckets a changed record belongs to.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reads from a PostgreSQL logical replication slot and writes changelog entries for each captured change.

func NewConsumer

func NewConsumer(cfg ConsumerConfig) *Consumer

NewConsumer creates a new WAL consumer.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start begins consuming WAL events. Blocks until ctx is cancelled.

type ConsumerConfig

type ConsumerConfig struct {
	// ConnString is the PostgreSQL connection string with replication=database.
	ConnString string

	// SlotName is the replication slot name.
	SlotName string

	// PublicationName is the publication to subscribe to.
	PublicationName string

	// Registry is used to look up table configurations.
	Registry *synchro.Registry

	// Assigner determines bucket IDs for changed records.
	Assigner BucketAssigner

	// ChangelogDB writes entries to sync sidecar tables and persists WAL position.
	ChangelogDB synchro.DB

	// Logger for consumer events.
	Logger *slog.Logger

	// StandbyTimeout is how often to send standby status updates.
	// Defaults to 10 seconds.
	StandbyTimeout time.Duration
}

ConsumerConfig configures the WAL consumer.

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder decodes pgoutput protocol messages into WALEvents.

func NewDecoder

func NewDecoder(registry *synchro.Registry) *Decoder

NewDecoder creates a new WAL message decoder.

func (*Decoder) Decode

func (d *Decoder) Decode(walData []byte) ([]WALEvent, error)

Decode processes a raw WAL message and returns any sync-relevant events.

type Position

type Position struct {
	// contains filtered or unexported fields
}

Position tracks the confirmed LSN with persistent storage for crash recovery.

func NewPosition

func NewPosition(db synchro.DB, slotName string) *Position

NewPosition creates a new Position tracker backed by persistent storage. On first call, loads the last confirmed LSN from sync_wal_position.

func (*Position) Confirmed

func (p *Position) Confirmed() pglogrepl.LSN

Confirmed returns the last confirmed LSN.

func (*Position) Load

func (p *Position) Load(ctx context.Context) (pglogrepl.LSN, error)

Load reads the persisted LSN from the database. Returns 0 if no row exists.

func (*Position) SetConfirmed

func (p *Position) SetConfirmed(ctx context.Context, lsn pglogrepl.LSN) error

SetConfirmed persists the confirmed LSN and only then updates the in-memory value.

type WALEvent

type WALEvent struct {
	TableName string
	RecordID  string
	Operation synchro.Operation
	Data      map[string]any
}

WALEvent represents a decoded change from the WAL.
